This is an automated email from the ASF dual-hosted git repository.

mgreber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit ae4a3a9b88a9b6016b1221f4f7cd0e41772a65ac
Author: Alexey Serbin <[email protected]>
AuthorDate: Mon Aug 12 22:50:46 2024 -0700

    [cfile] modernize CFileWriter
    
      * use PREDICT_FALSE for data corruption conditions
      * fix const-correctness of the CFileWriter::block() method
      * other minor updates
    
    This patch doesn't contain any functional modifications.
    
    Change-Id: I6038de39f912b054320b97af11145627c68ba00a
    Reviewed-on: http://gerrit.cloudera.org:8080/21670
    Reviewed-by: Zoltan Martonka <[email protected]>
    Tested-by: Marton Greber <[email protected]>
    Reviewed-by: Marton Greber <[email protected]>
---
 src/kudu/cfile/cfile_writer.cc | 115 ++++++++++++++++++++++++++---------------
 src/kudu/cfile/cfile_writer.h  |  44 ++--------------
 2 files changed, 77 insertions(+), 82 deletions(-)

diff --git a/src/kudu/cfile/cfile_writer.cc b/src/kudu/cfile/cfile_writer.cc
index af1e3b198..8bf2e9011 100644
--- a/src/kudu/cfile/cfile_writer.cc
+++ b/src/kudu/cfile/cfile_writer.cc
@@ -40,6 +40,7 @@
 #include "kudu/common/types.h"
 #include "kudu/gutil/port.h"
 #include "kudu/util/array_view.h" // IWYU pragma: keep
+#include "kudu/util/bitmap.h"
 #include "kudu/util/coding.h"
 #include "kudu/util/coding-inl.h"
 #include "kudu/util/compression/compression_codec.h"
