This is an automated email from the ASF dual-hosted git repository. mgreber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit ae4a3a9b88a9b6016b1221f4f7cd0e41772a65ac Author: Alexey Serbin <[email protected]> AuthorDate: Mon Aug 12 22:50:46 2024 -0700 [cfile] modernize CFileWriter * use PREDICT_FALSE for data corruption conditions * fix const-correctness of the CFileWriter::block() method * other minor updates This patch doesn't contain any functional modifications. Change-Id: I6038de39f912b054320b97af11145627c68ba00a Reviewed-on: http://gerrit.cloudera.org:8080/21670 Reviewed-by: Zoltan Martonka <[email protected]> Tested-by: Marton Greber <[email protected]> Reviewed-by: Marton Greber <[email protected]> --- src/kudu/cfile/cfile_writer.cc | 115 ++++++++++++++++++++++++++--------------- src/kudu/cfile/cfile_writer.h | 44 ++-------------- 2 files changed, 77 insertions(+), 82 deletions(-) diff --git a/src/kudu/cfile/cfile_writer.cc b/src/kudu/cfile/cfile_writer.cc index af1e3b198..8bf2e9011 100644 --- a/src/kudu/cfile/cfile_writer.cc +++ b/src/kudu/cfile/cfile_writer.cc @@ -40,6 +40,7 @@ #include "kudu/common/types.h" #include "kudu/gutil/port.h" #include "kudu/util/array_view.h" // IWYU pragma: keep +#include "kudu/util/bitmap.h" #include "kudu/util/coding.h" #include "kudu/util/coding-inl.h" #include "kudu/util/compression/compression_codec.h" @@ -49,6 +50,7 @@ #include "kudu/util/hexdump.h" #include "kudu/util/logging.h" #include "kudu/util/pb_util.h" +#include "kudu/util/rle-encoding.h" DEFINE_int32(cfile_default_block_size, 256*1024, "The default block size to use in cfiles"); TAG_FLAG(cfile_default_block_size, advanced); @@ -66,7 +68,6 @@ using kudu::fs::BlockCreationTransaction; using kudu::fs::BlockManager; using kudu::fs::WritableBlock; using std::accumulate; -using std::pair; using std::string; using std::unique_ptr; using std::vector; @@ -81,6 +82,41 @@ const size_t kChecksumSize = sizeof(uint32_t); static const size_t kMinBlockSize = 512; +class NullBitmapBuilder { + public: + explicit NullBitmapBuilder(size_t initial_row_capacity) + : nitems_(0), + bitmap_(BitmapSize(initial_row_capacity)), + rle_encoder_(&bitmap_, 1) { + } + + size_t nitems() const { + return nitems_; + } + + // If value parameter is true, it means that all values in this run are null + void AddRun(bool value, size_t run_length = 1) { + nitems_ += run_length; + rle_encoder_.Put(value, run_length); + } + + // the returned Slice is only valid until this Builder is destroyed or Reset + Slice Finish() { + int len = rle_encoder_.Flush(); + return Slice(bitmap_.data(), len); + } + + void Reset() { + nitems_ = 0; + rle_encoder_.Clear(); + } + + private: + size_t nitems_; + faststring bitmap_; + RleEncoder<bool> rle_encoder_; +}; + //////////////////////////////////////////////////////////// // CFileWriter //////////////////////////////////////////////////////////// @@ -90,17 +126,17 @@ CFileWriter::CFileWriter(WriterOptions options, const TypeInfo* typeinfo, bool is_nullable, unique_ptr<WritableBlock> block) - : options_(std::move(options)), - block_(std::move(block)), - off_(0), - value_count_(0), - is_nullable_(is_nullable), - typeinfo_(typeinfo), - state_(kWriterInitialized) { + : options_(std::move(options)), + block_(std::move(block)), + off_(0), + value_count_(0), + is_nullable_(is_nullable), + typeinfo_(typeinfo), + state_(kWriterInitialized) { EncodingType encoding = options_.storage_attributes.encoding; Status s = TypeEncodingInfo::Get(typeinfo_, encoding, &type_encoding_info_); - if (!s.ok()) { - // TODO: we should somehow pass some contextual info about the + if (PREDICT_FALSE(!s.ok())) { + // TODO(af): we should somehow pass some contextual info about the // tablet here. WARN_NOT_OK(s, "Falling back to default encoding"); s = TypeEncodingInfo::Get(typeinfo, @@ -117,7 +153,7 @@ CFileWriter::CFileWriter(WriterOptions options, if (options_.storage_attributes.cfile_block_size <= 0) { options_.storage_attributes.cfile_block_size = FLAGS_cfile_default_block_size; } - if (options_.storage_attributes.cfile_block_size < kMinBlockSize) { + if (PREDICT_FALSE(options_.storage_attributes.cfile_block_size < kMinBlockSize)) { LOG(WARNING) << "Configured block size " << options_.storage_attributes.cfile_block_size << " smaller than minimum allowed value " << kMinBlockSize << ": using minimum."; @@ -156,7 +192,7 @@ Status CFileWriter::Start() { CFileHeaderPB header; FlushMetadataToPB(header.mutable_metadata()); - uint32_t pb_size = header.ByteSizeLong(); + const uint32_t pb_size = header.ByteSizeLong(); faststring header_str; // First the magic. @@ -273,21 +309,19 @@ void CFileWriter::AddMetadataPair(const Slice& key, const Slice& value) { } string CFileWriter::GetMetaValueOrDie(Slice key) const { - typedef pair<string, string> ss_pair; - for (const ss_pair& entry : unflushed_metadata_) { - if (Slice(entry.first) == key) { - return entry.second; + for (const auto& [m_key, m_val] : unflushed_metadata_) { + if (Slice(m_key) == key) { + return m_val; } } LOG(FATAL) << "Missing metadata entry: " << KUDU_REDACT(key.ToDebugString()); } void CFileWriter::FlushMetadataToPB(RepeatedPtrField<FileMetadataPairPB>* field) { - typedef pair<string, string> ss_pair; - for (const ss_pair& entry : unflushed_metadata_) { + for (const auto& [key, val] : unflushed_metadata_) { FileMetadataPairPB* pb = field->Add(); - pb->set_key(entry.first); - pb->set_value(entry.second); + pb->set_key(key); + pb->set_value(val); } unflushed_metadata_.clear(); } @@ -354,11 +388,8 @@ Status CFileWriter::AppendNullableEntries(const uint8_t* bitmap, } Status CFileWriter::FinishCurDataBlock() { - uint32_t num_elems_in_block = data_block_->Count(); - if (is_nullable_) { - num_elems_in_block = non_null_bitmap_builder_->nitems(); - } - + const uint32_t num_elems_in_block = + is_nullable_ ? non_null_bitmap_builder_->nitems() : data_block_->Count(); if (PREDICT_FALSE(num_elems_in_block == 0)) { return Status::OK(); } @@ -423,8 +454,8 @@ Status CFileWriter::AppendRawBlock(vector<Slice> data_slices, BlockPointer ptr; Status s = AddBlock(std::move(data_slices), &ptr, name_for_log); - if (!s.ok()) { - LOG(WARNING) << "Unable to append block to file: " << s.ToString(); + if (PREDICT_FALSE(!s.ok())) { + LOG(ERROR) << "Unable to append block to file: " << s.ToString(); return s; } @@ -444,30 +475,29 @@ Status CFileWriter::AppendRawBlock(vector<Slice> data_slices, if (options_.optimize_index_keys) { GetSeparatingKey(validx_prev, &idx_key); } - VLOG(1) << "Appending validx entry\n" << - kudu::HexDump(idx_key); - s = validx_builder_->Append(idx_key, ptr); - if (!s.ok()) { - LOG(WARNING) << "Unable to append to value index: " << s.ToString(); + VLOG(1) << "Appending validx entry\n" << kudu::HexDump(idx_key); + Status s = validx_builder_->Append(idx_key, ptr); + if (PREDICT_FALSE(!s.ok())) { + LOG(ERROR) << "Unable to append to value index: " << s.ToString(); return s; } } - return s; + return Status::OK(); } Status CFileWriter::AddBlock(vector<Slice> data_slices, BlockPointer* block_ptr, const char* name_for_log) { - uint64_t start_offset = off_; + const uint64_t start_offset = off_; vector<Slice> out_slices; if (block_compressor_ != nullptr) { // Write compressed block Status s = block_compressor_->Compress(std::move(data_slices), &out_slices); - if (!s.ok()) { - LOG(WARNING) << "Unable to compress block at offset " << off_ - << ": " << s.ToString(); + if (PREDICT_FALSE(!s.ok())) { + LOG(ERROR) << "Unable to compress block at offset " << start_offset + << ": " << s.ToString(); return s; } } else { @@ -487,7 +517,8 @@ Status CFileWriter::AddBlock(vector<Slice> data_slices, RETURN_NOT_OK(WriteRawData(out_slices)); - uint64_t total_size = off_ - start_offset; + DCHECK_GT(off_, start_offset); + const uint64_t total_size = off_ - start_offset; *block_ptr = BlockPointer(start_offset, total_size); VLOG(1) << "Appended " << name_for_log @@ -501,10 +532,10 @@ Status CFileWriter::WriteRawData(const vector<Slice>& data) { return sum + curr.size(); }); Status s = block_->AppendV(data); - if (!s.ok()) { - LOG(WARNING) << "Unable to append data of size " - << data_size << " at offset " << off_ - << ": " << s.ToString(); + if (PREDICT_FALSE(!s.ok())) { + LOG(ERROR) << "Unable to append data of size " + << data_size << " at offset " << off_ + << ": " << s.ToString(); } off_ += data_size; return s; diff --git a/src/kudu/cfile/cfile_writer.h b/src/kudu/cfile/cfile_writer.h index b5d6b61fa..3c36efb03 100644 --- a/src/kudu/cfile/cfile_writer.h +++ b/src/kudu/cfile/cfile_writer.h @@ -28,10 +28,8 @@ #include "kudu/fs/block_id.h" #include "kudu/fs/block_manager.h" #include "kudu/gutil/macros.h" -#include "kudu/util/bitmap.h" #include "kudu/util/compression/compression.pb.h" #include "kudu/util/faststring.h" -#include "kudu/util/rle-encoding.h" #include "kudu/util/slice.h" #include "kudu/util/status.h" @@ -54,43 +52,10 @@ extern const char kMagicStringV2[]; extern const int kMagicLength; extern const size_t kChecksumSize; -class NullBitmapBuilder { - public: - explicit NullBitmapBuilder(size_t initial_row_capacity) - : nitems_(0), - bitmap_(BitmapSize(initial_row_capacity)), - rle_encoder_(&bitmap_, 1) { - } - - size_t nitems() const { - return nitems_; - } - - // If value parameter is true, it means that all values in this run are null - void AddRun(bool value, size_t run_length = 1) { - nitems_ += run_length; - rle_encoder_.Put(value, run_length); - } - - // the returned Slice is only valid until this Builder is destroyed or Reset - Slice Finish() { - int len = rle_encoder_.Flush(); - return Slice(bitmap_.data(), len); - } - - void Reset() { - nitems_ = 0; - rle_encoder_.Clear(); - } - - private: - size_t nitems_; - faststring bitmap_; - RleEncoder<bool> rle_encoder_; -}; +class NullBitmapBuilder; // Main class used to write a CFile. -class CFileWriter { +class CFileWriter final { public: explicit CFileWriter(WriterOptions options, const TypeInfo* typeinfo, @@ -166,7 +131,7 @@ class CFileWriter { std::string ToString() const { return block_->id().ToString(); } - fs::WritableBlock* block() const { return block_.get(); } + const fs::WritableBlock* block() const { return block_.get(); } // Wrapper for AddBlock() to append the dictionary block to the end of a Cfile. Status AppendDictBlock(std::vector<Slice> data_slices, @@ -218,7 +183,7 @@ class CFileWriter { faststring tmp_buf_; // Metadata which has been added to the writer but not yet flushed. - std::vector<std::pair<std::string, std::string> > unflushed_metadata_; + std::vector<std::pair<std::string, std::string>> unflushed_metadata_; std::unique_ptr<BlockBuilder> data_block_; std::unique_ptr<IndexTreeBuilder> posidx_builder_; @@ -236,6 +201,5 @@ class CFileWriter { DISALLOW_COPY_AND_ASSIGN(CFileWriter); }; - } // namespace cfile } // namespace kudu
