This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a1c19fd0 KUDU-1261 writing/reading of array data blocks
9a1c19fd0 is described below

commit 9a1c19fd0853aef54bdd52af69194ee43f2a33b6
Author: Alexey Serbin <[email protected]>
AuthorDate: Wed Apr 23 07:32:55 2025 -0700

    KUDU-1261 writing/reading of array data blocks
    
    The CFile reader and writer are updated to support reading and writing
    nullable array data blocks as per the documented specification [1].
    The new functionality is covered by newly added tests in cfile-test.cc.
    Follow-up patches (such as [2]) contain end-to-end tests using Kudu
    mini-cluster and Kudu C++ client.
    
    With this patch, writing/reading of array data works end-to-end
    for all the existing scalar types except for INT128/DECIMAL128
    (the latter isn't supported by the serdes layer, but everything
    else can support 128-bit integers).
    
    Below is the list of caveats (TODOs) known at the time of writing:
      * the DICTIONARY encoder for string/binaries needs updating to
        allow for masking 'block is full' bit; as of this changelist
        storing strings/binaries works with PLAIN and PREFIX_ENCODING,
        but has edge cases where it's not working in DICTIONARY encoding
      * memory management isn't yet optimal when reading array cells
        in CFileIterator
      * maximum number of elements in an array isn't yet configurable
      * the configured maximum number of elements in an array isn't yet
        enforced
      * for future proofing (e.g., thinking of switching to Apache Arrow
        format instead of Flatbuffers) it's necessary to add meta-info,
        so the sender and the recipient can tell what format to use
        for exchanging the data of NESTED data types
      * support for array data blocks needs to be explicitly enabled
        by setting --cfile_support_arrays=true
    
    [1] https://gerrit.cloudera.org/#/c/22058
    [2] https://gerrit.cloudera.org/#/c/23220
    
    Change-Id: I5825c939cb40d350a23a78609115f26e62cca270
    Reviewed-on: http://gerrit.cloudera.org:8080/22868
    Reviewed-by: Abhishek Chennaka <[email protected]>
    Tested-by: Alexey Serbin <[email protected]>
---
 src/kudu/cfile/binary_dict_block.cc     |   8 +-
 src/kudu/cfile/binary_dict_block.h      |   2 +-
 src/kudu/cfile/binary_plain_block.cc    |   2 +-
 src/kudu/cfile/binary_plain_block.h     |   2 +-
 src/kudu/cfile/binary_prefix_block.cc   |   2 +-
 src/kudu/cfile/binary_prefix_block.h    |   2 +-
 src/kudu/cfile/block_encodings.h        |  29 +-
 src/kudu/cfile/bshuf_block.h            |   6 +-
 src/kudu/cfile/cfile-test-base.h        | 671 +++++++++++++++++++++++++-------
 src/kudu/cfile/cfile-test.cc            | 309 ++++++++++++++-
 src/kudu/cfile/cfile_reader.cc          | 266 ++++++++++++-
 src/kudu/cfile/cfile_reader.h           |  20 +-
 src/kudu/cfile/cfile_util.h             |   5 +-
 src/kudu/cfile/cfile_writer.cc          | 318 ++++++++++++++-
 src/kudu/cfile/cfile_writer.h           |  25 +-
 src/kudu/cfile/plain_bitmap_block.h     |   2 +-
 src/kudu/cfile/plain_block.h            |   2 +-
 src/kudu/cfile/rle_block.h              |   4 +-
 src/kudu/common/columnblock-test-util.h |  11 +-
 src/kudu/common/columnblock.h           |   1 +
 src/kudu/common/rowblock.h              |   2 +
 src/kudu/common/rowblock_memory.h       |   2 +-
 src/kudu/tablet/multi_column_writer.cc  |  20 +-
 23 files changed, 1509 insertions(+), 202 deletions(-)