@@ -49,6 +50,7 @@
 #include "kudu/util/hexdump.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/rle-encoding.h"
 
 DEFINE_int32(cfile_default_block_size, 256*1024, "The default block size to 
use in cfiles");
 TAG_FLAG(cfile_default_block_size, advanced);
@@ -66,7 +68,6 @@ using kudu::fs::BlockCreationTransaction;
 using kudu::fs::BlockManager;
 using kudu::fs::WritableBlock;
 using std::accumulate;
-using std::pair;
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -81,6 +82,41 @@ const size_t kChecksumSize = sizeof(uint32_t);
 
 static const size_t kMinBlockSize = 512;
 
+class NullBitmapBuilder {
+ public:
+  explicit NullBitmapBuilder(size_t initial_row_capacity)
+      : nitems_(0),
+        bitmap_(BitmapSize(initial_row_capacity)),
+        rle_encoder_(&bitmap_, 1) {
+  }
+
+  size_t nitems() const {
+    return nitems_;
+  }
+
+  // If value parameter is true, it means that all values in this run are null
+  void AddRun(bool value, size_t run_length = 1) {
+    nitems_ += run_length;
+    rle_encoder_.Put(value, run_length);
+  }
+
+  // the returned Slice is only valid until this Builder is destroyed or Reset
+  Slice Finish() {
+    int len = rle_encoder_.Flush();
+    return Slice(bitmap_.data(), len);
+  }
+
+  void Reset() {
+    nitems_ = 0;
+    rle_encoder_.Clear();
+  }
+
+ private:
+  size_t nitems_;
+  faststring bitmap_;
+  RleEncoder<bool> rle_encoder_;
+};
+
 ////////////////////////////////////////////////////////////
 // CFileWriter
 ////////////////////////////////////////////////////////////
@@ -90,17 +126,17 @@ CFileWriter::CFileWriter(WriterOptions options,
                          const TypeInfo* typeinfo,
                          bool is_nullable,
                          unique_ptr<WritableBlock> block)
-  : options_(std::move(options)),
-    block_(std::move(block)),
-    off_(0),
-    value_count_(0),
-    is_nullable_(is_nullable),
-    typeinfo_(typeinfo),
-    state_(kWriterInitialized) {
+    : options_(std::move(options)),
+      block_(std::move(block)),
+      off_(0),
+      value_count_(0),
+      is_nullable_(is_nullable),
+      typeinfo_(typeinfo),
+      state_(kWriterInitialized) {
   EncodingType encoding = options_.storage_attributes.encoding;
   Status s = TypeEncodingInfo::Get(typeinfo_, encoding, &type_encoding_info_);
-  if (!s.ok()) {
-    // TODO: we should somehow pass some contextual info about the
+  if (PREDICT_FALSE(!s.ok())) {
+    // TODO(af): we should somehow pass some contextual info about the
     // tablet here.
     WARN_NOT_OK(s, "Falling back to default encoding");
     s = TypeEncodingInfo::Get(typeinfo,
@@ -117,7 +153,7 @@ CFileWriter::CFileWriter(WriterOptions options,
   if (options_.storage_attributes.cfile_block_size <= 0) {
     options_.storage_attributes.cfile_block_size = 
FLAGS_cfile_default_block_size;
   }
-  if (options_.storage_attributes.cfile_block_size < kMinBlockSize) {
+  if (PREDICT_FALSE(options_.storage_attributes.cfile_block_size < 
kMinBlockSize)) {
     LOG(WARNING) << "Configured block size " << 
options_.storage_attributes.cfile_block_size
                  << " smaller than minimum allowed value " << kMinBlockSize
                  << ": using minimum.";
@@ -156,7 +192,7 @@ Status CFileWriter::Start() {
   CFileHeaderPB header;
   FlushMetadataToPB(header.mutable_metadata());
 
-  uint32_t pb_size = header.ByteSizeLong();
+  const uint32_t pb_size = header.ByteSizeLong();
 
   faststring header_str;
   // First the magic.
@@ -273,21 +309,19 @@ void CFileWriter::AddMetadataPair(const Slice& key, const 
Slice& value) {
 }
 
 string CFileWriter::GetMetaValueOrDie(Slice key) const {
-  typedef pair<string, string> ss_pair;
-  for (const ss_pair& entry : unflushed_metadata_) {
-    if (Slice(entry.first) == key) {
-      return entry.second;
+  for (const auto& [m_key, m_val] : unflushed_metadata_) {
+    if (Slice(m_key) == key) {
+      return m_val;
     }
   }
   LOG(FATAL) << "Missing metadata entry: " << KUDU_REDACT(key.ToDebugString());
 }
 
 void CFileWriter::FlushMetadataToPB(RepeatedPtrField<FileMetadataPairPB>* 
field) {
-  typedef pair<string, string> ss_pair;
-  for (const ss_pair& entry : unflushed_metadata_) {
+  for (const auto& [key, val] : unflushed_metadata_) {
     FileMetadataPairPB* pb = field->Add();
-    pb->set_key(entry.first);
-    pb->set_value(entry.second);
+    pb->set_key(key);
+    pb->set_value(val);
   }
   unflushed_metadata_.clear();
 }
@@ -354,11 +388,8 @@ Status CFileWriter::AppendNullableEntries(const uint8_t* 
bitmap,
 }
 
 Status CFileWriter::FinishCurDataBlock() {
-  uint32_t num_elems_in_block = data_block_->Count();
-  if (is_nullable_) {
-    num_elems_in_block = non_null_bitmap_builder_->nitems();
-  }
-
+  const uint32_t num_elems_in_block =
+      is_nullable_ ? non_null_bitmap_builder_->nitems() : data_block_->Count();
   if (PREDICT_FALSE(num_elems_in_block == 0)) {
     return Status::OK();
   }
@@ -423,8 +454,8 @@ Status CFileWriter::AppendRawBlock(vector<Slice> 
data_slices,
 
   BlockPointer ptr;
   Status s = AddBlock(std::move(data_slices), &ptr, name_for_log);
-  if (!s.ok()) {
-    LOG(WARNING) << "Unable to append block to file: " << s.ToString();
+  if (PREDICT_FALSE(!s.ok())) {
+    LOG(ERROR) << "Unable to append block to file: " << s.ToString();
     return s;
   }
 
@@ -444,30 +475,29 @@ Status CFileWriter::AppendRawBlock(vector<Slice> 
data_slices,
     if (options_.optimize_index_keys) {
       GetSeparatingKey(validx_prev, &idx_key);
     }
-    VLOG(1) << "Appending validx entry\n" <<
-            kudu::HexDump(idx_key);
-    s = validx_builder_->Append(idx_key, ptr);
-    if (!s.ok()) {
-      LOG(WARNING) << "Unable to append to value index: " << s.ToString();
+    VLOG(1) << "Appending validx entry\n" << kudu::HexDump(idx_key);
+    Status s = validx_builder_->Append(idx_key, ptr);
+    if (PREDICT_FALSE(!s.ok())) {
+      LOG(ERROR) << "Unable to append to value index: " << s.ToString();
       return s;
     }
   }
 
-  return s;
+  return Status::OK();
 }
 
 Status CFileWriter::AddBlock(vector<Slice> data_slices,
                              BlockPointer* block_ptr,
                              const char* name_for_log) {
-  uint64_t start_offset = off_;
+  const uint64_t start_offset = off_;
   vector<Slice> out_slices;
 
   if (block_compressor_ != nullptr) {
     // Write compressed block
     Status s = block_compressor_->Compress(std::move(data_slices), 
&out_slices);
-    if (!s.ok()) {
-      LOG(WARNING) << "Unable to compress block at offset " << off_
-                   << ": " << s.ToString();
+    if (PREDICT_FALSE(!s.ok())) {
+      LOG(ERROR) << "Unable to compress block at offset " << start_offset
+                 << ": " << s.ToString();
       return s;
     }
   } else {
@@ -487,7 +517,8 @@ Status CFileWriter::AddBlock(vector<Slice> data_slices,
 
   RETURN_NOT_OK(WriteRawData(out_slices));
 
-  uint64_t total_size = off_ - start_offset;
+  DCHECK_GT(off_, start_offset);
+  const uint64_t total_size = off_ - start_offset;
 
   *block_ptr = BlockPointer(start_offset, total_size);
   VLOG(1) << "Appended " << name_for_log
@@ -501,10 +532,10 @@ Status CFileWriter::WriteRawData(const vector<Slice>& 
data) {
                                   return sum + curr.size();
                                 });
   Status s = block_->AppendV(data);
-  if (!s.ok()) {
-    LOG(WARNING) << "Unable to append data of size "
-                 << data_size << " at offset " << off_
-                 << ": " << s.ToString();
+  if (PREDICT_FALSE(!s.ok())) {
+    LOG(ERROR) << "Unable to append data of size "
+               << data_size << " at offset " << off_
+               << ": " << s.ToString();
   }
   off_ += data_size;
   return s;
diff --git a/src/kudu/cfile/cfile_writer.h b/src/kudu/cfile/cfile_writer.h
index b5d6b61fa..3c36efb03 100644
--- a/src/kudu/cfile/cfile_writer.h
+++ b/src/kudu/cfile/cfile_writer.h
@@ -28,10 +28,8 @@
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
 #include "kudu/gutil/macros.h"
-#include "kudu/util/bitmap.h"
 #include "kudu/util/compression/compression.pb.h"
 #include "kudu/util/faststring.h"
-#include "kudu/util/rle-encoding.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
@@ -54,43 +52,10 @@ extern const char kMagicStringV2[];
 extern const int kMagicLength;
 extern const size_t kChecksumSize;
 
-class NullBitmapBuilder {
- public:
-  explicit NullBitmapBuilder(size_t initial_row_capacity)
-    : nitems_(0),
-      bitmap_(BitmapSize(initial_row_capacity)),
-      rle_encoder_(&bitmap_, 1) {
-  }
-
-  size_t nitems() const {
-    return nitems_;
-  }
-
-  // If value parameter is true, it means that all values in this run are null
-  void AddRun(bool value, size_t run_length = 1) {
-    nitems_ += run_length;
-    rle_encoder_.Put(value, run_length);
-  }
-
-  // the returned Slice is only valid until this Builder is destroyed or Reset
-  Slice Finish() {
-    int len = rle_encoder_.Flush();
-    return Slice(bitmap_.data(), len);
-  }
-
-  void Reset() {
-    nitems_ = 0;
-    rle_encoder_.Clear();
-  }
-
- private:
-  size_t nitems_;
-  faststring bitmap_;
-  RleEncoder<bool> rle_encoder_;
-};
+class NullBitmapBuilder;
 
 // Main class used to write a CFile.
-class CFileWriter {
+class CFileWriter final {
  public:
   explicit CFileWriter(WriterOptions options,
                        const TypeInfo* typeinfo,
@@ -166,7 +131,7 @@ class CFileWriter {
 
   std::string ToString() const { return block_->id().ToString(); }
 
-  fs::WritableBlock* block() const { return block_.get(); }
+  const fs::WritableBlock* block() const { return block_.get(); }
 
   // Wrapper for AddBlock() to append the dictionary block to the end of a 
Cfile.
   Status AppendDictBlock(std::vector<Slice> data_slices,
@@ -218,7 +183,7 @@ class CFileWriter {
   faststring tmp_buf_;
 
   // Metadata which has been added to the writer but not yet flushed.
-  std::vector<std::pair<std::string, std::string> > unflushed_metadata_;
+  std::vector<std::pair<std::string, std::string>> unflushed_metadata_;
 
   std::unique_ptr<BlockBuilder> data_block_;
   std::unique_ptr<IndexTreeBuilder> posidx_builder_;
@@ -236,6 +201,5 @@ class CFileWriter {
   DISALLOW_COPY_AND_ASSIGN(CFileWriter);
 };
 
-
 } // namespace cfile
 } // namespace kudu

Reply via email to