diff --git a/src/kudu/cfile/binary_dict_block.cc 
b/src/kudu/cfile/binary_dict_block.cc
index c860ef468..0c0c89746 100644
--- a/src/kudu/cfile/binary_dict_block.cc
+++ b/src/kudu/cfile/binary_dict_block.cc
@@ -68,7 +68,7 @@ BinaryDictBlockBuilder::BinaryDictBlockBuilder(const 
WriterOptions* options)
 
 void BinaryDictBlockBuilder::Reset() {
   if (mode_ == kCodeWordMode &&
-      dict_block_.IsBlockFull()) {
+      dict_block_.IsBlockFullImpl()) {
     mode_ = kPlainBinaryMode;
     data_builder_.reset(new BinaryPlainBlockBuilder(options_));
   } else {
@@ -96,9 +96,9 @@ void BinaryDictBlockBuilder::Finish(rowid_t ordinal_pos, 
vector<Slice>* slices)
 //
 // If it is the latter case, all the subsequent data blocks will switch to
 // StringPlainBlock automatically.
-bool BinaryDictBlockBuilder::IsBlockFull() const {
-  if (data_builder_->IsBlockFull()) return true;
-  if (dict_block_.IsBlockFull() && (mode_ == kCodeWordMode)) return true;
+bool BinaryDictBlockBuilder::IsBlockFullImpl() const {
+  if (data_builder_->IsBlockFullImpl()) return true;
+  if (dict_block_.IsBlockFullImpl() && (mode_ == kCodeWordMode)) return true;
   return false;
 }
 
diff --git a/src/kudu/cfile/binary_dict_block.h 
b/src/kudu/cfile/binary_dict_block.h
index c653be1e8..2e4e2ce9a 100644
--- a/src/kudu/cfile/binary_dict_block.h
+++ b/src/kudu/cfile/binary_dict_block.h
@@ -80,7 +80,7 @@ class BinaryDictBlockBuilder final : public BlockBuilder {
  public:
   explicit BinaryDictBlockBuilder(const WriterOptions* options);
 
-  bool IsBlockFull() const override;
+  bool IsBlockFullImpl() const override;
 
   // Append the dictionary block for the current cfile to the end of the cfile 
and set the footer
   // accordingly.
diff --git a/src/kudu/cfile/binary_plain_block.cc 
b/src/kudu/cfile/binary_plain_block.cc
index 8d4f8d218..24c95c1b2 100644
--- a/src/kudu/cfile/binary_plain_block.cc
+++ b/src/kudu/cfile/binary_plain_block.cc
@@ -66,7 +66,7 @@ void BinaryPlainBlockBuilder::Reset() {
   finished_ = false;
 }
 
-bool BinaryPlainBlockBuilder::IsBlockFull() const {
+bool BinaryPlainBlockBuilder::IsBlockFullImpl() const {
   return size_estimate_ > options_->storage_attributes.cfile_block_size;
 }
 
diff --git a/src/kudu/cfile/binary_plain_block.h 
b/src/kudu/cfile/binary_plain_block.h
index 8bea41a27..fe773749d 100644
--- a/src/kudu/cfile/binary_plain_block.h
+++ b/src/kudu/cfile/binary_plain_block.h
@@ -60,7 +60,7 @@ class BinaryPlainBlockBuilder final : public BlockBuilder {
  public:
   explicit BinaryPlainBlockBuilder(const WriterOptions* options);
 
-  bool IsBlockFull() const override;
+  bool IsBlockFullImpl() const override;
 
   int Add(const uint8_t* vals, size_t count) override;
 
diff --git a/src/kudu/cfile/binary_prefix_block.cc 
b/src/kudu/cfile/binary_prefix_block.cc
index 0b053448c..1536315e7 100644
--- a/src/kudu/cfile/binary_prefix_block.cc
+++ b/src/kudu/cfile/binary_prefix_block.cc
@@ -96,7 +96,7 @@ void BinaryPrefixBlockBuilder::Reset() {
   last_val_.clear();
 }
 
-bool BinaryPrefixBlockBuilder::IsBlockFull() const {
+bool BinaryPrefixBlockBuilder::IsBlockFullImpl() const {
   // TODO(todd): take restarts size into account
   return buffer_.size() > options_->storage_attributes.cfile_block_size;
 }
diff --git a/src/kudu/cfile/binary_prefix_block.h 
b/src/kudu/cfile/binary_prefix_block.h
index afffcd107..7e4a91e11 100644
--- a/src/kudu/cfile/binary_prefix_block.h
+++ b/src/kudu/cfile/binary_prefix_block.h
@@ -47,7 +47,7 @@ class BinaryPrefixBlockBuilder final : public BlockBuilder {
  public:
   explicit BinaryPrefixBlockBuilder(const WriterOptions* options);
 
-  bool IsBlockFull() const override;
+  bool IsBlockFullImpl() const override;
 
   int Add(const uint8_t* vals, size_t count) override;
 
diff --git a/src/kudu/cfile/block_encodings.h b/src/kudu/cfile/block_encodings.h
index 62e7fba8c..0b94c24bb 100644
--- a/src/kudu/cfile/block_encodings.h
+++ b/src/kudu/cfile/block_encodings.h
@@ -39,7 +39,9 @@ class CFileWriter;
 
 class BlockBuilder {
  public:
-  BlockBuilder() = default;
+  BlockBuilder()
+      : block_full_masked_(false) {
+  }
   virtual ~BlockBuilder() = default;
 
   // Append extra information to the end of the current cfile, for example:
@@ -53,7 +55,15 @@ class BlockBuilder {
   // A block is full if it its estimated size is larger than the configured
   // WriterOptions' cfile_block_size.
   // If it is full, the cfile writer will call FinishCurDataBlock().
-  virtual bool IsBlockFull() const = 0;
+  bool IsBlockFull() const {
+    if (block_full_masked_) {
+      return false;
+    }
+    return IsBlockFullImpl();
+  }
+
+  // The indication of the 'block is full' not affected by 
SetBlockFullMasked().
+  virtual bool IsBlockFullImpl() const = 0;
 
   // Add a sequence of values to the block.
   // Returns the number of values actually added, which may be less
@@ -94,7 +104,22 @@ class BlockBuilder {
   // If no keys have been added, returns Status::NotFound
   virtual Status GetLastKey(void* key) const = 0;
 
+  // Mask the 'block is full' criterion. This is used when writing array data
+  // blocks: the contents of a single array cell cannot be split between
+  // different array data blocks. The cfile writer uses this to make sure the
+  // encoder isn't switching to a new block when writing data within
+  // a single array cell.
+  void SetBlockFullMasked(bool block_full_masked) {
+    block_full_masked_ = block_full_masked;
+  }
+
+  // Whether the 'block is full' is masked.
+  bool IsBlockFullMasked() const {
+    return block_full_masked_;
+  }
+
  private:
+  bool block_full_masked_;
   DISALLOW_COPY_AND_ASSIGN(BlockBuilder);
 };
 
diff --git a/src/kudu/cfile/bshuf_block.h b/src/kudu/cfile/bshuf_block.h
index a4e8450e1..9fd218a7c 100644
--- a/src/kudu/cfile/bshuf_block.h
+++ b/src/kudu/cfile/bshuf_block.h
@@ -119,13 +119,13 @@ class BShufBlockBuilder final : public BlockBuilder {
     rem_elem_capacity_ = block_size / size_of_type;
   }
 
-  bool IsBlockFull() const override {
-    return rem_elem_capacity_ == 0;
+  bool IsBlockFullImpl() const override {
+    return rem_elem_capacity_ <= 0;
   }
 
   int Add(const uint8_t* vals_void, size_t count) override {
     DCHECK(!finished_);
-    int to_add = std::min<int>(rem_elem_capacity_, count);
+    int to_add = IsBlockFullMasked() ? count : 
std::min<int>(rem_elem_capacity_, count);
     data_.append(vals_void, to_add * size_of_type);
     count_ += to_add;
     rem_elem_capacity_ -= to_add;
diff --git a/src/kudu/cfile/cfile-test-base.h b/src/kudu/cfile/cfile-test-base.h
index 2f4b41d4a..cc637a115 100644
--- a/src/kudu/cfile/cfile-test-base.h
+++ b/src/kudu/cfile/cfile-test-base.h
@@ -21,6 +21,7 @@
 #include <functional>
 #include <memory>
 #include <string>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -29,6 +30,8 @@
 #include "kudu/cfile/cfile.pb.h"
 #include "kudu/cfile/cfile_reader.h"
 #include "kudu/cfile/cfile_writer.h"
+#include "kudu/common/array_cell_view.h"
+#include "kudu/common/array_type_serdes.h"
 #include "kudu/common/columnblock.h"
 #include "kudu/common/columnblock-test-util.h"
 #include "kudu/fs/fs_manager.h"
@@ -48,6 +51,28 @@ DEFINE_int32(cfile_test_block_size, 1024,
 namespace kudu {
 namespace cfile {
 
+// The true/false pattern alternates every 2^N consecutive integer
+// input values, cycling between:
+//   2^N true
+//   2^N alternating true/false
+//   2^N false
+//   2^N alternating true/false
+//
+// This is useful for stress-testing the run-length coding of NULL values
+// in data blocks.
+template<size_t N>
+bool IsNullAlternating(size_t n) {
+  switch ((n >> N) & 3) {
+    case 1:
+    case 3:
+      return n & 1;
+    case 2:
+      return false;
+    default:
+      return true;
+  }
+}
+
 // Abstract test data generator.
 // You must implement BuildTestValue() to return your test value.
 //    Usage example:
@@ -60,18 +85,22 @@ namespace cfile {
 template<DataType DATA_TYPE, bool HAS_NULLS>
 class DataGenerator {
  public:
-  static bool has_nulls() {
+  static constexpr bool has_nulls() {
     return HAS_NULLS;
   }
 
-  static const DataType kDataType;
+  static constexpr bool is_array() {
+    return false;
+  }
+
+  static constexpr const DataType kDataType = DATA_TYPE;
 
   typedef typename DataTypeTraits<DATA_TYPE>::cpp_type cpp_type;
 
-  DataGenerator() :
-    block_entries_(0),
-    total_entries_(0)
-  {}
+  DataGenerator()
+      : block_entries_(0),
+        total_entries_(0) {
+  }
 
   virtual ~DataGenerator() = default;
 
@@ -105,25 +134,7 @@ class DataGenerator {
     if (!HAS_NULLS) {
       return false;
     }
-
-    // The NULL pattern alternates every 32 rows, cycling between:
-    //   32 NULL
-    //   32 alternating NULL/NOTNULL
-    //   32 NOT-NULL
-    //   32 alternating NULL/NOTNULL
-    // This is to ensure that we stress the run-length coding for
-    // NULL value.
-    switch ((n >> 6) & 3) {
-      case 0:
-        return true;
-      case 1:
-      case 3:
-        return n & 1;
-      case 2:
-        return false;
-      default:
-        LOG(FATAL);
-    }
+    return IsNullAlternating<6>(n);
   }
 
   virtual void Resize(size_t num_entries) {
@@ -147,16 +158,13 @@ class DataGenerator {
     return values_[index];
   }
 
- private:
+ protected:
   std::unique_ptr<cpp_type[]> values_;
   std::unique_ptr<uint8_t[]> non_null_bitmap_;
   size_t block_entries_;
   size_t total_entries_;
 };
 
-template<DataType DATA_TYPE, bool HAS_NULLS>
-const DataType DataGenerator<DATA_TYPE, HAS_NULLS>::kDataType = DATA_TYPE;
-
 template<bool HAS_NULLS>
 class UInt8DataGenerator : public DataGenerator<UINT8, HAS_NULLS> {
  public:
@@ -336,6 +344,380 @@ class RandomInt32DataGenerator : public 
DataGenerator<INT32, /* HAS_NULLS= */ fa
   }
 };
 
+// Generic data generation interface for array cells. It's supposed to be
+// pluggable with minimal constexpr-eval style changes into the places
+// where DataGenerator is used.
+template<DataType DATA_TYPE,
+         typename FB_TYPE,
+         bool HAS_NULLS,
+         bool HAS_NULLS_IN_ARRAY,
+         size_t MAX_NUM_ELEMENTS_IN_ARRAY>
+class ArrayDataGenerator {
+ public:
+  typedef typename DataTypeTraits<DATA_TYPE>::cpp_type cpp_type;
+
+  static constexpr const DataType kDataType = DATA_TYPE;
+
+  static constexpr bool has_nulls() {
+    return HAS_NULLS;
+  }
+
+  static constexpr bool is_array() {
+    return true;
+  }
+
+  ArrayDataGenerator()
+      : block_entries_(0),
+        total_entries_(0),
+        total_elem_count_(0),
+        values_total_sum_(0),
+        str_values_total_size_(0) {
+  }
+
+  virtual ~ArrayDataGenerator() = default;
+
+  void Reset() {
+    block_entries_ = 0;
+    total_entries_ = 0;
+    total_elem_count_ = 0;
+    values_total_sum_ = 0;
+    str_values_total_size_ = 0;
+  }
+
+  void Build(size_t num_entries) {
+    Build(total_entries_, num_entries);
+    total_entries_ += num_entries;
+  }
+
+  // Build "num_entries" of array cells using (offset + i) as value.
+  // The data is available via 'values()', non-null for whole array cells
+  // via 'non_null_bitmap()', and validity of elements in each of the generated
+  // arrays via 'cell_non_null_bitmaps()'. Those are alive while an instance
+  // of this class is alive, and invalidated by next calls to either 'Build()'
+  // or 'Resize()'.
+  void Build(size_t offset, size_t num_entries) {
+    Resize(num_entries);
+
+    DCHECK_EQ(num_entries, array_cells_.size());
+    DCHECK_EQ(num_entries, flatbuffers_.size());
+    DCHECK_EQ(num_entries, cell_non_null_bitmaps_.size());
+    DCHECK_EQ(num_entries, cell_non_null_bitmaps_container_.size());
+    DCHECK_EQ(num_entries, cell_non_null_bitmaps_sizes_.size());
+    DCHECK_EQ(num_entries, values_vector_.size());
+    DCHECK_EQ(num_entries, values_vector_str_.size());
+    DCHECK_EQ(num_entries, block_entries_);
+
+    for (size_t i = 0; i < num_entries; ++i) {
+      const bool is_null_array =
+          HAS_NULLS ? TestValueShouldBeNull(offset + i) : false;
+      BitmapChange(non_null_bitmap_.get(), i, !is_null_array);
+
+      if (is_null_array) {
+        cell_non_null_bitmaps_sizes_[i] = 0;
+        cell_non_null_bitmaps_[i] = nullptr;
+        array_cells_[i].clear();
+        values_vector_[i].clear();
+        values_vector_str_[i].clear();
+        flatbuffers_[i].reset();
+        continue;
+      }
+
+      const size_t array_elem_num = random() % MAX_NUM_ELEMENTS_IN_ARRAY;
+      total_elem_count_ += array_elem_num;
+
+      // Set the number of bits for the array bitmap correspondingly.
+      cell_non_null_bitmaps_sizes_[i] = array_elem_num;
+      if (array_elem_num == 0) {
+        // Build an empty array for this cell.
+        values_vector_[i].clear();
+        values_vector_str_[i].clear();
+
+        array_cells_[i] = Slice(static_cast<uint8_t*>(nullptr), 0);
+      } else {
+        // Helper container for flatbuffer's serialization.
+        std::vector<bool> validity_src(array_elem_num, false);
+        values_vector_[i].resize(array_elem_num);
+        values_vector_str_[i].resize(array_elem_num);
+
+        cell_non_null_bitmaps_[i] = cell_non_null_bitmaps_container_[i].get();
+        uint8_t* array_non_null_bitmap = cell_non_null_bitmaps_[i];
+        DCHECK(array_non_null_bitmap);
+        for (size_t j = 0; j < array_elem_num; ++j) {
+          const bool array_elem_is_null =
+              HAS_NULLS_IN_ARRAY ? TestValueInArrayShouldBeNull(offset + i + j)
+                                 : false;
+          BitmapChange(array_non_null_bitmap, j, !array_elem_is_null);
+          validity_src[j] = !array_elem_is_null;
+          if constexpr (DATA_TYPE == STRING) {
+            auto& str = values_vector_str_[i][j];
+            if (array_elem_is_null) {
+              str.clear();
+              values_vector_[i][j] = Slice();
+            } else {
+              str.reserve(20);
+              str = std::to_string(random());
+              values_vector_[i][j] = Slice(str.data(), str.size());
+              str_values_total_size_ += str.size();
+            }
+          } else if constexpr (DATA_TYPE == BINARY) {
+            auto& str = values_vector_str_[i][j];
+            if (array_elem_is_null) {
+              str.clear();
+              values_vector_[i][j] = Slice();
+            } else {
+              str.reserve(45);
+              const auto r = random();
+              str.push_back(r % 128);
+              str.push_back(42 + r % 32);
+              str.append(std::to_string(r));
+              str.append({ 0x00, 0x7f, 0x42 });
+              str.append(std::to_string(random()));
+              values_vector_[i][j] = Slice(str.data(), str.size());
+              str_values_total_size_ += str.size();
+            }
+          } else {
+            static_assert(!std::is_same<Slice, cpp_type>::value,
+                          "cannot be a binary type");
+            if (array_elem_is_null) {
+              values_vector_[i][j] = 0;
+            } else {
+              values_vector_[i][j] = BuildTestValue(i, offset + i + j);
+              
IncrementValuesTotalSum(static_cast<size_t>(values_vector_[i][j]));
+            }
+          }
+        }
+
+        size_t buf_size = 0;
+        const auto s = Serialize<DATA_TYPE, FB_TYPE>(
+            reinterpret_cast<uint8_t*>(values_vector_[i].data()),
+            array_elem_num,
+            validity_src,
+            &flatbuffers_[i],
+            &buf_size);
+        CHECK_OK(s);
+        array_cells_[i] = Slice(flatbuffers_[i].get(), buf_size);
+      }
+    }
+  }
+
+  virtual cpp_type BuildTestValue(size_t block_index, size_t value) = 0;
+
+  bool TestValueShouldBeNull(size_t n) {
+    if (!HAS_NULLS) {
+      return false;
+    }
+    return IsNullAlternating</*N=*/5>(n);
+  }
+
+  bool TestValueInArrayShouldBeNull(size_t n) {
+    if (!HAS_NULLS_IN_ARRAY) {
+      return false;
+    }
+    return IsNullAlternating</*N=*/6>(n);
+  }
+
+  // Resize() resets all the underlying data if 'num_entries' is greater
+  // than 'block_entries_'.
+  virtual void Resize(size_t num_entries) {
+    cell_non_null_bitmaps_container_.resize(num_entries);
+
+    cell_non_null_bitmaps_.resize(num_entries);
+    cell_non_null_bitmaps_sizes_.resize(num_entries);
+
+    array_cells_.resize(num_entries);
+    flatbuffers_.resize(num_entries);
+    values_vector_.resize(num_entries);
+    values_vector_str_.resize(num_entries);
+
+    if (block_entries_ >= num_entries) {
+      block_entries_ = num_entries;
+      return;
+    }
+
+    for (auto i = 0; i < num_entries; ++i) {
+      // Allocate up to the maximum possible number of elements in each array
+      // cell. The cells are populated with elements (or nullified) by Build().
+      cell_non_null_bitmaps_sizes_[i] = MAX_NUM_ELEMENTS_IN_ARRAY;
+      cell_non_null_bitmaps_container_[i].reset(new 
uint8_t[BitmapSize(cell_non_null_bitmaps_sizes_[i])]);
+      cell_non_null_bitmaps_[i] = cell_non_null_bitmaps_container_[i].get();
+
+      array_cells_[i] = Slice();
+      flatbuffers_[i].reset();
+      values_vector_[i].clear();
+      values_vector_str_[i].clear();
+    }
+    non_null_bitmap_.reset(new uint8_t[BitmapSize(num_entries)]);
+    block_entries_ = num_entries;
+    // The 'total_entries_' isn't touched -- it corresponds to the offset
+    // in the generated CFile, so it's possible to continue writing into
+    // the same file even after Resize().
+  }
+
+  size_t block_entries() const {
+    return block_entries_;
+  }
+  size_t total_entries() const {
+    return total_entries_;
+  }
+
+  const uint8_t* non_null_bitmap() const {
+    return non_null_bitmap_.get();
+  }
+
+  const void* cells() const {
+    return reinterpret_cast<const void*>(array_cells_.data());
+  }
+
+  size_t total_elem_count() const {
+    return total_elem_count_;
+  }
+
+  size_t values_total_sum() const {
+    return values_total_sum_;
+  }
+
+  size_t str_values_total_size() const {
+    return str_values_total_size_;
+  }
+
+ private:
+  ATTRIBUTE_NO_SANITIZE_INTEGER
+  void IncrementValuesTotalSum(size_t increment) {
+    // The overflow of 'values_total_sum_' is intended.
+    values_total_sum_ += static_cast<size_t>(increment);
+  }
+
+  // The storage of non-null bitmaps for each of the generated array cells:
+  // these are bitmaps to reflect on the nullability of individual array
+  // elements in each array cell.
+  // This container performs auto-cleanup upon destruction of the generator.
+  std::vector<std::unique_ptr<uint8_t[]>> cell_non_null_bitmaps_container_;
+
+  // Slices that point to the memory stored in 'flatbuffers_', with some 
offset.
+  std::vector<Slice> array_cells_;
+
+  // Flatbuffers-serialized representations of arrays.
+  std::vector<std::unique_ptr<uint8_t[]>> flatbuffers_;
+
+  // This is a complementary container to store std::string instances when the
+  // 'values_vector_' container below stores Slice instances.
+  std::vector<std::vector<std::string>> values_vector_str_;
+
+  // The storage of test values. This container stores Slice instances for
+  // STRING/BINARY Kudu types, and the std::string instances backing these
+  // are stored in the 'values_vector_str_' complementary container.
+  std::vector<std::vector<cpp_type>> values_vector_;
+
+  // Bitmap to track whether it's an array (maybe an empty one) or just null.
+  std::unique_ptr<uint8_t[]> non_null_bitmap_;
+
+  // Bitmaps to track null elements in the arrays themselves: this container
+  // stores raw pointers to memory backed by corresponding elements
+  // of the 'cell_non_null_bitmaps_container_' field.
+  std::vector<uint8_t*> cell_non_null_bitmaps_;
+
+  // Number of bits in the corresponding bitmaps from 'cell_non_null_bitmaps_';
+  // null and empty arrays have zero bits in their bitmaps.
+  std::vector<size_t> cell_non_null_bitmaps_sizes_;
+
+  size_t block_entries_;
+  size_t total_entries_;
+
+  // Total number of array elements (including null/non-valid elements in 
array cells).
+  size_t total_elem_count_;
+
+  // Sum of all integer values generated during Build(): useful for 
verification
+  // of the result when reading the stored values from CFile.
+  size_t values_total_sum_;
+  // Total length/size of all binary/string values generated during Build():
+  // useful for verification of the result when reading the data from CFile.
+  size_t str_values_total_size_;
+};
+
+// This template is for numeric/non-binary data types for array elements,
+// where the return type of random() can be statically cast into the type
+// of the element.
+template<DataType KUDU_DATA_TYPE,
+         typename PB_TYPE,
+         bool HAS_NULLS,
+         bool HAS_NULLS_IN_ARRAY,
+         size_t MAX_NUM_ELEMENTS_IN_ARRAY>
+class IntArrayRandomDataGenerator :
+    public ArrayDataGenerator<KUDU_DATA_TYPE,
+                              PB_TYPE,
+                              HAS_NULLS,
+                              HAS_NULLS_IN_ARRAY,
+                              MAX_NUM_ELEMENTS_IN_ARRAY> {
+ public:
+  typedef typename DataTypeTraits<KUDU_DATA_TYPE>::cpp_type ElemType;
+  ElemType BuildTestValue(size_t /*block_index*/, size_t /*value*/) override {
+    return static_cast<ElemType>(random());
+  }
+};
+
+template<DataType KUDU_DATA_TYPE,
+         typename PB_TYPE,
+         bool HAS_NULLS,
+         bool HAS_NULLS_IN_ARRAY,
+         size_t MAX_NUM_ELEMENTS_IN_ARRAY>
+class StrArrayRandomDataGenerator :
+    public ArrayDataGenerator<KUDU_DATA_TYPE,
+                              PB_TYPE,
+                              HAS_NULLS,
+                              HAS_NULLS_IN_ARRAY,
+                              MAX_NUM_ELEMENTS_IN_ARRAY> {
+ public:
+  typedef typename DataTypeTraits<KUDU_DATA_TYPE>::cpp_type ElemType;
+
+  // This is a fake implementation just to satisfy the signature of the
+  // overridden function.
+  Slice BuildTestValue(size_t /*block_index*/, size_t /*value*/) override {
+    static const Slice kEmpty;
+    return kEmpty;
+  }
+};
+
+template<bool HAS_NULLS,
+         bool HAS_NULLS_IN_ARRAY,
+         size_t MAX_NUM_ELEMENTS_IN_ARRAY>
+using Int8ArrayRandomDataGenerator =
+    IntArrayRandomDataGenerator<INT8,
+                                serdes::Int8Array,
+                                HAS_NULLS,
+                                HAS_NULLS_IN_ARRAY,
+                                MAX_NUM_ELEMENTS_IN_ARRAY>;
+
+template<bool HAS_NULLS,
+         bool HAS_NULLS_IN_ARRAY,
+         size_t MAX_NUM_ELEMENTS_IN_ARRAY>
+using Int32ArrayRandomDataGenerator =
+    IntArrayRandomDataGenerator<INT32,
+                                serdes::Int32Array,
+                                HAS_NULLS,
+                                HAS_NULLS_IN_ARRAY,
+                                MAX_NUM_ELEMENTS_IN_ARRAY>;
+
+template<bool HAS_NULLS,
+         bool HAS_NULLS_IN_ARRAY,
+         size_t MAX_NUM_ELEMENTS_IN_ARRAY>
+using StringArrayRandomDataGenerator =
+    StrArrayRandomDataGenerator<STRING,
+                                serdes::StringArray,
+                                HAS_NULLS,
+                                HAS_NULLS_IN_ARRAY,
+                                MAX_NUM_ELEMENTS_IN_ARRAY>;
+
+template<bool HAS_NULLS,
+         bool HAS_NULLS_IN_ARRAY,
+         size_t MAX_NUM_ELEMENTS_IN_ARRAY>
+using BinaryArrayRandomDataGenerator =
+    StrArrayRandomDataGenerator<BINARY,
+                                serdes::BinaryArray,
+                                HAS_NULLS,
+                                HAS_NULLS_IN_ARRAY,
+                                MAX_NUM_ELEMENTS_IN_ARRAY>;
+
+
 class CFileTestBase : public KuduTest {
  public:
   void SetUp() override {
@@ -381,8 +763,12 @@ class CFileTestBase : public KuduTest {
 
     opts.storage_attributes.encoding = encoding;
     opts.storage_attributes.compression = compression;
-    CFileWriter w(opts, GetTypeInfo(DataGeneratorType::kDataType),
-                  DataGeneratorType::has_nulls(), std::move(sink));
+    CFileWriter w(opts,
+                  DataGeneratorType::is_array()
+                      ? GetArrayTypeInfo(DataGeneratorType::kDataType)
+                      : GetTypeInfo(DataGeneratorType::kDataType),
+                  DataGeneratorType::has_nulls(),
+                  std::move(sink));
 
     ASSERT_OK(w.Start());
 
@@ -397,12 +783,19 @@ class CFileTestBase : public KuduTest {
       data_generator->Build(towrite);
       DCHECK_EQ(towrite, data_generator->block_entries());
 
-      if (DataGeneratorType::has_nulls()) {
-        
ASSERT_OK_FAST(w.AppendNullableEntries(data_generator->non_null_bitmap(),
-                                                      data_generator->values(),
-                                                      towrite));
+      if constexpr (DataGeneratorType::is_array()) {
+        ASSERT_OK_FAST(w.AppendNullableArrayEntries(
+            data_generator->non_null_bitmap(),
+            data_generator->cells(),
+            towrite));
       } else {
-        ASSERT_OK_FAST(w.AppendEntries(data_generator->values(), towrite));
+        if constexpr (DataGeneratorType::has_nulls()) {
+          
ASSERT_OK_FAST(w.AppendNullableEntries(data_generator->non_null_bitmap(),
+                                                 data_generator->values(),
+                                                 towrite));
+        } else {
+          ASSERT_OK_FAST(w.AppendEntries(data_generator->values(), towrite));
+        }
       }
       i += towrite;
     }
@@ -411,7 +804,6 @@ class CFileTestBase : public KuduTest {
   }
 
   std::unique_ptr<FsManager> fs_manager_;
-
 };
 
 // Fast unrolled summing of a vector.
@@ -437,131 +829,150 @@ SumType FastSum(const Indexable& data, size_t n) {
   return sums[0] + sums[1] + sums[2] + sums[3];
 }
 
+template<DataType Type>
+Status TimeReadFileForArrayDataType(
+    CFileIterator* iter,
+    size_t* out_row_count,
+    size_t* out_total_array_element_count = nullptr,
+    size_t* out_total_elem_str_size = nullptr) {
+  size_t row_count = 0;
+  size_t null_arrays = 0;
+  size_t empty_arrays = 0;
+  size_t null_array_elements = 0;
+  size_t total_array_elements = 0;
+  size_t total_elem_str_size = 0;
+
+  ScopedColumnBlock<Type, /*IS_ARRAY=*/true> cb(100000); // up to 100K rows
+  SelectionVector sel(cb.nrows());
+  ColumnMaterializationContext ctx(0, nullptr, &cb, &sel);
+  ctx.SetDecoderEvalNotSupported();
+  while (iter->HasNext()) {
+    size_t n = cb.nrows();
+    RETURN_NOT_OK(iter->CopyNextValues(&n, &ctx));
+    row_count += n;
+    for (size_t ri = 0; ri < n; ++ri) {
+      const Slice* cell_ptr = reinterpret_cast<const 
Slice*>(cb.cell(ri).ptr());
+      if (cell_ptr == nullptr || cell_ptr->empty()) {
+        ++null_arrays;
+        continue;
+      }
+      ArrayCellMetadataView view(cell_ptr->data(), cell_ptr->size());
+      RETURN_NOT_OK(view.Init());
+      if (view.empty()) {
+        ++empty_arrays;
+        continue;
+      }
+
+      const auto elem_num = view.elem_num();
+      total_array_elements += elem_num;
+      BitmapIterator bit(view.not_null_bitmap(), elem_num);
+      bool is_not_null = false;
+      while (size_t elem_count = bit.Next(&is_not_null)) {
+        if (!is_not_null) {
+          null_array_elements += elem_count;
+        }
+      }
+
+      // For binary/string types, calculate the total length of strings/buffers
+      // of all the array elements.
+      if constexpr (Type == DataType::BINARY ||
+                    Type == DataType::STRING ||
+                    Type == DataType::VARCHAR) {
+        const Slice* data = reinterpret_cast<const Slice*>(view.data_as(Type));
+        DCHECK(data);
+        const auto* bm = view.not_null_bitmap();
+        DCHECK(bm);
+        for (size_t i = 0; i < elem_num; ++i, ++data) {
+          if (BitmapTest(bm, i)) {
+            total_elem_str_size += data->size();
+          }
+        }
+      }
+    }
+    cb.memory()->Reset();
+  }
+  if (out_row_count) {
+    *out_row_count = row_count;
+  }
+  if (out_total_array_element_count) {
+    *out_total_array_element_count = total_array_elements;
+  }
+  if (out_total_elem_str_size) {
+    *out_total_elem_str_size = total_elem_str_size;
+  }
+  LOG(INFO)<< "Row count           : " << row_count;
+  LOG(INFO)<< "NULL rows/arrays    : " << null_arrays;
+  LOG(INFO)<< "Empty arrays        : " << empty_arrays;
+  LOG(INFO)<< "NULL array elements : " << null_array_elements;
+  LOG(INFO)<< "Total array elements: " << total_array_elements;
+  return Status::OK();
+}
+
 template<DataType Type, typename SumType>
-void TimeReadFileForDataType(CFileIterator* iter, int* count) {
+Status TimeReadFileForDataType(CFileIterator* iter, size_t* count) {
   ScopedColumnBlock<Type> cb(8192);
   SelectionVector sel(cb.nrows());
   ColumnMaterializationContext ctx(0, nullptr, &cb, &sel);
   ctx.SetDecoderEvalNotSupported();
   SumType sum = 0;
+  size_t row_count = 0;
   while (iter->HasNext()) {
     size_t n = cb.nrows();
-    ASSERT_OK_FAST(iter->CopyNextValues(&n, &ctx));
+    RETURN_NOT_OK(iter->CopyNextValues(&n, &ctx));
     sum += FastSum<ScopedColumnBlock<Type>, SumType>(cb, n);
-    *count += n;
+    row_count += n;
     cb.memory()->Reset();
   }
+  if (count) {
+    *count = row_count;
+  }
   LOG(INFO)<< "Sum: " << sum;
-  LOG(INFO)<< "Count: " << *count;
+  LOG(INFO)<< "Count: " << row_count;
+  return Status::OK();
 }
 
 template<DataType Type>
-void ReadBinaryFile(CFileIterator* iter, int* count) {
+Status ReadBinaryFile(CFileIterator* iter, size_t* count) {
   ScopedColumnBlock<Type> cb(100);
   SelectionVector sel(cb.nrows());
   ColumnMaterializationContext ctx(0, nullptr, &cb, &sel);
   ctx.SetDecoderEvalNotSupported();
   uint64_t sum_lens = 0;
+  size_t row_count = 0;
   while (iter->HasNext()) {
     size_t n = cb.nrows();
-    ASSERT_OK_FAST(iter->CopyNextValues(&n, &ctx));
+    RETURN_NOT_OK(iter->CopyNextValues(&n, &ctx));
     for (size_t i = 0; i < n; i++) {
       if (!cb.is_null(i)) {
         sum_lens += cb[i].size();
       }
     }
-    *count += n;
+    row_count += n;
     cb.memory()->Reset();
   }
+  if (count) {
+    *count = row_count;
+  }
   LOG(INFO) << "Sum of value lengths: " << sum_lens;
-  LOG(INFO) << "Count: " << *count;
+  LOG(INFO) << "Count: " << row_count;
+  return Status::OK();
 }
 
-void TimeReadFile(FsManager* fs_manager, const BlockId& block_id, size_t* 
count_ret) {
-  Status s;
-
-  std::unique_ptr<fs::ReadableBlock> source;
-  ASSERT_OK(fs_manager->OpenBlock(block_id, &source));
-  std::unique_ptr<CFileReader> reader;
-  ASSERT_OK(CFileReader::Open(std::move(source), ReaderOptions(), &reader));
-
-  std::unique_ptr<CFileIterator> iter;
-  ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr));
-  ASSERT_OK(iter->SeekToOrdinal(0));
-
-  Arena arena(8192);
-  int count = 0;
-  switch (reader->type_info()->physical_type()) {
-    case UINT8:
-    {
-      TimeReadFileForDataType<UINT8, uint64_t>(iter.get(), &count);
-      break;
-    }
-    case INT8:
-    {
-      TimeReadFileForDataType<INT8, int64_t>(iter.get(), &count);
-      break;
-    }
-    case UINT16:
-    {
-      TimeReadFileForDataType<UINT16, uint64_t>(iter.get(), &count);
-      break;
-    }
-    case INT16:
-    {
-      TimeReadFileForDataType<INT16, int64_t>(iter.get(), &count);
-      break;
-    }
-    case UINT32:
-    {
-      TimeReadFileForDataType<UINT32, uint64_t>(iter.get(), &count);
-      break;
-    }
-    case INT32:
-    {
-      TimeReadFileForDataType<INT32, int64_t>(iter.get(), &count);
-      break;
-    }
-    case UINT64:
-    {
-      TimeReadFileForDataType<UINT64, uint64_t>(iter.get(), &count);
-      break;
-    }
-    case INT64:
-    {
-      TimeReadFileForDataType<INT64, int64_t>(iter.get(), &count);
-      break;
-    }
-    case INT128:
-    {
-      TimeReadFileForDataType<INT128, int128_t>(iter.get(), &count);
-      break;
-    }
-    case FLOAT:
-    {
-      TimeReadFileForDataType<FLOAT, float>(iter.get(), &count);
-      break;
-    }
-    case DOUBLE:
-    {
-      TimeReadFileForDataType<DOUBLE, double>(iter.get(), &count);
-      break;
-    }
-    case STRING:
-    {
-      ReadBinaryFile<STRING>(iter.get(), &count);
-      break;
-    }
-    case BINARY:
-    {
-      ReadBinaryFile<BINARY>(iter.get(), &count);
-      break;
-    }
-    default:
-      FAIL() << "Unknown type: " << reader->type_info()->physical_type();
-  }
-  *count_ret = count;
-}
+Status TimeReadFileForScalars(DataType data_type,
+                              CFileIterator* iter,
+                              size_t* count_ret);
+
+Status TimeReadFileForArrays(DataType data_type,
+                             CFileIterator* iter,
+                             size_t* count_ret,
+                             size_t* total_array_element_count_ret = nullptr,
+                             size_t* total_elem_str_size_ret = nullptr);
+
+Status TimeReadFile(FsManager* fs_manager,
+                    const BlockId& block_id,
+                    size_t* count_ret,
+                    size_t* total_array_element_count_ret = nullptr,
+                    size_t* total_elem_str_size_ret = nullptr);
 
 } // namespace cfile
 } // namespace kudu
diff --git a/src/kudu/cfile/cfile-test.cc b/src/kudu/cfile/cfile-test.cc
index c204d5e4e..b126376e1 100644
--- a/src/kudu/cfile/cfile-test.cc
+++ b/src/kudu/cfile/cfile-test.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/cfile/cfile-test-base.h"
+
 #include <algorithm>
 #include <cstdint>
 #include <cstdlib>
@@ -25,6 +27,7 @@
 #include <memory>
 #include <sstream>
 #include <string>
+#include <tuple>
 #include <type_traits>
 #include <utility>
 #include <vector>
@@ -36,7 +39,6 @@
 #include "kudu/cfile/block_cache.h"
 #include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/block_pointer.h"
-#include "kudu/cfile/cfile-test-base.h"
 #include "kudu/cfile/cfile.pb.h"
 #include "kudu/cfile/cfile_reader.h"
 #include "kudu/cfile/cfile_util.h"
@@ -78,10 +80,7 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
-namespace kudu {
-class Arena;
-}  // namespace kudu
-
+DECLARE_bool(cfile_support_arrays);
 DECLARE_bool(cfile_write_checksums);
 DECLARE_bool(cfile_verify_checksums);
 DECLARE_string(block_cache_eviction_policy);
@@ -92,11 +91,8 @@ DECLARE_string(nvm_cache_path);
 DECLARE_bool(nvm_cache_simulate_allocation_failure);
 
 METRIC_DECLARE_counter(block_cache_hits_caching);
-
 METRIC_DECLARE_entity(server);
 
-
-using kudu::fs::BlockManager;
 using kudu::fs::CountingReadableBlock;
 using kudu::fs::CreateCorruptBlock;
 using kudu::fs::ReadableBlock;
@@ -108,8 +104,128 @@ using std::vector;
 using strings::Substitute;
 
 namespace kudu {
+
+class Arena;
+
 namespace cfile {
 
+Status TimeReadFileForScalars(DataType data_type,
+                              CFileIterator* iter,
+                              size_t* count_ret) {
+  switch (data_type) {
+    case UINT8:
+      return TimeReadFileForDataType<UINT8, uint64_t>(iter, count_ret);
+    case INT8:
+      return TimeReadFileForDataType<INT8, int64_t>(iter, count_ret);
+    case UINT16:
+      return TimeReadFileForDataType<UINT16, uint64_t>(iter, count_ret);
+    case INT16:
+      return TimeReadFileForDataType<INT16, int64_t>(iter, count_ret);
+    case UINT32:
+      return TimeReadFileForDataType<UINT32, uint64_t>(iter, count_ret);
+    case INT32:
+      return TimeReadFileForDataType<INT32, int64_t>(iter, count_ret);
+    case UINT64:
+      return TimeReadFileForDataType<UINT64, uint64_t>(iter, count_ret);
+    case INT64:
+      return TimeReadFileForDataType<INT64, int64_t>(iter, count_ret);
+    case INT128:
+      return TimeReadFileForDataType<INT128, int128_t>(iter, count_ret);
+    case FLOAT:
+      return TimeReadFileForDataType<FLOAT, float>(iter, count_ret);
+    case DOUBLE:
+      return TimeReadFileForDataType<DOUBLE, double>(iter, count_ret);
+    case STRING:
+      return ReadBinaryFile<STRING>(iter, count_ret);
+    case BINARY:
+      return ReadBinaryFile<BINARY>(iter, count_ret);
+    default:
+      return Status::NotSupported(
+          Substitute("not implemented for type: $0", 
DataType_Name(data_type)));
+  }
+}
+
+Status TimeReadFileForArrays(DataType data_type,
+                             CFileIterator* iter,
+                             size_t* count_ret,
+                             size_t* total_array_element_count_ret,
+                             size_t* total_elem_str_size_ret) {
+  switch (data_type) {
+    case INT8:
+      return TimeReadFileForArrayDataType<INT8>(
+          iter, count_ret, total_array_element_count_ret);
+    case UINT8:
+      return TimeReadFileForArrayDataType<UINT8>(
+          iter, count_ret, total_array_element_count_ret);
+    case INT16:
+      return TimeReadFileForArrayDataType<INT16>(
+          iter, count_ret, total_array_element_count_ret);
+    case UINT16:
+      return TimeReadFileForArrayDataType<UINT16>(
+          iter, count_ret, total_array_element_count_ret);
+    case INT32:
+      return TimeReadFileForArrayDataType<INT32>(
+          iter, count_ret, total_array_element_count_ret);
+    case UINT32:
+      return TimeReadFileForArrayDataType<UINT32>(
+          iter, count_ret, total_array_element_count_ret);
+    case INT64:
+      return TimeReadFileForArrayDataType<INT64>(
+          iter, count_ret, total_array_element_count_ret);
+    case UINT64:
+      return TimeReadFileForArrayDataType<UINT64>(
+          iter, count_ret, total_array_element_count_ret);
+    case FLOAT:
+      return TimeReadFileForArrayDataType<FLOAT>(
+          iter, count_ret, total_array_element_count_ret);
+    case DOUBLE:
+      return TimeReadFileForArrayDataType<DOUBLE>(
+          iter, count_ret, total_array_element_count_ret);
+    case STRING:
+    case VARCHAR:
+      return TimeReadFileForArrayDataType<STRING>(
+          iter, count_ret, total_array_element_count_ret, 
total_elem_str_size_ret);
+    case BINARY:
+      return TimeReadFileForArrayDataType<BINARY>(
+          iter, count_ret, total_array_element_count_ret, 
total_elem_str_size_ret);
+    default:
+      return Status::NotSupported(
+          Substitute("not implemented for arrays of type: $0", 
DataType_Name(data_type)));
+  }
+}
+
+Status TimeReadFile(FsManager* fs_manager,
+                    const BlockId& block_id,
+                    size_t* count_ret,
+                    size_t* total_array_element_count_ret,
+                    size_t* total_elem_str_size_ret) {
+  std::unique_ptr<fs::ReadableBlock> source;
+  RETURN_NOT_OK(fs_manager->OpenBlock(block_id, &source));
+  std::unique_ptr<CFileReader> reader;
+  RETURN_NOT_OK(CFileReader::Open(std::move(source), ReaderOptions(), 
&reader));
+
+  std::unique_ptr<CFileIterator> iter;
+  RETURN_NOT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr));
+  RETURN_NOT_OK(iter->SeekToOrdinal(0));
+
+  if (!reader->is_array()) {
+    return TimeReadFileForScalars(
+        reader->type_info()->physical_type(), iter.get(), count_ret);
+  }
+
+  DCHECK(reader->is_array());
+
+  // Get the type of array elements.
+  const auto* elem_type_info = GetArrayElementTypeInfo(*reader->type_info());
+  // Only 1D arrays of scalar types are supported so far.
+  DCHECK_NE(NESTED, elem_type_info->type());
+  return TimeReadFileForArrays(elem_type_info->type(),
+                               iter.get(),
+                               count_ret,
+                               total_array_element_count_ret,
+                               total_elem_str_size_ret);
+}
+
 class TestCFile : public CFileTestBase {
  protected:
   template <class DataGeneratorType>
@@ -196,7 +312,7 @@ class TestCFile : public CFileTestBase {
       out[i] = 0;
     }
 
-    TimeReadFile(fs_manager_.get(), block_id, &n);
+    ASSERT_OK(TimeReadFile(fs_manager_.get(), block_id, &n));
     ASSERT_EQ(10000, n);
   }
 
@@ -255,7 +371,7 @@ class TestCFile : public CFileTestBase {
     WriteTestFile(generator, encoding, compression, 10000, SMALL_BLOCKSIZE, 
&block_id);
 
     size_t n;
-    NO_FATALS(TimeReadFile(fs_manager_.get(), block_id, &n));
+    ASSERT_OK(TimeReadFile(fs_manager_.get(), block_id, &n));
     ASSERT_EQ(n, 10000);
 
     generator->Reset();
@@ -346,7 +462,7 @@ class TestCFile : public CFileTestBase {
     LOG_TIMING(INFO, "reading 100M strings") {
       LOG(INFO) << "Starting readfile";
       size_t n;
-      TimeReadFile(fs_manager_.get(), block_id, &n);
+      ASSERT_OK(TimeReadFile(fs_manager_.get(), block_id, &n));
       ASSERT_EQ(100000000, n);
       LOG(INFO) << "End readfile";
     }
@@ -366,7 +482,7 @@ class TestCFile : public CFileTestBase {
     LOG_TIMING(INFO, Substitute("reading $0 strings with dupes", num_rows)) {
       LOG(INFO) << "Starting readfile";
       size_t n;
-      TimeReadFile(fs_manager_.get(), block_id, &n);
+      ASSERT_OK(TimeReadFile(fs_manager_.get(), block_id, &n));
       ASSERT_EQ(num_rows, n);
       LOG(INFO) << "End readfile";
     }
@@ -406,7 +522,7 @@ class TestCFile : public CFileTestBase {
 // once for each cache memory type (DRAM, NVM).
 class TestCFileBothCacheMemoryTypes :
     public TestCFile,
- public ::testing::WithParamInterface<std::pair<Cache::MemoryType, 
Cache::EvictionPolicy>> {
+    public ::testing::WithParamInterface<std::pair<Cache::MemoryType, 
Cache::EvictionPolicy>> {
  public:
   void SetUp() override {
     // The NVM cache can run using any directory as its path -- it doesn't have
@@ -486,7 +602,7 @@ TEST_P(TestCFileBothCacheMemoryTypes, 
TestWrite100MFileInts) {
   LOG_TIMING(INFO, "reading 100M ints") {
     LOG(INFO) << "Starting readfile";
     size_t n;
-    TimeReadFile(fs_manager_.get(), block_id, &n);
+    ASSERT_OK(TimeReadFile(fs_manager_.get(), block_id, &n));
     ASSERT_EQ(100000000, n);
     LOG(INFO) << "End readfile";
   }
@@ -505,7 +621,7 @@ TEST_P(TestCFileBothCacheMemoryTypes, 
TestWrite100MFileNullableInts) {
   LOG_TIMING(INFO, "reading 100M nullable ints") {
     LOG(INFO) << "Starting readfile";
     size_t n;
-    TimeReadFile(fs_manager_.get(), block_id, &n);
+    ASSERT_OK(TimeReadFile(fs_manager_.get(), block_id, &n));
     ASSERT_EQ(100000000, n);
     LOG(INFO) << "End readfile";
   }
@@ -547,7 +663,7 @@ TEST_P(TestCFileBothCacheMemoryTypes, 
TestWrite1MUniqueFileStringsDictEncoding)
   LOG_TIMING(INFO, "reading 1M strings") {
     LOG(INFO) << "Starting readfile";
     size_t n;
-    TimeReadFile(fs_manager_.get(), block_id, &n);
+    ASSERT_OK(TimeReadFile(fs_manager_.get(), block_id, &n));
     ASSERT_EQ(1000000, n);
     LOG(INFO) << "End readfile";
   }
@@ -626,7 +742,8 @@ TYPED_TEST(BitShuffleTest, 
TestFixedSizeReadWriteBitShuffle) {
   this->TestBitShuffle();
 }
 
-void EncodeStringKey(const Schema& schema,
+
+static void EncodeStringKey(const Schema& schema,
                      const Slice& key,
                      Arena* arena,
                      EncodedKey** encoded_key) {
@@ -1144,10 +1261,166 @@ TEST_P(TestCFileDifferentCodecs, TestUncompressible) {
     RandomInt32DataGenerator int_gen;
     WriteTestFile(&int_gen, PLAIN_ENCODING, codec, nrows,
                   NO_FLAGS, &block_id);
-    TimeReadFile(fs_manager_.get(), block_id, &rdrows);
+    ASSERT_OK(TimeReadFile(fs_manager_.get(), block_id, &rdrows));
     ASSERT_EQ(nrows, rdrows);
   }
 }
 
+class TestCFileArrayValues : public TestCFile,
+                             public testing::WithParamInterface<
+                                 std::tuple<EncodingType, CompressionType>> {
+  void SetUp() override {
+    FLAGS_cfile_support_arrays = true;
+    TestCFile::SetUp();
+  }
+
+ protected:
+  template<typename GenType>
+  void Run(size_t nrows) {
+    const auto codec = std::get<0>(GetParam());
+    const auto compressor = std::get<1>(GetParam());
+    srand(time(nullptr));
+    BlockId block_id;
+    size_t rdrows;
+
+    GenType array_gen;
+    NO_FATALS(WriteTestFile(&array_gen, codec, compressor, nrows, NO_FLAGS, 
&block_id));
+    size_t total_elem_count = 0;
+    size_t total_elem_str_size = 0;
+    ASSERT_OK(TimeReadFile(
+        fs_manager_.get(), block_id, &rdrows, &total_elem_count, 
&total_elem_str_size));
+    ASSERT_EQ(nrows, rdrows);
+    ASSERT_EQ(array_gen.total_elem_count(), total_elem_count);
+    ASSERT_EQ(array_gen.str_values_total_size(), total_elem_str_size);
+    // TODO(aserbin): add assertion on array_gen.values_total_sum()
+  }
+};
+
+// Various scenarios for writing arrays of integer type elements. For TSAN build
+// configuration, the amount of generated data is reduced to allow for faster
+// test runtime. Since there isn't any concurrency involved in these scenarios,
+// the test coverage is not reduced because of that.
+class TestCFileIntegerArrayValues : public TestCFileArrayValues {
+};
+INSTANTIATE_TEST_SUITE_P(ArrayBasics, TestCFileIntegerArrayValues,
+                         ::testing::Combine(
+                             ::testing::Values(PLAIN_ENCODING, RLE, 
BIT_SHUFFLE),
+                             ::testing::Values(NO_COMPRESSION, SNAPPY, LZ4, 
ZLIB)));
+
+// Write/read a file with random int8 values in array cells, where there might
+// be empty array cells and empty array elements themselves.
+TEST_P(TestCFileIntegerArrayValues, ReadWriteInt8WithNulls) {
+  Run<Int8ArrayRandomDataGenerator</* HAS_NULLS */true,
+                                   /* HAS_NULLS_IN_ARRAY */true,
+#if defined(THREAD_SANITIZER)
+                                   /* MAX_NUM_ELEMENTS_IN_ARRAY 
*/100>>(1000/*nrows*/);
+#else
+                                   /* MAX_NUM_ELEMENTS_IN_ARRAY 
*/10000>>(10000/*nrows*/);
+#endif  // #ifdef THREAD_SANITIZER
+}
+
+// Similar to ReadWriteInt8WithNulls above, but each array can have at most two
+// nullable elements.
+TEST_P(TestCFileIntegerArrayValues, ReadWriteInt8WithNullShortArrays) {
+  Run<Int8ArrayRandomDataGenerator</* HAS_NULLS */true,
+                                   /* HAS_NULLS_IN_ARRAY */true,
+#if defined(THREAD_SANITIZER)
+                                   /* MAX_NUM_ELEMENTS_IN_ARRAY 
*/2>>(1000/*nrows*/);
+#else
+                                   /* MAX_NUM_ELEMENTS_IN_ARRAY 
*/2>>(100000/*nrows*/);
+#endif  // #ifdef THREAD_SANITIZER
+}
+
+// Write/read a file with random int32 values in array cells with no null cells,
+// nor null elements in array cells themselves.
+TEST_P(TestCFileIntegerArrayValues, ReadWriteInt32) {
+  Run<Int32ArrayRandomDataGenerator</* HAS_NULLS */false,
+                                    /* HAS_NULLS_IN_ARRAY */false,
+#if defined(THREAD_SANITIZER)
+                                    /* MAX_NUM_ELEMENTS_IN_ARRAY 
*/10>>(100/*nrows*/);
+#else
+                                    /* MAX_NUM_ELEMENTS_IN_ARRAY 
*/100>>(10000/*nrows*/);
+#endif  // #ifdef THREAD_SANITIZER
+}
+
+// Write/read a file with random int32 values in array cells, where both
+// the elements of arrays and array cells themselves might be null,
+// and the number of elements in an array might be quite high.
+TEST_P(TestCFileIntegerArrayValues, ReadWriteInt32WithNulls) {
+  Run<Int32ArrayRandomDataGenerator</* HAS_NULLS */true,
+                                    /* HAS_NULLS_IN_ARRAY */true,
+#if defined(THREAD_SANITIZER)
+                                    /* MAX_NUM_ELEMENTS_IN_ARRAY 
*/1024>>(50/*nrows*/);
+#else
+                                    /* MAX_NUM_ELEMENTS_IN_ARRAY 
*/65535>>(50/*nrows*/);
+#endif  // #ifdef THREAD_SANITIZER
+}
+
+// Same as the ReadWriteInt32WithNulls scenario above, but the maximum number
+// of elements in an array is small, while there is a large number of array
+// cells.
+TEST_P(TestCFileIntegerArrayValues, ReadWriteInt32WithNullsShortArrays) {
+  Run<Int32ArrayRandomDataGenerator</* HAS_NULLS */true,
+                                    /* HAS_NULLS_IN_ARRAY */true,
+#if defined(THREAD_SANITIZER)
+                                    /* MAX_NUM_ELEMENTS_IN_ARRAY 
*/5>>(1000/*nrows*/);
+#else
+                                    /* MAX_NUM_ELEMENTS_IN_ARRAY 
*/5>>(100000/*nrows*/);
+#endif  // #ifdef THREAD_SANITIZER
+}
+
+class TestCFileBinaryArrayValues : public TestCFileArrayValues {
+};
+INSTANTIATE_TEST_SUITE_P(ArrayBasics, TestCFileBinaryArrayValues,
+                         ::testing::Combine(
+                             // TODO(aserbin): address DICT_ENCODING issues
+                             //::testing::Values(PLAIN_ENCODING, 
PREFIX_ENCODING, DICT_ENCODING),
+                             ::testing::Values(PLAIN_ENCODING, 
PREFIX_ENCODING),
+                             ::testing::Values(NO_COMPRESSION, SNAPPY, LZ4, 
ZLIB)));
+
+// Write/read a file with random string values in array cells.
+TEST_P(TestCFileBinaryArrayValues, ReadWriteStrings) {
+  Run<StringArrayRandomDataGenerator</* HAS_NULLS */false,
+                                     /* HAS_NULLS_IN_ARRAY */false,
+                                     /* MAX_NUM_ELEMENTS_IN_ARRAY 
*/10>>(1000/*nrows*/);
+}
+
+// Write/read a file with random string values in array cells, where
+// the elements of array cells might be null (the array cells themselves
+// are not), and the number of elements in an array might be quite high.
+TEST_P(TestCFileBinaryArrayValues, ReadWriteStringsWithNulls) {
+  Run<StringArrayRandomDataGenerator</* HAS_NULLS */false,
+                                     /* HAS_NULLS_IN_ARRAY */true,
+#if defined(THREAD_SANITIZER)
+                                     /* MAX_NUM_ELEMENTS_IN_ARRAY 
*/500>>(100/*nrows*/);
+#else
+                                     /* MAX_NUM_ELEMENTS_IN_ARRAY 
*/5000>>(1000/*nrows*/);
+#endif // #if defined(THREAD_SANITIZER)
+}
+
+// Same as ReadWriteStringsWithNulls above, but this one is for arbitrary binary
+// data array elements, not just strings.
+TEST_P(TestCFileBinaryArrayValues, ReadWriteBinaryWithNulls) {
+  Run<BinaryArrayRandomDataGenerator</* HAS_NULLS */false,
+                                     /* HAS_NULLS_IN_ARRAY */true,
+#if defined(THREAD_SANITIZER)
+                                     /* MAX_NUM_ELEMENTS_IN_ARRAY 
*/500>>(100/*nrows*/);
+#else
+                                     /* MAX_NUM_ELEMENTS_IN_ARRAY 
*/5000>>(1000/*nrows*/);
+#endif // #if defined(THREAD_SANITIZER)
+}
+
+// Same as the ReadWriteBinaryWithNulls scenario above, but the maximum number
+// of elements in an array is small, while there are many array cells.
+TEST_P(TestCFileBinaryArrayValues, ReadWriteBinaryWithNullsShortArrays) {
+  Run<BinaryArrayRandomDataGenerator</* HAS_NULLS */true,
+                                     /* HAS_NULLS_IN_ARRAY */true,
+#if defined(THREAD_SANITIZER)
+                                     /* MAX_NUM_ELEMENTS_IN_ARRAY 
*/3>>(1000/*nrows*/);
+#else
+                                     /* MAX_NUM_ELEMENTS_IN_ARRAY 
*/3>>(100000/*nrows*/);
+#endif // #if defined(THREAD_SANITIZER)
+}
+
 } // namespace cfile
 } // namespace kudu
diff --git a/src/kudu/cfile/cfile_reader.cc b/src/kudu/cfile/cfile_reader.cc
index ffd24bd5f..9fde2219f 100644
--- a/src/kudu/cfile/cfile_reader.cc
+++ b/src/kudu/cfile/cfile_reader.cc
@@ -17,8 +17,11 @@
 
 #include "kudu/cfile/cfile_reader.h"
 
+#include <sys/types.h>
+
 #include <algorithm>
 #include <cstring>
+#include <limits>
 #include <memory>
 #include <ostream>
 #include <utility>
@@ -36,6 +39,7 @@
 #include "kudu/cfile/cfile_writer.h" // for kMagicString
 #include "kudu/cfile/index_btree.h"
 #include "kudu/cfile/type_encodings.h"
+#include "kudu/common/array_type_serdes.h"
 #include "kudu/common/column_materialization_context.h"
 #include "kudu/common/column_predicate.h"
 #include "kudu/common/columnblock.h"
@@ -43,6 +47,7 @@
 #include "kudu/common/encoded_key.h"
 #include "kudu/common/key_encoder.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/types.h"
 #include "kudu/fs/error_manager.h"
 #include "kudu/fs/io_context.h"
@@ -85,6 +90,8 @@ DEFINE_double(cfile_inject_corruption, 0,
               "with a corruption status");
 TAG_FLAG(cfile_inject_corruption, hidden);
 
+DECLARE_bool(cfile_support_arrays);
+
 using kudu::fault_injection::MaybeTrue;
 using kudu::fs::ErrorHandlerType;
 using kudu::fs::IOContext;
@@ -188,6 +195,12 @@ Status CFileReader::InitOnce(const IOContext* io_context) {
         footer_->incompatible_features(),
         IncompatibleFeatures::SUPPORTED));
   }
+  if (PREDICT_FALSE(footer_->is_type_array() && !FLAGS_cfile_support_arrays)) {
+    static const auto kErrStatus = Status::ConfigurationError(
+        "support for array data blocks is disabled");
+    LOG(DFATAL) << kErrStatus.ToString();
+    return kErrStatus;
+  }
 
   type_info_ = footer_->is_type_array() ? 
GetArrayTypeInfo(footer_->data_type())
                                         : GetTypeInfo(footer_->data_type());
@@ -777,7 +790,31 @@ void CFileIterator::SeekToPositionInBlock(PreparedBlock* 
pb, uint32_t idx_in_blo
   // we need to translate from 'ord_idx' (the absolute row id)
   // to the index within the non-null entries.
   uint32_t index_within_nonnulls;
-  if (reader_->is_nullable()) {
+  if (reader_->is_array()) {
+    DCHECK_LT(idx_in_block, pb->num_rows_in_block_);
+    if (PREDICT_TRUE(pb->idx_in_block_ <= idx_in_block)) {
+      // Seeking forward.
+      pb->array_rle_decoder_.Skip(idx_in_block - pb->idx_in_block_);
+      DCHECK_LT(pb->idx_in_block_, pb->array_start_indices_.size());
+      // Indices in the flattened value sequence (including null elements).
+      ssize_t flattened_idx_to_seek = pb->array_start_indices_[idx_in_block];
+      ssize_t flattened_idx_cur = pb->array_start_indices_[pb->idx_in_block_];
+      DCHECK_LE(flattened_idx_cur, flattened_idx_to_seek);
+      ssize_t nskip = flattened_idx_to_seek - flattened_idx_cur;
+      index_within_nonnulls = pb->dblk_->GetCurrentIndex() + 
pb->rle_decoder_.Skip(nskip);
+    } else {
+      // Seeking backward: need to reset the positions in both array and
+      // flattened values bitmaps and rewind them forward to required offsets.
+      pb->array_rle_decoder_ = RleDecoder<bool, 
1>(pb->array_rle_bitmap_.data(),
+                                                   
pb->array_rle_bitmap_.size());
+      pb->array_rle_decoder_.Skip(idx_in_block);
+      DCHECK_LE(idx_in_block, pb->array_start_indices_.size());
+      ssize_t flattened_idx_to_seek = pb->array_start_indices_[idx_in_block];
+      pb->rle_decoder_ = RleDecoder<bool, 1>(pb->rle_bitmap.data(),
+                                             pb->rle_bitmap.size());
+      index_within_nonnulls = pb->rle_decoder_.Skip(flattened_idx_to_seek);
+    }
+  } else if (reader_->is_nullable()) {
     if (PREDICT_TRUE(pb->idx_in_block_ <= idx_in_block)) {
       // We are seeking forward. Skip from the current position in the RLE decoder
       // instead of going back to the beginning of the block.
@@ -964,6 +1001,47 @@ Status DecodeNullInfo(scoped_refptr<BlockHandle>* 
data_block_handle,
   return Status::OK();
 }
 
+// Parses the array block header, rejecting (as Corruption) any length-prefixed
+// section larger than the remaining data; narrows '*data_block_handle' past it.
+Status DecodeArrayInfo(scoped_refptr<BlockHandle>* data_block_handle,
+                       uint32_t* num_flattened_elements_in_block,
+                       uint32_t* num_arrays_in_block,
+                       Slice* flattened_non_null_bitmap,
+                       Slice* array_non_null_bitmap,
+                       Slice* array_elem_num_seq) {
+  Slice data_block = (*data_block_handle)->data();
+  // Reads a varint32 length followed by a section of that many bytes.
+  const auto read_section = [&data_block](Slice* out) {
+    uint32_t len;
+    if (!GetVarint32(&data_block, &len) || len > data_block.size()) {
+      return false;
+    }
+    *out = Slice(data_block.data(), len);
+    data_block.remove_prefix(len);
+    return true;
+  };
+
+  if (PREDICT_FALSE(!GetVarint32(&data_block, num_flattened_elements_in_block))) {
+    return Status::Corruption("bad array header, num flattened elements");
+  }
+  if (PREDICT_FALSE(!GetVarint32(&data_block, num_arrays_in_block))) {
+    return Status::Corruption("bad array header, array count");
+  }
+  if (PREDICT_FALSE(!read_section(flattened_non_null_bitmap))) {
+    return Status::Corruption("bad array header, non-null bitmap size");
+  }
+  if (PREDICT_FALSE(!read_section(array_elem_num_seq))) {
+    return Status::Corruption("bad array header, array element numbers size");
+  }
+  if (PREDICT_FALSE(!read_section(array_non_null_bitmap))) {
+    return Status::Corruption("bad array header, non-null array bitmap size");
+  }
+
+  auto offset = data_block.data() - (*data_block_handle)->data().data();
+  *data_block_handle = (*data_block_handle)->SubrangeBlock(offset, data_block.size());
+  return Status::OK();
+}
+
 Status CFileIterator::ReadCurrentDataBlock(const IndexTreeIterator& idx_iter,
                                            PreparedBlock* prep_block) {
   prep_block->dblk_ptr_ = idx_iter.GetCurrentBlockPointer();
@@ -972,8 +1050,44 @@ Status CFileIterator::ReadCurrentDataBlock(const 
IndexTreeIterator& idx_iter,
 
   uint32_t num_rows_in_block = 0;
   scoped_refptr<BlockHandle> data_block = prep_block->dblk_handle_;
-  size_t total_size_read = data_block->data().size();
-  if (reader_->is_nullable()) {
+  const size_t total_size_read = data_block->data().size();
+  if (reader_->is_array()) {
+    Slice array_elem_num_seq;
+    RETURN_NOT_OK(DecodeArrayInfo(&data_block,
+                                  &(prep_block->num_flattened_elems_in_block_),
+                                  &num_rows_in_block,
+                                  &(prep_block->rle_bitmap),
+                                  &(prep_block->array_rle_bitmap_),
+                                  &array_elem_num_seq));
+    prep_block->rle_decoder_ = RleDecoder<bool, 1>(
+        prep_block->rle_bitmap.data(),
+        prep_block->rle_bitmap.size());
+    prep_block->array_rle_decoder_ = RleDecoder<bool, 1>(
+        prep_block->array_rle_bitmap_.data(),
+        prep_block->array_rle_bitmap_.size());
+
+    // Build array start indices given array element number sequence.
+    {
+      auto& start_indices = prep_block->array_start_indices_;
+      start_indices.reserve(num_rows_in_block);
+
+      RleDecoder<uint16_t, 16> dec(array_elem_num_seq.data(),
+                                   array_elem_num_seq.size());
+      size_t idx = 0;
+      uint16_t elem_num;
+      while (const size_t run_len = dec.GetNextRun(&elem_num, 
num_rows_in_block)) {
+        for (size_t i = 0; i < run_len; ++i) {
+          start_indices.emplace_back(idx);
+          idx += elem_num;
+        }
+      }
+      if (PREDICT_FALSE(num_rows_in_block != start_indices.size())) {
+        return Status::Corruption(
+            "bad array header, number of arrays mismatch");
+      }
+      DCHECK_LE(start_indices.size(), std::numeric_limits<uint32_t>::max());
+    }
+  } else if (reader_->is_nullable()) {
     RETURN_NOT_OK(DecodeNullInfo(&data_block, &num_rows_in_block, 
&(prep_block->rle_bitmap)));
     prep_block->rle_decoder_ = RleDecoder<bool, 
1>(prep_block->rle_bitmap.data(),
                                                    
prep_block->rle_bitmap.size());
@@ -987,10 +1101,11 @@ Status CFileIterator::ReadCurrentDataBlock(const 
IndexTreeIterator& idx_iter,
                                    reader_->block_id().ToString(),
                                    prep_block->dblk_ptr_.ToString()));
 
-  // For nullable blocks, we filled in the row count from the null information 
above,
-  // since the data block decoder only knows about the non-null values.
-  // For non-nullable ones, we use the information from the block decoder.
-  if (!reader_->is_nullable()) {
+  // For nullable and array data blocks, the row count information is set
+  // in the code above since the data block decoder only knows about the
+  // non-null values. For non-nullable data blocks, the corresponding
+  // information is provided by the block decoder.
+  if (!reader_->is_nullable() && !reader_->is_array()) {
     num_rows_in_block = prep_block->dblk_->Count();
   }
 
@@ -1116,6 +1231,10 @@ Status CFileIterator::FinishBatch() {
 }
 
 Status CFileIterator::Scan(ColumnMaterializationContext* ctx) {
+  // Utility block memory for reading in array data, if any.
+  unique_ptr<RowBlockMemory> array_rbm(
+      reader_->is_array() ? new RowBlockMemory : nullptr);
+
   DCHECK(seeked_) << "not seeked";
 
   // Use views to advance the block and selection vector as we read into them.
@@ -1147,12 +1266,137 @@ Status 
CFileIterator::Scan(ColumnMaterializationContext* ctx) {
       // that might be more efficient (allowing the decoder to save internal 
state
       // instead of having to reconstruct it)
     }
-    if (reader_->is_nullable()) {
+    if (reader_->is_array()) {
+      DCHECK(ctx->block()->type_info()->is_array());
+      DCHECK_GE(pb->num_rows_in_block_, pb->idx_in_block_);
+      const auto& start_indices = pb->array_start_indices_;
+      size_t cur_flattened_idx = 0;
+
+      ssize_t count = std::min(rem, pb->num_rows_in_block_ - 
pb->idx_in_block_);
+      while (count > 0) {
+        // Processing one array at a time, peeking into the next array's start
+        // index in the flattened sequence to know how many elements to read
+        // from the flattened sequence.
+        const auto idx = pb->idx_in_block_;
+        DCHECK_LE(idx, start_indices.size());
+
+        const uint32_t flattened_idx = start_indices[idx];
+        const uint32_t flattened_next_idx =
+            (idx + 1) == start_indices.size() ? 
pb->num_flattened_elems_in_block_
+                                              : start_indices[idx + 1];
+        if (PREDICT_FALSE(flattened_next_idx < flattened_idx)) {
+          static constexpr const char* const kErrMsg =
+              "non-monotonous array start indices";
+          LOG(DFATAL) << kErrMsg;
+          return Status::Corruption(kErrMsg);
+        }
+        if (PREDICT_FALSE(flattened_idx > pb->num_flattened_elems_in_block_ ||
+                          flattened_next_idx > 
pb->num_flattened_elems_in_block_)) {
+          // Trailing elements of the array start indices may have been set
+          // to pb->num_flattened_elems_in_block_ if the corresponding trailing
+          // arrays are empty or null, but they should never be greater than
+          // the total number of elements in the flattened sequence.
+          static constexpr const char* const kErrMsg =
+              "out-of-bounds array start index";
+          LOG(DFATAL) << kErrMsg;
+          return Status::Corruption(kErrMsg);
+        }
+
+        bool array_not_null = false;
+        const size_t array_run_len = pb->array_rle_decoder_.GetNextRun(
+            &array_not_null, 1);
+        if (PREDICT_FALSE(array_run_len != 1)) {
+          static constexpr const char* const kErrMsg =
+              "unexpected end of array non-null bitmap";
+          LOG(DFATAL) << kErrMsg;
+          return Status::Corruption(kErrMsg);
+        }
+        ColumnDataView* dst = &remaining_dst;
+        if (array_not_null) {
+          const auto* ati = reader_->type_info();
+          DCHECK(ati->is_array());
+          const TypeInfo* elem_type_info = 
ati->nested_type_info()->array().elem_type_info();
+          DCHECK(elem_type_info);
+
+          if (flattened_idx == flattened_next_idx) {
+            // An empty array cell: no need to fetch any data.
+            memset(dst->data(), 0, sizeof(Slice));
+          } else {
+            // A non-empty array cell: need to fetch its elements from
+            // the flattened sequence.
+
+            const size_t array_elems_to_fetch = flattened_next_idx - 
flattened_idx;
+            // TODO(aserbin): is it possible to reuse array_rbm or the column
+            //                block's memory for this?
+            // TODO(aserbin): allocate it once for the whole block?
+            unique_ptr<uint8_t[]> cblock_data(
+                new uint8_t[array_elems_to_fetch * elem_type_info->size()]);
+            unique_ptr<uint8_t[]> cblock_not_null_bitmap(
+                new uint8_t[BitmapSize(array_elems_to_fetch)]);
+
+            // Column block to help with reading in array elements for current
+            // array cell.
+            ColumnBlock cblock(
+                elem_type_info,
+                cblock_not_null_bitmap.get(),
+                cblock_data.get(),
+                array_elems_to_fetch,
+                array_rbm.get());
+            ColumnDataView cdv(&cblock);
+
+            bool not_null = false;
+            size_t num_fetched = 0;
+            while (num_fetched + flattened_idx < flattened_next_idx) {
+              size_t run_len = pb->rle_decoder_.GetNextRun(
+                  &not_null, flattened_next_idx - flattened_idx - num_fetched);
+              if (PREDICT_FALSE(run_len == 0)) {
+                static constexpr const char* const kErrMsg =
+                    "unexpected end of flattened non-null bitmap";
+                LOG(DFATAL) << kErrMsg;
+                return Status::Corruption(kErrMsg);
+              }
+
+              size_t this_batch = run_len;
+              if (not_null) {
+                RETURN_NOT_OK(pb->dblk_->CopyNextValues(&this_batch, &cdv));
+                DCHECK_EQ(run_len, this_batch);
+                pb->needs_rewind_ = true;
+              }
+              cdv.SetNonNullBits(this_batch, not_null);
+              cdv.Advance(this_batch);
+
+              num_fetched += run_len;
+              // Flattened element count includes null elements.
+              cur_flattened_idx += run_len;
+            }
+
+            // Serialize the read data into dst->arena().
+            Slice cell_out;
+            RETURN_NOT_OK(SerializeIntoArena(
+                elem_type_info,
+                cblock_data.get(),
+                cblock_not_null_bitmap.get(),
+                cblock.nrows(),
+                dst->arena(),
+                &cell_out));
+            DCHECK_EQ(ati->size(), sizeof(Slice));
+            DCHECK_GT(dst->nrows(), 0);
+            memcpy(dst->data(), &cell_out, sizeof(Slice));
+          }
+        }
+        dst->SetNonNullBits(1, array_not_null);
+        dst->Advance(1);
+        remaining_sel.Advance(1);
+        pb->idx_in_block_++;
+        --rem;
+        --count;
+      }
+      DCHECK_LE(cur_flattened_idx, pb->num_flattened_elems_in_block_);
+    } else if (reader_->is_nullable()) {
       DCHECK(ctx->block()->is_nullable());
+      DCHECK_GE(pb->num_rows_in_block_, pb->idx_in_block_);
 
-      size_t nrows = std::min(rem, pb->num_rows_in_block_ - pb->idx_in_block_);
-      // Fill column bitmap
-      size_t count = nrows;
+      ssize_t count = std::min(rem, pb->num_rows_in_block_ - 
pb->idx_in_block_);
       while (count > 0) {
         bool not_null = false;
         size_t nblock = pb->rle_decoder_.GetNextRun(&not_null, count);
diff --git a/src/kudu/cfile/cfile_reader.h b/src/kudu/cfile/cfile_reader.h
index f6906b2f7..eafa01c49 100644
--- a/src/kudu/cfile/cfile_reader.h
+++ b/src/kudu/cfile/cfile_reader.h
@@ -144,6 +144,10 @@ class CFileReader {
     return footer().is_type_nullable();
   }
 
+  bool is_array() const {
+    return footer().is_type_array();
+  }
+
   const CFileHeaderPB& header() const {
     DCHECK(init_once_.init_succeeded());
     return *DCHECK_NOTNULL(header_.get());
@@ -439,13 +443,25 @@ class CFileIterator final : public ColumnIterator {
     bool needs_rewind_;
     uint32_t rewind_idx_;
 
-    // Total number of rows in the block (nulls + not nulls)
+    // Total number of rows in the block (nulls + not nulls).
     uint32_t num_rows_in_block_;
 
-    // Null bitmap and bitmap (RLE) decoder
+    // Non-null bitmap and bitmap (RLE) decoder.
+    // This is a bitmap for the flattened/concatenated element sequence
+    // for array data blocks.
     Slice rle_bitmap;
     RleDecoder<bool, 1> rle_decoder_;
 
+    // Array non-null bitmap and bitmap (RLE) decoder
+    Slice array_rle_bitmap_;
+    RleDecoder<bool, 1> array_rle_decoder_;
+
+    // Array start indices (used only for array data blocks).
+    std::vector<uint32_t> array_start_indices_;
+
+    // Total number of values in the 'flattened' sequence in an array block.
+    uint32_t num_flattened_elems_in_block_ = 0;
+
     rowid_t last_row_idx() const {
       return first_row_idx() + num_rows_in_block_ - 1;
     }
diff --git a/src/kudu/cfile/cfile_util.h b/src/kudu/cfile/cfile_util.h
index 74b6e8f7b..2ec873bc3 100644
--- a/src/kudu/cfile/cfile_util.h
+++ b/src/kudu/cfile/cfile_util.h
@@ -47,7 +47,10 @@ enum IncompatibleFeatures {
   // Write a crc32 checksum at the end of each cfile block
   CHECKSUM = 1 << 0,
 
-  SUPPORTED = NONE | CHECKSUM
+  // Support for reading/writing array data blocks
+  ARRAY_DATA_BLOCK = 1 << 1,
+
+  SUPPORTED = NONE | CHECKSUM | ARRAY_DATA_BLOCK
 };
 
 typedef std::function<void(const void*, faststring*)> ValidxKeyEncoder;
diff --git a/src/kudu/cfile/cfile_writer.cc b/src/kudu/cfile/cfile_writer.cc
index 433d70f3e..c23d5b3a4 100644
--- a/src/kudu/cfile/cfile_writer.cc
+++ b/src/kudu/cfile/cfile_writer.cc
@@ -17,6 +17,8 @@
 
 #include "kudu/cfile/cfile_writer.h"
 
+#include <sys/types.h>
+
 #include <functional>
 #include <iterator>
 #include <numeric>
@@ -34,6 +36,7 @@
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/cfile/index_btree.h"
 #include "kudu/cfile/type_encodings.h"
+#include "kudu/common/array_cell_view.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/key_encoder.h"
 #include "kudu/common/schema.h"
@@ -63,6 +66,10 @@ DEFINE_bool(cfile_write_checksums, true,
             "Write CRC32 checksums for each block");
 TAG_FLAG(cfile_write_checksums, evolving);
 
+DEFINE_bool(cfile_support_arrays, false,
+            "Support encoding/decoding of arrays in CFile data blocks");
+TAG_FLAG(cfile_support_arrays, evolving);
+
 using google::protobuf::RepeatedPtrField;
 using kudu::fs::BlockCreationTransaction;
 using kudu::fs::BlockManager;
@@ -82,6 +89,9 @@ const size_t kChecksumSize = sizeof(uint32_t);
 
 static const size_t kMinBlockSize = 512;
 
+// TODO(aserbin): source this from a flag?
+static const size_t kMaxElementsInArray = 1024;
+
 class NonNullBitmapBuilder {
  public:
   explicit NonNullBitmapBuilder(size_t initial_row_capacity)
@@ -117,6 +127,42 @@ class NonNullBitmapBuilder {
   RleEncoder<bool, 1> rle_encoder_;
 };
 
+class ArrayElemNumBuilder {
+ public:
+  explicit ArrayElemNumBuilder(size_t initial_row_capacity)
+      : nitems_(0),
+        // TODO(aserbin): improve or remove the estimate for the buffer size
+        buffer_(initial_row_capacity * sizeof(uint32_t)),
+        rle_encoder_(&buffer_) {
+  }
+
+  size_t nitems() const {
+    return nitems_;
+  }
+
+  // Appends 'elem_num', the number of elements in the next array cell.
+  void Add(uint32_t elem_num) {
+    ++nitems_;
+    rle_encoder_.Put(elem_num, 1);
+  }
+
+  // NOTE: the returned Slice is only valid until this Builder is destroyed or 
Reset
+  Slice Finish() {
+    int len = rle_encoder_.Flush();
+    return Slice(buffer_.data(), len);
+  }
+
+  void Reset() {
+    nitems_ = 0;
+    rle_encoder_.Clear();
+  }
+
+ private:
+  size_t nitems_;
+  faststring buffer_;
+  RleEncoder<uint16_t, 16> rle_encoder_;
+};
+
 ////////////////////////////////////////////////////////////
 // CFileWriter
 ////////////////////////////////////////////////////////////
@@ -132,6 +178,7 @@ CFileWriter::CFileWriter(WriterOptions options,
       value_count_(0),
       is_nullable_(is_nullable),
       typeinfo_(typeinfo),
+      is_array_(typeinfo->is_array()),
       state_(kWriterInitialized) {
   const EncodingType encoding = options_.storage_attributes.encoding;
   if (auto s = TypeEncodingInfo::Get(typeinfo_, encoding, 
&type_encoding_info_);
@@ -183,6 +230,13 @@ Status CFileWriter::Start() {
   TRACE_EVENT0("cfile", "CFileWriter::Start");
   DCHECK(state_ == kWriterInitialized) << "bad state for Start(): " << state_;
 
+  if (PREDICT_FALSE(is_array_ && !FLAGS_cfile_support_arrays)) {
+    static const auto kErrStatus = Status::ConfigurationError(
+        "support for array data blocks is disabled");
+    LOG(DFATAL) << kErrStatus.ToString();
+    return kErrStatus;
+  }
+
   if (compression_ != NO_COMPRESSION) {
     const CompressionCodec* codec;
     RETURN_NOT_OK(GetCompressionCodec(compression_, &codec));
@@ -214,9 +268,19 @@ Status CFileWriter::Start() {
   RETURN_NOT_OK_PREPEND(WriteRawData(header_slices), "Couldn't write header");
   data_block_ = type_encoding_info_->CreateBlockBuilder(&options_);
 
-  if (is_nullable_) {
-    size_t nrows = ((options_.storage_attributes.cfile_block_size + 
typeinfo_->size() - 1) /
-                    typeinfo_->size());
+  if (is_array_) {
+    // Array data blocks allow nulls both among an array's elements
+    // and among the array cells themselves.
+    size_t nrows =
+        (options_.storage_attributes.cfile_block_size + typeinfo_->size() - 1) 
/
+        typeinfo_->size();
+    array_non_null_bitmap_builder_.reset(new NonNullBitmapBuilder(nrows));
+    non_null_bitmap_builder_.reset(new NonNullBitmapBuilder(nrows * 
kMaxElementsInArray));
+    array_elem_num_builder_.reset(new ArrayElemNumBuilder(nrows));
+  } else if (is_nullable_) {
+    size_t nrows =
+        (options_.storage_attributes.cfile_block_size + typeinfo_->size() - 1) 
/
+        typeinfo_->size();
     non_null_bitmap_builder_.reset(new NonNullBitmapBuilder(nrows * 8));
   }
 
@@ -237,19 +301,40 @@ Status 
CFileWriter::FinishAndReleaseBlock(BlockCreationTransaction* transaction)
   TRACE_EVENT0("cfile", "CFileWriter::FinishAndReleaseBlock");
   DCHECK(state_ == kWriterWriting) << "Bad state for Finish(): " << state_;
 
-  // Write out any pending values as the last data block.
-  RETURN_NOT_OK(FinishCurDataBlock());
-
-  state_ = kWriterFinished;
-
-  uint32_t incompatible_features = 0;
+  uint32_t incompatible_features = IncompatibleFeatures::NONE;
   if (FLAGS_cfile_write_checksums) {
     incompatible_features |= IncompatibleFeatures::CHECKSUM;
   }
+  if (is_array_) {
+    incompatible_features |= IncompatibleFeatures::ARRAY_DATA_BLOCK;
+  }
+
+  // Write out any pending values as the last data block.
+  if (is_array_) {
+    RETURN_NOT_OK(FinishCurArrayDataBlock());
+  } else {
+    RETURN_NOT_OK(FinishCurDataBlock());
+  }
+
+  state_ = kWriterFinished;
 
   // Start preparing the footer.
   CFileFooterPB footer;
-  footer.set_data_type(typeinfo_->type());
+  if (is_array_) {
+    // For 1D arrays, the 'data_type' field is set to reflect the type
+    // of the array's elements. Elements of an array can be nullable.
+    const auto* desc = typeinfo_->nested_type_info();
+    DCHECK(desc);
+    DCHECK(desc->is_array());
+    DCHECK(desc->array().elem_type_info());
+    footer.set_data_type(desc->array().elem_type_info()->type());
+    footer.set_is_type_array(true);
+  } else {
+    footer.set_data_type(typeinfo_->type());
+    // NOTE: leaving the 'is_type_array' field as unset: semantically it's the
+    //       same as if it were set to 'false', but the result CFile footer
+    //       would be a few bytes larger if explicitly setting the field
+  }
   footer.set_is_type_nullable(is_nullable_);
   footer.set_encoding(type_encoding_info_->encoding_type());
   footer.set_num_values(value_count_);
@@ -327,6 +412,7 @@ void 
CFileWriter::FlushMetadataToPB(RepeatedPtrField<FileMetadataPairPB>* field)
 
 Status CFileWriter::AppendEntries(const void* entries, size_t count) {
   DCHECK(!is_nullable_);
+  DCHECK(!is_array_);
 
   int rem = count;
 
@@ -353,6 +439,7 @@ Status CFileWriter::AppendNullableEntries(const uint8_t* 
bitmap,
                                           const void* entries,
                                           size_t count) {
   DCHECK(is_nullable_ && bitmap != nullptr);
+  DCHECK(!is_array_);
 
   const uint8_t* ptr = reinterpret_cast<const uint8_t*>(entries);
 
@@ -386,7 +473,124 @@ Status CFileWriter::AppendNullableEntries(const uint8_t* 
bitmap,
   return Status::OK();
 }
 
+Status CFileWriter::AppendNullableArrayEntries(const uint8_t* bitmap,
+                                               const void* entries,
+                                               size_t count) {
+  DCHECK_EQ(DataType::NESTED, typeinfo_->type());
+  DCHECK_EQ(sizeof(Slice), typeinfo_->size());
+
+  // For 1D arrays, get information on the elements.
+  const auto* desc = typeinfo_->nested_type_info();
+  DCHECK(desc);
+  DCHECK(desc->is_array());
+  // For 1D arrays, the encoder is chosen based on the type of the array's
+  // elements.
+  const auto* const elem_type_info = desc->array().elem_type_info();
+  DCHECK(elem_type_info);
+  const size_t elem_size = elem_type_info->size();
+
+  const Slice* cells_ptr = reinterpret_cast<const Slice*>(entries);
+  BitmapIterator cell_bitmap_it(bitmap, count);
+  size_t cur_cell_idx = 0;
+
+  size_t cells_num = 0;
+  bool cell_is_not_null = false;
+  while ((cells_num = cell_bitmap_it.Next(&cell_is_not_null)) > 0) {
+    if (!cell_is_not_null) {
+      // This is a run of null array-type cells.
+
+      value_count_ += cells_num;
+      array_non_null_bitmap_builder_->AddRun(false, cells_num);
+      for (size_t i = 0; i < cells_num; ++i) {
+        array_elem_num_builder_->Add(0);
+      }
+
+#if DCHECK_IS_ON()
+      // TODO(aserbin): re-enable this extra validation after 
SetNull()/set_null()
+      //                updates the underlying Slice for previously stored
+      //                non-null cell's data in ContiguousRow and elsewhere
+      //                if DCHECK_IS_ON()
+      //const Slice* cell = cells_ptr;
+      //DCHECK(cell->empty());
+#endif // if DCHECK_IS_ON() ...
+
+      cells_ptr += cells_num;
+      cur_cell_idx += cells_num;
+    } else {
+      // This is a run of non-null array-type cells.
+      for (size_t i = 0; i < cells_num; ++i, ++cur_cell_idx, ++cells_ptr) {
+        ++value_count_;
+        array_non_null_bitmap_builder_->AddRun(true, 1);
+
+        // Information on validity of the elements in the in-cell array.
+        const Slice* cell = cells_ptr;
+        DCHECK(cell);
+        ArrayCellMetadataView view(cell->data(), cell->size());
+        RETURN_NOT_OK(view.Init());
+
+        // Add information on the array's elements boundary
+        // in the flattened sequence.
+        const size_t cell_elem_num = view.elem_num();
+        array_elem_num_builder_->Add(cell_elem_num);
+        if (cell_elem_num == 0) {
+          // Current cell contains an empty array.
+          //DCHECK(!cell->data_);
+          //DCHECK(!cell->non_null_bitmap_);
+          continue;
+        }
+
+        const uint8_t* cell_non_null_bitmap = view.not_null_bitmap();
+        DCHECK(cell_non_null_bitmap);
+        BitmapIterator elem_bitmap_iter(cell_non_null_bitmap,
+                                        cell_elem_num);
+        const uint8_t* data = view.data_as(elem_type_info->type());
+        DCHECK(data);
+
+        // Mask the 'block is full' logic while writing a single array.
+        data_block_->SetBlockFullMasked(true);
+        size_t elem_num = 0;
+        bool elem_is_non_null = false;
+        while ((elem_num = elem_bitmap_iter.Next(&elem_is_non_null)) > 0) {
+          if (!elem_is_non_null) {
+            // Add info on the run of 'elem_num' null elements in the array.
+            non_null_bitmap_builder_->AddRun(false, elem_num);
+            // Skip over the null elements in the input data.
+            data += elem_num * elem_size;
+            continue;
+          }
+
+          // A run of 'elem_num' non-null elements in the array.
+          ssize_t elem_rem = elem_num;
+          do {
+            int n = data_block_->Add(data, elem_rem);
+            DCHECK_GT(n, 0);
+
+            non_null_bitmap_builder_->AddRun(true, n);
+            data += n * elem_size;
+            elem_rem -= n;
+          } while (elem_rem > 0);
+        }
+        // Unmask the 'block is full' logic, so the logic below works
+        // as necessary.
+        data_block_->SetBlockFullMasked(false);
+
+        // If the current block is full, switch to a new one.
+        if (data_block_->IsBlockFull()) {
+          // NOTE: with long arrays the block may get quite beyond the size
+          // threshold before switching to the next one.
+          RETURN_NOT_OK(FinishCurArrayDataBlock());
+        }
+      }
+    }
+  }
+
+  DCHECK_EQ(count, cur_cell_idx);
+
+  return Status::OK();
+}
+
 Status CFileWriter::FinishCurDataBlock() {
+  DCHECK(!is_array_);
   const uint32_t num_elems_in_block =
       is_nullable_ ? non_null_bitmap_builder_->nitems() : data_block_->Count();
   if (PREDICT_FALSE(num_elems_in_block == 0)) {
@@ -444,6 +648,100 @@ Status CFileWriter::FinishCurDataBlock() {
   return s;
 }
 
+Status CFileWriter::FinishCurArrayDataBlock() {
+  DCHECK(!validx_builder_); // array-type column cannot be a part of primary 
key
+
+  // Number of array cells in the block.
+  const uint32_t num_arrays_in_block = 
array_non_null_bitmap_builder_->nitems();
+  const uint32_t num_elems_in_block = non_null_bitmap_builder_->nitems();
+  if (PREDICT_FALSE(num_arrays_in_block == 0)) {
+    DCHECK_EQ(0, num_elems_in_block);
+    return Status::OK();
+  }
+
+  DCHECK_GE(value_count_, num_arrays_in_block);
+  rowid_t first_array_ord = value_count_ - num_arrays_in_block;
+  VLOG(1) << "Appending nullable array data block for values "
+          << first_array_ord << "-" << value_count_;
+
+  // The current data block is full, need to push it into the file.
+  Status s;
+  {
+    vector<Slice> data_slices;
+    data_block_->Finish(first_array_ord, &data_slices);
+
+    // A nullable array data block has the following layout
+    // (see docs/design-docs/cfile.md for more details):
+    //
+    // flattened value count          : unsigned [LEB128] encoded count of 
values
+    //
+    // array count                    : unsigned [LEB128] encoded count of 
arrays
+    //
+    // flattened non-null bitmap size : unsigned [LEB128] encoded size
+    //                                  of the following non-null bitmap
+    //
+    // flattened non-null bitmap      : [RLE] encoded bitmap
+    //
+    // array element numbers size     : unsigned [LEB128] encoded size of the
+    //                                  following field
+    //
+    // array element numbers          : [RLE] encoded sequence of 16-bit
+    //                                  unsigned integers
+    //
+    // array non-null bitmap size     : unsigned [LEB128] encoded size of the
+    //                                  following non-null bitmap
+    //
+    // array non-null bitmap          : [RLE] encoded bitmap on non-nullness
+    //                                  of array cells
+    //
+    // data                           : encoded non-null data values
+    //                                  (a.k.a. 'flattened sequence of 
elements')
+    vector<Slice> v;
+    v.reserve(data_slices.size() + 6);
+
+    const Slice flattened_non_null_bitmap = non_null_bitmap_builder_->Finish();
+    faststring array_headers_0;
+    PutVarint32(&array_headers_0, num_elems_in_block);
+    PutVarint32(&array_headers_0, array_elem_num_builder_->nitems());
+    PutVarint32(&array_headers_0, 
static_cast<uint32_t>(flattened_non_null_bitmap.size()));
+    v.emplace_back(array_headers_0.data(), array_headers_0.size());
+    if (!flattened_non_null_bitmap.empty()) {
+      v.emplace_back(flattened_non_null_bitmap);
+    }
+
+    const Slice array_elem_num_encoded = array_elem_num_builder_->Finish();
+    faststring array_headers_1;
+    PutVarint32(&array_headers_1, 
static_cast<uint32_t>(array_elem_num_encoded.size()));
+    v.emplace_back(array_headers_1.data(), array_headers_1.size());
+    if (!array_elem_num_encoded.empty()) {
+      v.emplace_back(array_elem_num_encoded);
+    }
+
+    const Slice array_non_null_bitmap = 
array_non_null_bitmap_builder_->Finish();
+    DCHECK(!array_non_null_bitmap.empty());
+    faststring array_headers_2;
+    PutVarint32(&array_headers_2, 
static_cast<uint32_t>(array_non_null_bitmap.size()));
+    v.emplace_back(array_headers_2.data(), array_headers_2.size());
+    v.emplace_back(array_non_null_bitmap);
+
+    std::move(data_slices.begin(), data_slices.end(), std::back_inserter(v));
+    s = AppendRawBlock(std::move(v),
+                       first_array_ord,
+                       nullptr,
+                       Slice(last_key_),
+                       "array data block");
+  }
+
+  // Reset per-block state.
+  non_null_bitmap_builder_->Reset();
+  array_non_null_bitmap_builder_->Reset();
+  array_elem_num_builder_->Reset();
+
+  data_block_->Reset();
+
+  return s;
+}
+
 Status CFileWriter::AppendRawBlock(vector<Slice> data_slices,
                                    size_t ordinal_pos,
                                    const void* validx_curr,
diff --git a/src/kudu/cfile/cfile_writer.h b/src/kudu/cfile/cfile_writer.h
index f96ec27cf..e3fe23eda 100644
--- a/src/kudu/cfile/cfile_writer.h
+++ b/src/kudu/cfile/cfile_writer.h
@@ -52,6 +52,7 @@ extern const char kMagicStringV2[];
 extern const int kMagicLength;
 extern const size_t kChecksumSize;
 
+class ArrayElemNumBuilder;
 class NonNullBitmapBuilder;
 
 // Main class used to write a CFile.
@@ -97,6 +98,23 @@ class CFileWriter final {
   // 'entries' still will have 10 elements in it
   Status AppendNullableEntries(const uint8_t* bitmap, const void* entries, 
size_t count);
 
+  // Similar to AppendNullableEntries above, but for appending array-type
+  // column blocks.
+  //
+  // The 'entries' is a pointer to a C-style array of Slice elements,
+  // where each Slice element represents a cell of an array-type column.
+  // The 'entries' may contain NULL array cells as well, and the validity
+  // of a cell (i.e. whether it's a non-NULL cell) is determined by
+  // corresponding bit in 'bitmap': 1 means that the cell contains an array
+  // (NOTE: the array may be empty, i.e. contain no elements), and 0 means
+  // the cell is nil (NULL).
+  //
+  // The information on the validity of elements in each of the array cells
+  // is encoded in the cell's data.
+  Status AppendNullableArrayEntries(const uint8_t* bitmap,
+                                    const void* entries,
+                                    size_t count);
+
   // Append a raw block to the file, adding it to the various indexes.
   //
   // The Slices in 'data_slices' are concatenated to form the block.
@@ -153,6 +171,7 @@ class CFileWriter final {
   Status WriteRawData(const std::vector<Slice>& data);
 
   Status FinishCurDataBlock();
+  Status FinishCurArrayDataBlock();
 
   // Flush the current unflushed_metadata_ entries into the given protobuf
   // field, clearing the buffer.
@@ -166,7 +185,8 @@ class CFileWriter final {
   // Current file offset.
   uint64_t off_;
 
-  // Current number of values that have been appended.
+  // Current number of values that have been appended. It's accumulated
+  // across all the blocks that are to be written into this CFile.
   rowid_t value_count_;
 
   // Type of data being written
@@ -174,6 +194,7 @@ class CFileWriter final {
   CompressionType compression_;
   const TypeInfo* typeinfo_;
   const TypeEncodingInfo* type_encoding_info_;
+  const bool is_array_;
 
   // The last key written to the block.
   // Only set if the writer is writing an embedded value index.
@@ -189,6 +210,8 @@ class CFileWriter final {
   std::unique_ptr<IndexTreeBuilder> posidx_builder_;
   std::unique_ptr<IndexTreeBuilder> validx_builder_;
   std::unique_ptr<NonNullBitmapBuilder> non_null_bitmap_builder_;
+  std::unique_ptr<NonNullBitmapBuilder> array_non_null_bitmap_builder_;
+  std::unique_ptr<ArrayElemNumBuilder> array_elem_num_builder_;
   std::unique_ptr<CompressedBlockBuilder> block_compressor_;
 
   enum State {
diff --git a/src/kudu/cfile/plain_bitmap_block.h 
b/src/kudu/cfile/plain_bitmap_block.h
index 561c852d7..61fe64bf9 100644
--- a/src/kudu/cfile/plain_bitmap_block.h
+++ b/src/kudu/cfile/plain_bitmap_block.h
@@ -50,7 +50,7 @@ class PlainBitMapBlockBuilder final : public BlockBuilder {
     Reset();
   }
 
-  bool IsBlockFull() const override {
+  bool IsBlockFullImpl() const override {
     return writer_.bytes_written() > 
options_->storage_attributes.cfile_block_size;
   }
 
diff --git a/src/kudu/cfile/plain_block.h b/src/kudu/cfile/plain_block.h
index b8a5e0fff..84e2d1779 100644
--- a/src/kudu/cfile/plain_block.h
+++ b/src/kudu/cfile/plain_block.h
@@ -64,7 +64,7 @@ class PlainBlockBuilder final : public BlockBuilder {
     return count;
   }
 
-  bool IsBlockFull() const override {
+  bool IsBlockFullImpl() const override {
     return buffer_.size() > options_->storage_attributes.cfile_block_size;
   }
 
diff --git a/src/kudu/cfile/rle_block.h b/src/kudu/cfile/rle_block.h
index 8930abcaa..6872e49ac 100644
--- a/src/kudu/cfile/rle_block.h
+++ b/src/kudu/cfile/rle_block.h
@@ -67,7 +67,7 @@ class RleBitMapBlockBuilder final : public BlockBuilder {
     return count;
   }
 
-  bool IsBlockFull() const override {
+  bool IsBlockFullImpl() const override {
     return encoder_.len() > options_->storage_attributes.cfile_block_size;
   }
 
@@ -232,7 +232,7 @@ class RleIntBlockBuilder final : public BlockBuilder {
     Reset();
   }
 
-  bool IsBlockFull() const override {
+  bool IsBlockFullImpl() const override {
     return rle_encoder_.len() > options_->storage_attributes.cfile_block_size;
   }
 
diff --git a/src/kudu/common/columnblock-test-util.h b/src/kudu/common/columnblock-test-util.h
index d67a6bff4..7430ccbef 100644
--- a/src/kudu/common/columnblock-test-util.h
+++ b/src/kudu/common/columnblock-test-util.h
@@ -16,7 +16,9 @@
 // under the License.
 #pragma once
 
+#include <cstdint>
 #include <memory>
+#include <type_traits>
 
 #include "kudu/gutil/macros.h"
 
@@ -31,17 +33,18 @@ namespace kudu {
 //
 // This is more useful in test code than production code,
 // since it doesn't allocate from an arena, etc.
-template<DataType type>
+template<DataType type, bool IS_ARRAY = false>
 class ScopedColumnBlock : public ColumnBlock {
  public:
-  typedef typename TypeTraits<type>::cpp_type cpp_type;
+  typedef typename std::conditional<
+       IS_ARRAY, Slice, typename TypeTraits<type>::cpp_type>::type cpp_type;
 
   explicit ScopedColumnBlock(size_t n_rows, bool allow_nulls = true)
-      : ColumnBlock(GetTypeInfo(type),
+      : ColumnBlock(IS_ARRAY ? GetArrayTypeInfo(type) : GetTypeInfo(type),
                     allow_nulls ? new uint8_t[BitmapSize(n_rows)] : nullptr,
                     new cpp_type[n_rows],
                     n_rows,
-                    new RowBlockMemory()),
+                    new RowBlockMemory),
         non_null_bitmap_buf_(non_null_bitmap_),
         data_buf_(reinterpret_cast<cpp_type*>(data_)),
         memory_buf_(memory_) {
diff --git a/src/kudu/common/columnblock.h b/src/kudu/common/columnblock.h
index 7130e94df..75a8b6b5f 100644
--- a/src/kudu/common/columnblock.h
+++ b/src/kudu/common/columnblock.h
@@ -248,6 +248,7 @@ class ColumnDataView {
     // Check <= here, not <, since you can skip to
     // the very end of the data (leaving an empty block)
     DCHECK_LE(skip, column_block_->nrows());
+    DCHECK_LE(row_offset_ + skip, column_block_->nrows());
     row_offset_ += skip;
   }
 
diff --git a/src/kudu/common/rowblock.h b/src/kudu/common/rowblock.h
index 42edfb877..9d741b08b 100644
--- a/src/kudu/common/rowblock.h
+++ b/src/kudu/common/rowblock.h
@@ -451,6 +451,8 @@ class RowBlock final {
   // nrows_ <= row_capacity_
   size_t nrows_;
 
+  // Memory to store column of variable length (e.g., BINARY/STRING type or
+  // array columns).
   RowBlockMemory* memory_;
 
   // The bitmap indicating which rows are valid in this block.
diff --git a/src/kudu/common/rowblock_memory.h b/src/kudu/common/rowblock_memory.h
index 12fc5978c..3ad26eb37 100644
--- a/src/kudu/common/rowblock_memory.h
+++ b/src/kudu/common/rowblock_memory.h
@@ -31,7 +31,7 @@ class RowBlockRefCounted;
 // cells.
 //
 // When scanning rows into a RowBlock, the rows may contain variable-length
-// data (eg BINARY columns). In this case, the data cannot be inlined directly
+// data (e.g. BINARY columns or array columns). In this case, the data cannot be inlined directly
 // into the columnar data arrays that are part of the RowBlock and instead need
 // to be allocated out of a separate Arena. This class wraps that Arena.
 //
diff --git a/src/kudu/tablet/multi_column_writer.cc b/src/kudu/tablet/multi_column_writer.cc
index 1c12b6490..241f01cff 100644
--- a/src/kudu/tablet/multi_column_writer.cc
+++ b/src/kudu/tablet/multi_column_writer.cc
@@ -27,6 +27,7 @@
 #include "kudu/common/columnblock.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/schema.h"
+#include "kudu/common/types.h"
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/fs_manager.h"
@@ -88,8 +89,10 @@ Status MultiColumnWriter::Open() {
     BlockId block_id(block->id());
 
     // Create the CFile writer itself.
-    unique_ptr<CFileWriter> writer(new CFileWriter(
-        std::move(opts), col.type_info(), col.is_nullable(), std::move(block)));
+    unique_ptr<CFileWriter> writer(new CFileWriter(std::move(opts),
+                                                   col.type_info(),
+                                                   col.is_nullable(),
+                                                   std::move(block)));
     RETURN_NOT_OK_PREPEND(writer->Start(),
         Substitute("tablet $0: unable to start writer for column $1",
                    tablet_id_, col.ToString()));
@@ -107,11 +110,16 @@ Status MultiColumnWriter::AppendBlock(const RowBlock& block) {
   DCHECK(open_);
   for (auto i = 0; i < schema_->num_columns(); ++i) {
     ColumnBlock column = block.column_block(i);
-    if (column.is_nullable()) {
-      RETURN_NOT_OK(cfile_writers_[i]->AppendNullableEntries(column.non_null_bitmap(),
-          column.data(), column.nrows()));
-    } else {
+    if (!column.is_nullable()) {
       RETURN_NOT_OK(cfile_writers_[i]->AppendEntries(column.data(), column.nrows()));
+    } else {
+      if (column.type_info()->is_array()) {
+        RETURN_NOT_OK(cfile_writers_[i]->AppendNullableArrayEntries(
+            column.non_null_bitmap(), column.data(), column.nrows()));
+      } else {
+        RETURN_NOT_OK(cfile_writers_[i]->AppendNullableEntries(
+            column.non_null_bitmap(), column.data(), column.nrows()));
+      }
     }
   }
   return Status::OK();

Reply via email to