This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 9a1c19fd0 KUDU-1261 writing/reading of array data blocks
9a1c19fd0 is described below
commit 9a1c19fd0853aef54bdd52af69194ee43f2a33b6
Author: Alexey Serbin <[email protected]>
AuthorDate: Wed Apr 23 07:32:55 2025 -0700
KUDU-1261 writing/reading of array data blocks
The CFile reader and writer are updated to support reading and writing
nullable array data blocks as per the documented specification [1].
The new functionality is covered by newly added tests in cfile-test.cc.
Follow-up patches (such as [2]) contain end-to-end tests using Kudu
mini-cluster and Kudu C++ client.
With this patch, writing/reading of array data works end-to-end
for all the existing scalar types except for INT128/DECIMAL128
(these aren't supported by the serdes layer, but everything
else can support 128-bit integers).
Below is the list of caveats (TODOs) known at the time of writing:
* the DICTIONARY encoder for strings/binaries needs updating to
allow for masking the 'block is full' bit; as of this changelist,
storing strings/binaries works with PLAIN and PREFIX_ENCODING,
but there are edge cases where it doesn't work with DICTIONARY encoding
* memory management isn't yet optimal when reading array cells
in CFileIterator
* maximum number of elements in an array isn't yet configurable
* the configured maximum number of elements in an array isn't yet
enforced
* for future proofing (e.g., in case of switching to the Apache Arrow
format instead of Flatbuffers) it's necessary to add meta-info,
so the sender and the recipient can tell which format to use
for exchanging the data of NESTED data types
* support for array data blocks needs to be explicitly enabled
by setting --cfile_support_arrays=true
[1] https://gerrit.cloudera.org/#/c/22058
[2] https://gerrit.cloudera.org/#/c/23220
Change-Id: I5825c939cb40d350a23a78609115f26e62cca270
Reviewed-on: http://gerrit.cloudera.org:8080/22868
Reviewed-by: Abhishek Chennaka <[email protected]>
Tested-by: Alexey Serbin <[email protected]>
---
src/kudu/cfile/binary_dict_block.cc | 8 +-
src/kudu/cfile/binary_dict_block.h | 2 +-
src/kudu/cfile/binary_plain_block.cc | 2 +-
src/kudu/cfile/binary_plain_block.h | 2 +-
src/kudu/cfile/binary_prefix_block.cc | 2 +-
src/kudu/cfile/binary_prefix_block.h | 2 +-
src/kudu/cfile/block_encodings.h | 29 +-
src/kudu/cfile/bshuf_block.h | 6 +-
src/kudu/cfile/cfile-test-base.h | 671 +++++++++++++++++++++++++-------
src/kudu/cfile/cfile-test.cc | 309 ++++++++++++++-
src/kudu/cfile/cfile_reader.cc | 266 ++++++++++++-
src/kudu/cfile/cfile_reader.h | 20 +-
src/kudu/cfile/cfile_util.h | 5 +-
src/kudu/cfile/cfile_writer.cc | 318 ++++++++++++++-
src/kudu/cfile/cfile_writer.h | 25 +-
src/kudu/cfile/plain_bitmap_block.h | 2 +-
src/kudu/cfile/plain_block.h | 2 +-
src/kudu/cfile/rle_block.h | 4 +-
src/kudu/common/columnblock-test-util.h | 11 +-
src/kudu/common/columnblock.h | 1 +
src/kudu/common/rowblock.h | 2 +
src/kudu/common/rowblock_memory.h | 2 +-
src/kudu/tablet/multi_column_writer.cc | 20 +-
23 files changed, 1509 insertions(+), 202 deletions(-)
diff --git a/src/kudu/cfile/binary_dict_block.cc b/src/kudu/cfile/binary_dict_block.cc
index c860ef468..0c0c89746 100644
--- a/src/kudu/cfile/binary_dict_block.cc
+++ b/src/kudu/cfile/binary_dict_block.cc
@@ -68,7 +68,7 @@ BinaryDictBlockBuilder::BinaryDictBlockBuilder(const WriterOptions* options)
void BinaryDictBlockBuilder::Reset() {
if (mode_ == kCodeWordMode &&
- dict_block_.IsBlockFull()) {
+ dict_block_.IsBlockFullImpl()) {
mode_ = kPlainBinaryMode;
data_builder_.reset(new BinaryPlainBlockBuilder(options_));
} else {
@@ -96,9 +96,9 @@ void BinaryDictBlockBuilder::Finish(rowid_t ordinal_pos, vector<Slice>* slices)
//
// If it is the latter case, all the subsequent data blocks will switch to
// StringPlainBlock automatically.
-bool BinaryDictBlockBuilder::IsBlockFull() const {
- if (data_builder_->IsBlockFull()) return true;
- if (dict_block_.IsBlockFull() && (mode_ == kCodeWordMode)) return true;
+bool BinaryDictBlockBuilder::IsBlockFullImpl() const {
+ if (data_builder_->IsBlockFullImpl()) return true;
+ if (dict_block_.IsBlockFullImpl() && (mode_ == kCodeWordMode)) return true;
return false;
}
diff --git a/src/kudu/cfile/binary_dict_block.h b/src/kudu/cfile/binary_dict_block.h
index c653be1e8..2e4e2ce9a 100644
--- a/src/kudu/cfile/binary_dict_block.h
+++ b/src/kudu/cfile/binary_dict_block.h
@@ -80,7 +80,7 @@ class BinaryDictBlockBuilder final : public BlockBuilder {
public:
explicit BinaryDictBlockBuilder(const WriterOptions* options);
- bool IsBlockFull() const override;
+ bool IsBlockFullImpl() const override;
// Append the dictionary block for the current cfile to the end of the cfile and set the footer
// accordingly.
diff --git a/src/kudu/cfile/binary_plain_block.cc b/src/kudu/cfile/binary_plain_block.cc
index 8d4f8d218..24c95c1b2 100644
--- a/src/kudu/cfile/binary_plain_block.cc
+++ b/src/kudu/cfile/binary_plain_block.cc
@@ -66,7 +66,7 @@ void BinaryPlainBlockBuilder::Reset() {
finished_ = false;
}
-bool BinaryPlainBlockBuilder::IsBlockFull() const {
+bool BinaryPlainBlockBuilder::IsBlockFullImpl() const {
return size_estimate_ > options_->storage_attributes.cfile_block_size;
}
diff --git a/src/kudu/cfile/binary_plain_block.h b/src/kudu/cfile/binary_plain_block.h
index 8bea41a27..fe773749d 100644
--- a/src/kudu/cfile/binary_plain_block.h
+++ b/src/kudu/cfile/binary_plain_block.h
@@ -60,7 +60,7 @@ class BinaryPlainBlockBuilder final : public BlockBuilder {
public:
explicit BinaryPlainBlockBuilder(const WriterOptions* options);
- bool IsBlockFull() const override;
+ bool IsBlockFullImpl() const override;
int Add(const uint8_t* vals, size_t count) override;
diff --git a/src/kudu/cfile/binary_prefix_block.cc b/src/kudu/cfile/binary_prefix_block.cc
index 0b053448c..1536315e7 100644
--- a/src/kudu/cfile/binary_prefix_block.cc
+++ b/src/kudu/cfile/binary_prefix_block.cc
@@ -96,7 +96,7 @@ void BinaryPrefixBlockBuilder::Reset() {
last_val_.clear();
}
-bool BinaryPrefixBlockBuilder::IsBlockFull() const {
+bool BinaryPrefixBlockBuilder::IsBlockFullImpl() const {
// TODO(todd): take restarts size into account
return buffer_.size() > options_->storage_attributes.cfile_block_size;
}
diff --git a/src/kudu/cfile/binary_prefix_block.h b/src/kudu/cfile/binary_prefix_block.h
index afffcd107..7e4a91e11 100644
--- a/src/kudu/cfile/binary_prefix_block.h
+++ b/src/kudu/cfile/binary_prefix_block.h
@@ -47,7 +47,7 @@ class BinaryPrefixBlockBuilder final : public BlockBuilder {
public:
explicit BinaryPrefixBlockBuilder(const WriterOptions* options);
- bool IsBlockFull() const override;
+ bool IsBlockFullImpl() const override;
int Add(const uint8_t* vals, size_t count) override;
diff --git a/src/kudu/cfile/block_encodings.h b/src/kudu/cfile/block_encodings.h
index 62e7fba8c..0b94c24bb 100644
--- a/src/kudu/cfile/block_encodings.h
+++ b/src/kudu/cfile/block_encodings.h
@@ -39,7 +39,9 @@ class CFileWriter;
class BlockBuilder {
public:
- BlockBuilder() = default;
+ BlockBuilder()
+ : block_full_masked_(false) {
+ }
virtual ~BlockBuilder() = default;
// Append extra information to the end of the current cfile, for example:
@@ -53,7 +55,15 @@ class BlockBuilder {
// A block is full if it its estimated size is larger than the configured
// WriterOptions' cfile_block_size.
// If it is full, the cfile writer will call FinishCurDataBlock().
- virtual bool IsBlockFull() const = 0;
+ bool IsBlockFull() const {
+ if (block_full_masked_) {
+ return false;
+ }
+ return IsBlockFullImpl();
+ }
+
+ // The 'block is full' indication, unaffected by SetBlockFullMasked().
+ virtual bool IsBlockFullImpl() const = 0;
// Add a sequence of values to the block.
// Returns the number of values actually added, which may be less
@@ -94,7 +104,22 @@ class BlockBuilder {
// If no keys have been added, returns Status::NotFound
virtual Status GetLastKey(void* key) const = 0;
+ // Mask the 'block is full' criterion. This is used when writing array data
+ // blocks: the contents of a single array cell cannot be split between
+ // different array data blocks. The cfile writer uses this to make sure the
+ // encoder isn't switching to a new block when writing data within
+ // a single array cell.
+ void SetBlockFullMasked(bool block_full_masked) {
+ block_full_masked_ = block_full_masked;
+ }
+
+ // Whether the 'block is full' is masked.
+ bool IsBlockFullMasked() const {
+ return block_full_masked_;
+ }
+
private:
+ bool block_full_masked_;
DISALLOW_COPY_AND_ASSIGN(BlockBuilder);
};
diff --git a/src/kudu/cfile/bshuf_block.h b/src/kudu/cfile/bshuf_block.h
index a4e8450e1..9fd218a7c 100644
--- a/src/kudu/cfile/bshuf_block.h
+++ b/src/kudu/cfile/bshuf_block.h
@@ -119,13 +119,13 @@ class BShufBlockBuilder final : public BlockBuilder {
rem_elem_capacity_ = block_size / size_of_type;
}
- bool IsBlockFull() const override {
- return rem_elem_capacity_ == 0;
+ bool IsBlockFullImpl() const override {
+ return rem_elem_capacity_ <= 0;
}
int Add(const uint8_t* vals_void, size_t count) override {
DCHECK(!finished_);
- int to_add = std::min<int>(rem_elem_capacity_, count);
+ int to_add = IsBlockFullMasked() ? count :
std::min<int>(rem_elem_capacity_, count);
data_.append(vals_void, to_add * size_of_type);
count_ += to_add;
rem_elem_capacity_ -= to_add;
diff --git a/src/kudu/cfile/cfile-test-base.h b/src/kudu/cfile/cfile-test-base.h
index 2f4b41d4a..cc637a115 100644
--- a/src/kudu/cfile/cfile-test-base.h
+++ b/src/kudu/cfile/cfile-test-base.h
@@ -21,6 +21,7 @@
#include <functional>
#include <memory>
#include <string>
+#include <type_traits>
#include <utility>
#include <vector>
@@ -29,6 +30,8 @@
#include "kudu/cfile/cfile.pb.h"
#include "kudu/cfile/cfile_reader.h"
#include "kudu/cfile/cfile_writer.h"
+#include "kudu/common/array_cell_view.h"
+#include "kudu/common/array_type_serdes.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/columnblock-test-util.h"
#include "kudu/fs/fs_manager.h"
@@ -48,6 +51,28 @@ DEFINE_int32(cfile_test_block_size, 1024,
namespace kudu {
namespace cfile {
+// The true/false pattern alternates every 2^N consecutive integer
+// input values, cycling between:
+// 2^N true
+// 2^N alternating true/false
+// 2^N false
+// 2^N alternating true/false
+//
+// This is useful for stress-testing the run-length coding of NULL values
+// in data blocks.
+template<size_t N>
+bool IsNullAlternating(size_t n) {
+ switch ((n >> N) & 3) {
+ case 1:
+ case 3:
+ return n & 1;
+ case 2:
+ return false;
+ default:
+ return true;
+ }
+}
+
// Abstract test data generator.
// You must implement BuildTestValue() to return your test value.
// Usage example:
@@ -60,18 +85,22 @@ namespace cfile {
template<DataType DATA_TYPE, bool HAS_NULLS>
class DataGenerator {
public:
- static bool has_nulls() {
+ static constexpr bool has_nulls() {
return HAS_NULLS;
}
- static const DataType kDataType;
+ static constexpr bool is_array() {
+ return false;
+ }
+
+ static constexpr const DataType kDataType = DATA_TYPE;
typedef typename DataTypeTraits<DATA_TYPE>::cpp_type cpp_type;
- DataGenerator() :
- block_entries_(0),
- total_entries_(0)
- {}
+ DataGenerator()
+ : block_entries_(0),
+ total_entries_(0) {
+ }
virtual ~DataGenerator() = default;
@@ -105,25 +134,7 @@ class DataGenerator {
if (!HAS_NULLS) {
return false;
}
-
- // The NULL pattern alternates every 32 rows, cycling between:
- // 32 NULL
- // 32 alternating NULL/NOTNULL
- // 32 NOT-NULL
- // 32 alternating NULL/NOTNULL
- // This is to ensure that we stress the run-length coding for
- // NULL value.
- switch ((n >> 6) & 3) {
- case 0:
- return true;
- case 1:
- case 3:
- return n & 1;
- case 2:
- return false;
- default:
- LOG(FATAL);
- }
+ return IsNullAlternating<6>(n);
}
virtual void Resize(size_t num_entries) {
@@ -147,16 +158,13 @@ class DataGenerator {
return values_[index];
}
- private:
+ protected:
std::unique_ptr<cpp_type[]> values_;
std::unique_ptr<uint8_t[]> non_null_bitmap_;
size_t block_entries_;
size_t total_entries_;
};
-template<DataType DATA_TYPE, bool HAS_NULLS>
-const DataType DataGenerator<DATA_TYPE, HAS_NULLS>::kDataType = DATA_TYPE;
-
template<bool HAS_NULLS>
class UInt8DataGenerator : public DataGenerator<UINT8, HAS_NULLS> {
public:
@@ -336,6 +344,380 @@ class RandomInt32DataGenerator : public DataGenerator<INT32, /* HAS_NULLS= */ fa
}
};
+// Generic data generation interface for array cells. It's supposed to be
+// pluggable with minimal constexpr-eval style changes into the places
+// where DataGenerator is used.
+template<DataType DATA_TYPE,
+ typename FB_TYPE,
+ bool HAS_NULLS,
+ bool HAS_NULLS_IN_ARRAY,
+ size_t MAX_NUM_ELEMENTS_IN_ARRAY>
+class ArrayDataGenerator {
+ public:
+ typedef typename DataTypeTraits<DATA_TYPE>::cpp_type cpp_type;
+
+ static constexpr const DataType kDataType = DATA_TYPE;
+
+ static constexpr bool has_nulls() {
+ return HAS_NULLS;
+ }
+
+ static constexpr bool is_array() {
+ return true;
+ }
+
+ ArrayDataGenerator()
+ : block_entries_(0),
+ total_entries_(0),
+ total_elem_count_(0),
+ values_total_sum_(0),
+ str_values_total_size_(0) {
+ }
+
+ virtual ~ArrayDataGenerator() = default;
+
+ void Reset() {
+ block_entries_ = 0;
+ total_entries_ = 0;
+ total_elem_count_ = 0;
+ values_total_sum_ = 0;
+ str_values_total_size_ = 0;
+ }
+
+ void Build(size_t num_entries) {
+ Build(total_entries_, num_entries);
+ total_entries_ += num_entries;
+ }
+
+ // Build "num_entries" of array cells using (offset + i) as value.
+ // The data is available via 'values()', non-null for whole array cells
+ // via 'non_null_bitmap()', and validity of elements in each of the generated
+ // arrays via 'cell_non_null_bitmaps()'. Those are alive while an instance
+ // of this class is alive, and invalidated by next calls to either 'Build()'
+ // or 'Resize()'.
+ void Build(size_t offset, size_t num_entries) {
+ Resize(num_entries);
+
+ DCHECK_EQ(num_entries, array_cells_.size());
+ DCHECK_EQ(num_entries, flatbuffers_.size());
+ DCHECK_EQ(num_entries, cell_non_null_bitmaps_.size());
+ DCHECK_EQ(num_entries, cell_non_null_bitmaps_container_.size());
+ DCHECK_EQ(num_entries, cell_non_null_bitmaps_sizes_.size());
+ DCHECK_EQ(num_entries, values_vector_.size());
+ DCHECK_EQ(num_entries, values_vector_str_.size());
+ DCHECK_EQ(num_entries, block_entries_);
+
+ for (size_t i = 0; i < num_entries; ++i) {
+ const bool is_null_array =
+ HAS_NULLS ? TestValueShouldBeNull(offset + i) : false;
+ BitmapChange(non_null_bitmap_.get(), i, !is_null_array);
+
+ if (is_null_array) {
+ cell_non_null_bitmaps_sizes_[i] = 0;
+ cell_non_null_bitmaps_[i] = nullptr;
+ array_cells_[i].clear();
+ values_vector_[i].clear();
+ values_vector_str_[i].clear();
+ flatbuffers_[i].reset();
+ continue;
+ }
+
+ const size_t array_elem_num = random() % MAX_NUM_ELEMENTS_IN_ARRAY;
+ total_elem_count_ += array_elem_num;
+
+ // Set the number of bits for the array bitmap correspondingly.
+ cell_non_null_bitmaps_sizes_[i] = array_elem_num;
+ if (array_elem_num == 0) {
+ // Build an empty array for this cell.
+ values_vector_[i].clear();
+ values_vector_str_[i].clear();
+
+ array_cells_[i] = Slice(static_cast<uint8_t*>(nullptr), 0);
+ } else {
+ // Helper container for flatbuffer's serialization.
+ std::vector<bool> validity_src(array_elem_num, false);
+ values_vector_[i].resize(array_elem_num);
+ values_vector_str_[i].resize(array_elem_num);
+
+ cell_non_null_bitmaps_[i] = cell_non_null_bitmaps_container_[i].get();
+ uint8_t* array_non_null_bitmap = cell_non_null_bitmaps_[i];
+ DCHECK(array_non_null_bitmap);
+ for (size_t j = 0; j < array_elem_num; ++j) {
+ const bool array_elem_is_null =
+ HAS_NULLS_IN_ARRAY ? TestValueInArrayShouldBeNull(offset + i + j)
+ : false;
+ BitmapChange(array_non_null_bitmap, j, !array_elem_is_null);
+ validity_src[j] = !array_elem_is_null;
+ if constexpr (DATA_TYPE == STRING) {
+ auto& str = values_vector_str_[i][j];
+ if (array_elem_is_null) {
+ str.clear();
+ values_vector_[i][j] = Slice();
+ } else {
+ str.reserve(20);
+ str = std::to_string(random());
+ values_vector_[i][j] = Slice(str.data(), str.size());
+ str_values_total_size_ += str.size();
+ }
+ } else if constexpr (DATA_TYPE == BINARY) {
+ auto& str = values_vector_str_[i][j];
+ if (array_elem_is_null) {
+ str.clear();
+ values_vector_[i][j] = Slice();
+ } else {
+ str.reserve(45);
+ const auto r = random();
+ str.push_back(r % 128);
+ str.push_back(42 + r % 32);
+ str.append(std::to_string(r));
+ str.append({ 0x00, 0x7f, 0x42 });
+ str.append(std::to_string(random()));
+ values_vector_[i][j] = Slice(str.data(), str.size());
+ str_values_total_size_ += str.size();
+ }
+ } else {
+ static_assert(!std::is_same<Slice, cpp_type>::value,
+ "cannot be a binary type");
+ if (array_elem_is_null) {
+ values_vector_[i][j] = 0;
+ } else {
+ values_vector_[i][j] = BuildTestValue(i, offset + i + j);
+
IncrementValuesTotalSum(static_cast<size_t>(values_vector_[i][j]));
+ }
+ }
+ }
+
+ size_t buf_size = 0;
+ const auto s = Serialize<DATA_TYPE, FB_TYPE>(
+ reinterpret_cast<uint8_t*>(values_vector_[i].data()),
+ array_elem_num,
+ validity_src,
+ &flatbuffers_[i],
+ &buf_size);
+ CHECK_OK(s);
+ array_cells_[i] = Slice(flatbuffers_[i].get(), buf_size);
+ }
+ }
+ }
+
+ virtual cpp_type BuildTestValue(size_t block_index, size_t value) = 0;
+
+ bool TestValueShouldBeNull(size_t n) {
+ if (!HAS_NULLS) {
+ return false;
+ }
+ return IsNullAlternating</*N=*/5>(n);
+ }
+
+ bool TestValueInArrayShouldBeNull(size_t n) {
+ if (!HAS_NULLS_IN_ARRAY) {
+ return false;
+ }
+ return IsNullAlternating</*N=*/6>(n);
+ }
+
+ // Resize() resets all the underlying data if 'num_entries' is greater
+ // than 'block_entries_'.
+ virtual void Resize(size_t num_entries) {
+ cell_non_null_bitmaps_container_.resize(num_entries);
+
+ cell_non_null_bitmaps_.resize(num_entries);
+ cell_non_null_bitmaps_sizes_.resize(num_entries);
+
+ array_cells_.resize(num_entries);
+ flatbuffers_.resize(num_entries);
+ values_vector_.resize(num_entries);
+ values_vector_str_.resize(num_entries);
+
+ if (block_entries_ >= num_entries) {
+ block_entries_ = num_entries;
+ return;
+ }
+
+ for (auto i = 0; i < num_entries; ++i) {
+ // Allocate up to the maximum possible number of elements in each array
+ // cell. The cells are populated with elements (or nullified) by Build().
+ cell_non_null_bitmaps_sizes_[i] = MAX_NUM_ELEMENTS_IN_ARRAY;
+ cell_non_null_bitmaps_container_[i].reset(new
uint8_t[BitmapSize(cell_non_null_bitmaps_sizes_[i])]);
+ cell_non_null_bitmaps_[i] = cell_non_null_bitmaps_container_[i].get();
+
+ array_cells_[i] = Slice();
+ flatbuffers_[i].reset();
+ values_vector_[i].clear();
+ values_vector_str_[i].clear();
+ }
+ non_null_bitmap_.reset(new uint8_t[BitmapSize(num_entries)]);
+ block_entries_ = num_entries;
+ // The 'total_entries_' isn't touched -- it corresponds to the offset
+ // in the generated CFile, so it's possible to continue writing into
+ // the same file even after Resize().
+ }
+
+ size_t block_entries() const {
+ return block_entries_;
+ }
+ size_t total_entries() const {
+ return total_entries_;
+ }
+
+ const uint8_t* non_null_bitmap() const {
+ return non_null_bitmap_.get();
+ }
+
+ const void* cells() const {
+ return reinterpret_cast<const void*>(array_cells_.data());
+ }
+
+ size_t total_elem_count() const {
+ return total_elem_count_;
+ }
+
+ size_t values_total_sum() const {
+ return values_total_sum_;
+ }
+
+ size_t str_values_total_size() const {
+ return str_values_total_size_;
+ }
+
+ private:
+ ATTRIBUTE_NO_SANITIZE_INTEGER
+ void IncrementValuesTotalSum(size_t increment) {
+ // The overflow of 'values_total_sum_' is intended.
+ values_total_sum_ += static_cast<size_t>(increment);
+ }
+
+ // The storage of non-null bitmaps for each of the generated array cells:
+ // these are bitmaps to reflect on the nullability of individual array
+ // elements in each array cell.
+ // This container performs auto-cleanup upon destruction of the generator.
+ std::vector<std::unique_ptr<uint8_t[]>> cell_non_null_bitmaps_container_;
+
+ // Slices that point to the memory stored in 'flatbuffers_', with some offset.
+ std::vector<Slice> array_cells_;
+
+ // Flatbuffers-serialized representations of arrays.
+ std::vector<std::unique_ptr<uint8_t[]>> flatbuffers_;
+
+ // This is a complementary container to store std::string instances when the
+ // 'values_vector_' container below stores Slice instances.
+ std::vector<std::vector<std::string>> values_vector_str_;
+
+ // The storage of test values. This container stores Slice instances for
+ // STRING/BINARY Kudu types, and the std::string instances backing these
+ // are stored in the 'values_vector_str_' complementary container.
+ std::vector<std::vector<cpp_type>> values_vector_;
+
+ // Bitmap to track whether it's an array (maybe an empty one) or just null.
+ std::unique_ptr<uint8_t[]> non_null_bitmap_;
+
+ // Bitmaps to track null elements in the arrays themselves: this container
+ // stores raw pointers to memory backed by corresponding elements
+ // of the 'cell_non_null_bitmaps_container_' field.
+ std::vector<uint8_t*> cell_non_null_bitmaps_;
+
+ // Number of bits in the corresponding bitmaps from 'cell_non_null_bitmaps_';
+ // null and empty arrays have zero bits in their bitmaps.
+ std::vector<size_t> cell_non_null_bitmaps_sizes_;
+
+ size_t block_entries_;
+ size_t total_entries_;
+
+ // Total number of array elements (including null/non-valid elements in array cells).
+ size_t total_elem_count_;
+
+ // Sum of all integer values generated during Build(): useful for verification
+ // of the result when reading the stored values from CFile.
+ size_t values_total_sum_;
+ // Total length/size of all binary/string values generated during Build():
+ // useful for verification of the result when reading the data from CFile.
+ size_t str_values_total_size_;
+};
+
+// This template is for numeric/non-binary data types for array elements,
+// where the return type of random() can be statically cast into the type
+// of the element.
+template<DataType KUDU_DATA_TYPE,
+ typename PB_TYPE,
+ bool HAS_NULLS,
+ bool HAS_NULLS_IN_ARRAY,
+ size_t MAX_NUM_ELEMENTS_IN_ARRAY>
+class IntArrayRandomDataGenerator :
+ public ArrayDataGenerator<KUDU_DATA_TYPE,
+ PB_TYPE,
+ HAS_NULLS,
+ HAS_NULLS_IN_ARRAY,
+ MAX_NUM_ELEMENTS_IN_ARRAY> {
+ public:
+ typedef typename DataTypeTraits<KUDU_DATA_TYPE>::cpp_type ElemType;
+ ElemType BuildTestValue(size_t /*block_index*/, size_t /*value*/) override {
+ return static_cast<ElemType>(random());
+ }
+};
+
+template<DataType KUDU_DATA_TYPE,
+ typename PB_TYPE,
+ bool HAS_NULLS,
+ bool HAS_NULLS_IN_ARRAY,
+ size_t MAX_NUM_ELEMENTS_IN_ARRAY>
+class StrArrayRandomDataGenerator :
+ public ArrayDataGenerator<KUDU_DATA_TYPE,
+ PB_TYPE,
+ HAS_NULLS,
+ HAS_NULLS_IN_ARRAY,
+ MAX_NUM_ELEMENTS_IN_ARRAY> {
+ public:
+ typedef typename DataTypeTraits<KUDU_DATA_TYPE>::cpp_type ElemType;
+
+ // This is a fake implementation just to satisfy the signature of the
+ // overridden pure virtual function.
+ Slice BuildTestValue(size_t /*block_index*/, size_t /*value*/) override {
+ static const Slice kEmpty;
+ return kEmpty;
+ }
+};
+
+template<bool HAS_NULLS,
+ bool HAS_NULLS_IN_ARRAY,
+ size_t MAX_NUM_ELEMENTS_IN_ARRAY>
+using Int8ArrayRandomDataGenerator =
+ IntArrayRandomDataGenerator<INT8,
+ serdes::Int8Array,
+ HAS_NULLS,
+ HAS_NULLS_IN_ARRAY,
+ MAX_NUM_ELEMENTS_IN_ARRAY>;
+
+template<bool HAS_NULLS,
+ bool HAS_NULLS_IN_ARRAY,
+ size_t MAX_NUM_ELEMENTS_IN_ARRAY>
+using Int32ArrayRandomDataGenerator =
+ IntArrayRandomDataGenerator<INT32,
+ serdes::Int32Array,
+ HAS_NULLS,
+ HAS_NULLS_IN_ARRAY,
+ MAX_NUM_ELEMENTS_IN_ARRAY>;
+
+template<bool HAS_NULLS,
+ bool HAS_NULLS_IN_ARRAY,
+ size_t MAX_NUM_ELEMENTS_IN_ARRAY>
+using StringArrayRandomDataGenerator =
+ StrArrayRandomDataGenerator<STRING,
+ serdes::StringArray,
+ HAS_NULLS,
+ HAS_NULLS_IN_ARRAY,
+ MAX_NUM_ELEMENTS_IN_ARRAY>;
+
+template<bool HAS_NULLS,
+ bool HAS_NULLS_IN_ARRAY,
+ size_t MAX_NUM_ELEMENTS_IN_ARRAY>
+using BinaryArrayRandomDataGenerator =
+ StrArrayRandomDataGenerator<BINARY,
+ serdes::BinaryArray,
+ HAS_NULLS,
+ HAS_NULLS_IN_ARRAY,
+ MAX_NUM_ELEMENTS_IN_ARRAY>;
+
+
class CFileTestBase : public KuduTest {
public:
void SetUp() override {
@@ -381,8 +763,12 @@ class CFileTestBase : public KuduTest {
opts.storage_attributes.encoding = encoding;
opts.storage_attributes.compression = compression;
- CFileWriter w(opts, GetTypeInfo(DataGeneratorType::kDataType),
- DataGeneratorType::has_nulls(), std::move(sink));
+ CFileWriter w(opts,
+ DataGeneratorType::is_array()
+ ? GetArrayTypeInfo(DataGeneratorType::kDataType)
+ : GetTypeInfo(DataGeneratorType::kDataType),
+ DataGeneratorType::has_nulls(),
+ std::move(sink));
ASSERT_OK(w.Start());
@@ -397,12 +783,19 @@ class CFileTestBase : public KuduTest {
data_generator->Build(towrite);
DCHECK_EQ(towrite, data_generator->block_entries());
- if (DataGeneratorType::has_nulls()) {
-
ASSERT_OK_FAST(w.AppendNullableEntries(data_generator->non_null_bitmap(),
- data_generator->values(),
- towrite));
+ if constexpr (DataGeneratorType::is_array()) {
+ ASSERT_OK_FAST(w.AppendNullableArrayEntries(
+ data_generator->non_null_bitmap(),
+ data_generator->cells(),
+ towrite));
} else {
- ASSERT_OK_FAST(w.AppendEntries(data_generator->values(), towrite));
+ if constexpr (DataGeneratorType::has_nulls()) {
+
ASSERT_OK_FAST(w.AppendNullableEntries(data_generator->non_null_bitmap(),
+ data_generator->values(),
+ towrite));
+ } else {
+ ASSERT_OK_FAST(w.AppendEntries(data_generator->values(), towrite));
+ }
}
i += towrite;
}
@@ -411,7 +804,6 @@ class CFileTestBase : public KuduTest {
}
std::unique_ptr<FsManager> fs_manager_;
-
};
// Fast unrolled summing of a vector.
@@ -437,131 +829,150 @@ SumType FastSum(const Indexable& data, size_t n) {
return sums[0] + sums[1] + sums[2] + sums[3];
}
+template<DataType Type>
+Status TimeReadFileForArrayDataType(
+ CFileIterator* iter,
+ size_t* out_row_count,
+ size_t* out_total_array_element_count = nullptr,
+ size_t* out_total_elem_str_size = nullptr) {
+ size_t row_count = 0;
+ size_t null_arrays = 0;
+ size_t empty_arrays = 0;
+ size_t null_array_elements = 0;
+ size_t total_array_elements = 0;
+ size_t total_elem_str_size = 0;
+
+ ScopedColumnBlock<Type, /*IS_ARRAY=*/true> cb(100000); // up to 100K rows
+ SelectionVector sel(cb.nrows());
+ ColumnMaterializationContext ctx(0, nullptr, &cb, &sel);
+ ctx.SetDecoderEvalNotSupported();
+ while (iter->HasNext()) {
+ size_t n = cb.nrows();
+ RETURN_NOT_OK(iter->CopyNextValues(&n, &ctx));
+ row_count += n;
+ for (size_t ri = 0; ri < n; ++ri) {
+ const Slice* cell_ptr = reinterpret_cast<const
Slice*>(cb.cell(ri).ptr());
+ if (cell_ptr == nullptr || cell_ptr->empty()) {
+ ++null_arrays;
+ continue;
+ }
+ ArrayCellMetadataView view(cell_ptr->data(), cell_ptr->size());
+ RETURN_NOT_OK(view.Init());
+ if (view.empty()) {
+ ++empty_arrays;
+ continue;
+ }
+
+ const auto elem_num = view.elem_num();
+ total_array_elements += elem_num;
+ BitmapIterator bit(view.not_null_bitmap(), elem_num);
+ bool is_not_null = false;
+ while (size_t elem_count = bit.Next(&is_not_null)) {
+ if (!is_not_null) {
+ null_array_elements += elem_count;
+ }
+ }
+
+ // For binary/string types, calculate the total length of strings/buffers
+ // of all the array elements.
+ if constexpr (Type == DataType::BINARY ||
+ Type == DataType::STRING ||
+ Type == DataType::VARCHAR) {
+ const Slice* data = reinterpret_cast<const Slice*>(view.data_as(Type));
+ DCHECK(data);
+ const auto* bm = view.not_null_bitmap();
+ DCHECK(bm);
+ for (size_t i = 0; i < elem_num; ++i, ++data) {
+ if (BitmapTest(bm, i)) {
+ total_elem_str_size += data->size();
+ }
+ }
+ }
+ }
+ cb.memory()->Reset();
+ }
+ if (out_row_count) {
+ *out_row_count = row_count;
+ }
+ if (out_total_array_element_count) {
+ *out_total_array_element_count = total_array_elements;
+ }
+ if (out_total_elem_str_size) {
+ *out_total_elem_str_size = total_elem_str_size;
+ }
+ LOG(INFO)<< "Row count : " << row_count;
+ LOG(INFO)<< "NULL rows/arrays : " << null_arrays;
+ LOG(INFO)<< "Empty arrays : " << empty_arrays;
+ LOG(INFO)<< "NULL array elements : " << null_array_elements;
+ LOG(INFO)<< "Total array elements: " << total_array_elements;
+ return Status::OK();
+}
+
template<DataType Type, typename SumType>
-void TimeReadFileForDataType(CFileIterator* iter, int* count) {
+Status TimeReadFileForDataType(CFileIterator* iter, size_t* count) {
ScopedColumnBlock<Type> cb(8192);
SelectionVector sel(cb.nrows());
ColumnMaterializationContext ctx(0, nullptr, &cb, &sel);
ctx.SetDecoderEvalNotSupported();
SumType sum = 0;
+ size_t row_count = 0;
while (iter->HasNext()) {
size_t n = cb.nrows();
- ASSERT_OK_FAST(iter->CopyNextValues(&n, &ctx));
+ RETURN_NOT_OK(iter->CopyNextValues(&n, &ctx));
sum += FastSum<ScopedColumnBlock<Type>, SumType>(cb, n);
- *count += n;
+ row_count += n;
cb.memory()->Reset();
}
+ if (count) {
+ *count = row_count;
+ }
LOG(INFO)<< "Sum: " << sum;
- LOG(INFO)<< "Count: " << *count;
+ LOG(INFO)<< "Count: " << row_count;
+ return Status::OK();
}
template<DataType Type>
-void ReadBinaryFile(CFileIterator* iter, int* count) {
+Status ReadBinaryFile(CFileIterator* iter, size_t* count) {
ScopedColumnBlock<Type> cb(100);
SelectionVector sel(cb.nrows());
ColumnMaterializationContext ctx(0, nullptr, &cb, &sel);
ctx.SetDecoderEvalNotSupported();
uint64_t sum_lens = 0;
+ size_t row_count = 0;
while (iter->HasNext()) {
size_t n = cb.nrows();
- ASSERT_OK_FAST(iter->CopyNextValues(&n, &ctx));
+ RETURN_NOT_OK(iter->CopyNextValues(&n, &ctx));
for (size_t i = 0; i < n; i++) {
if (!cb.is_null(i)) {
sum_lens += cb[i].size();
}
}
- *count += n;
+ row_count += n;
cb.memory()->Reset();
}
+ if (count) {
+ *count = row_count;
+ }
LOG(INFO) << "Sum of value lengths: " << sum_lens;
- LOG(INFO) << "Count: " << *count;
+ LOG(INFO) << "Count: " << row_count;
+ return Status::OK();
}
-void TimeReadFile(FsManager* fs_manager, const BlockId& block_id, size_t*
count_ret) {
- Status s;
-
- std::unique_ptr<fs::ReadableBlock> source;
- ASSERT_OK(fs_manager->OpenBlock(block_id, &source));
- std::unique_ptr<CFileReader> reader;
- ASSERT_OK(CFileReader::Open(std::move(source), ReaderOptions(), &reader));
-
- std::unique_ptr<CFileIterator> iter;
- ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr));
- ASSERT_OK(iter->SeekToOrdinal(0));
-
- Arena arena(8192);
- int count = 0;
- switch (reader->type_info()->physical_type()) {
- case UINT8:
- {
- TimeReadFileForDataType<UINT8, uint64_t>(iter.get(), &count);
- break;
- }
- case INT8:
- {
- TimeReadFileForDataType<INT8, int64_t>(iter.get(), &count);
- break;
- }
- case UINT16:
- {
- TimeReadFileForDataType<UINT16, uint64_t>(iter.get(), &count);
- break;
- }
- case INT16:
- {
- TimeReadFileForDataType<INT16, int64_t>(iter.get(), &count);
- break;
- }
- case UINT32:
- {
- TimeReadFileForDataType<UINT32, uint64_t>(iter.get(), &count);
- break;
- }
- case INT32:
- {
- TimeReadFileForDataType<INT32, int64_t>(iter.get(), &count);
- break;
- }
- case UINT64:
- {
- TimeReadFileForDataType<UINT64, uint64_t>(iter.get(), &count);
- break;
- }
- case INT64:
- {
- TimeReadFileForDataType<INT64, int64_t>(iter.get(), &count);
- break;
- }
- case INT128:
- {
- TimeReadFileForDataType<INT128, int128_t>(iter.get(), &count);
- break;
- }
- case FLOAT:
- {
- TimeReadFileForDataType<FLOAT, float>(iter.get(), &count);
- break;
- }
- case DOUBLE:
- {
- TimeReadFileForDataType<DOUBLE, double>(iter.get(), &count);
- break;
- }
- case STRING:
- {
- ReadBinaryFile<STRING>(iter.get(), &count);
- break;
- }
- case BINARY:
- {
- ReadBinaryFile<BINARY>(iter.get(), &count);
- break;
- }
- default:
- FAIL() << "Unknown type: " << reader->type_info()->physical_type();
- }
- *count_ret = count;
-}
+Status TimeReadFileForScalars(DataType data_type,
+ CFileIterator* iter,
+ size_t* count_ret);
+
+Status TimeReadFileForArrays(DataType data_type,
+ CFileIterator* iter,
+ size_t* count_ret,
+ size_t* total_array_element_count_ret = nullptr,
+ size_t* total_elem_str_size_ret = nullptr);
+
+Status TimeReadFile(FsManager* fs_manager,
+ const BlockId& block_id,
+ size_t* count_ret,
+ size_t* total_array_element_count_ret = nullptr,
+ size_t* total_elem_str_size_ret = nullptr);
} // namespace cfile
} // namespace kudu
diff --git a/src/kudu/cfile/cfile-test.cc b/src/kudu/cfile/cfile-test.cc
index c204d5e4e..b126376e1 100644
--- a/src/kudu/cfile/cfile-test.cc
+++ b/src/kudu/cfile/cfile-test.cc
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#include "kudu/cfile/cfile-test-base.h"
+
#include <algorithm>
#include <cstdint>
#include <cstdlib>
@@ -25,6 +27,7 @@
#include <memory>
#include <sstream>
#include <string>
+#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
@@ -36,7 +39,6 @@
#include "kudu/cfile/block_cache.h"
#include "kudu/cfile/block_handle.h"
#include "kudu/cfile/block_pointer.h"
-#include "kudu/cfile/cfile-test-base.h"
#include "kudu/cfile/cfile.pb.h"
#include "kudu/cfile/cfile_reader.h"
#include "kudu/cfile/cfile_util.h"
@@ -78,10 +80,7 @@
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
-namespace kudu {
-class Arena;
-} // namespace kudu
-
+DECLARE_bool(cfile_support_arrays);
DECLARE_bool(cfile_write_checksums);
DECLARE_bool(cfile_verify_checksums);
DECLARE_string(block_cache_eviction_policy);
@@ -92,11 +91,8 @@ DECLARE_string(nvm_cache_path);
DECLARE_bool(nvm_cache_simulate_allocation_failure);
METRIC_DECLARE_counter(block_cache_hits_caching);
-
METRIC_DECLARE_entity(server);
-
-using kudu::fs::BlockManager;
using kudu::fs::CountingReadableBlock;
using kudu::fs::CreateCorruptBlock;
using kudu::fs::ReadableBlock;
@@ -108,8 +104,128 @@ using std::vector;
using strings::Substitute;
namespace kudu {
+
+class Arena;
+
namespace cfile {
+// Reads all the values of a non-array CFile through 'iter', picking the
+// reader helper by the (physical) type of the values. The number of rows
+// read is reported via 'count_ret'.
+Status TimeReadFileForScalars(DataType data_type,
+                              CFileIterator* iter,
+                              size_t* count_ret) {
+  // Variable-length values have their own reader helper.
+  if (data_type == STRING) {
+    return ReadBinaryFile<STRING>(iter, count_ret);
+  }
+  if (data_type == BINARY) {
+    return ReadBinaryFile<BINARY>(iter, count_ret);
+  }
+
+  // Fixed-width values: each type is paired with the C++ type which
+  // TimeReadFileForDataType() uses for it (64-bit integers of the matching
+  // signedness for the narrower integer types).
+  switch (data_type) {
+    case UINT8:
+      return TimeReadFileForDataType<UINT8, uint64_t>(iter, count_ret);
+    case INT8:
+      return TimeReadFileForDataType<INT8, int64_t>(iter, count_ret);
+    case UINT16:
+      return TimeReadFileForDataType<UINT16, uint64_t>(iter, count_ret);
+    case INT16:
+      return TimeReadFileForDataType<INT16, int64_t>(iter, count_ret);
+    case UINT32:
+      return TimeReadFileForDataType<UINT32, uint64_t>(iter, count_ret);
+    case INT32:
+      return TimeReadFileForDataType<INT32, int64_t>(iter, count_ret);
+    case UINT64:
+      return TimeReadFileForDataType<UINT64, uint64_t>(iter, count_ret);
+    case INT64:
+      return TimeReadFileForDataType<INT64, int64_t>(iter, count_ret);
+    case INT128:
+      return TimeReadFileForDataType<INT128, int128_t>(iter, count_ret);
+    case FLOAT:
+      return TimeReadFileForDataType<FLOAT, float>(iter, count_ret);
+    case DOUBLE:
+      return TimeReadFileForDataType<DOUBLE, double>(iter, count_ret);
+    default:
+      break;
+  }
+  return Status::NotSupported(
+      Substitute("not implemented for type: $0", DataType_Name(data_type)));
+}
+
+// Reads all the array cells of an array CFile through 'iter', picking the
+// reader helper by the type of the array elements. The number of cells is
+// reported via 'count_ret', the total number of elements via
+// 'total_array_element_count_ret', and (for string/binary elements only)
+// the total size of the elements via 'total_elem_str_size_ret'.
+Status TimeReadFileForArrays(DataType data_type,
+                             CFileIterator* iter,
+                             size_t* count_ret,
+                             size_t* total_array_element_count_ret,
+                             size_t* total_elem_str_size_ret) {
+  // Arrays of variable-length elements also report the total size of their
+  // elements; VARCHAR elements are read in the same way as STRING ones.
+  if (data_type == STRING || data_type == VARCHAR) {
+    return TimeReadFileForArrayDataType<STRING>(
+        iter, count_ret, total_array_element_count_ret,
+        total_elem_str_size_ret);
+  }
+  if (data_type == BINARY) {
+    return TimeReadFileForArrayDataType<BINARY>(
+        iter, count_ret, total_array_element_count_ret,
+        total_elem_str_size_ret);
+  }
+
+  // Arrays of fixed-width elements report only the number of elements.
+  size_t* const elem_count_ret = total_array_element_count_ret;
+  switch (data_type) {
+    case INT8:
+      return TimeReadFileForArrayDataType<INT8>(iter, count_ret, elem_count_ret);
+    case UINT8:
+      return TimeReadFileForArrayDataType<UINT8>(iter, count_ret, elem_count_ret);
+    case INT16:
+      return TimeReadFileForArrayDataType<INT16>(iter, count_ret, elem_count_ret);
+    case UINT16:
+      return TimeReadFileForArrayDataType<UINT16>(iter, count_ret, elem_count_ret);
+    case INT32:
+      return TimeReadFileForArrayDataType<INT32>(iter, count_ret, elem_count_ret);
+    case UINT32:
+      return TimeReadFileForArrayDataType<UINT32>(iter, count_ret, elem_count_ret);
+    case INT64:
+      return TimeReadFileForArrayDataType<INT64>(iter, count_ret, elem_count_ret);
+    case UINT64:
+      return TimeReadFileForArrayDataType<UINT64>(iter, count_ret, elem_count_ret);
+    case FLOAT:
+      return TimeReadFileForArrayDataType<FLOAT>(iter, count_ret, elem_count_ret);
+    case DOUBLE:
+      return TimeReadFileForArrayDataType<DOUBLE>(iter, count_ret, elem_count_ret);
+    default:
+      break;
+  }
+  return Status::NotSupported(
+      Substitute("not implemented for arrays of type: $0",
+                 DataType_Name(data_type)));
+}
+
+// Opens the CFile stored in block 'block_id', positions an iterator at its
+// first entry, and reads the file to the end using the helper matching the
+// file's data: array cells or scalar values.
+Status TimeReadFile(FsManager* fs_manager,
+                    const BlockId& block_id,
+                    size_t* count_ret,
+                    size_t* total_array_element_count_ret,
+                    size_t* total_elem_str_size_ret) {
+  // Open the block and a CFile reader on top of it.
+  std::unique_ptr<fs::ReadableBlock> block;
+  RETURN_NOT_OK(fs_manager->OpenBlock(block_id, &block));
+  std::unique_ptr<CFileReader> reader;
+  RETURN_NOT_OK(CFileReader::Open(std::move(block), ReaderOptions(), &reader));
+
+  // Position an iterator at the very first entry of the file.
+  std::unique_ptr<CFileIterator> iter;
+  RETURN_NOT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr));
+  RETURN_NOT_OK(iter->SeekToOrdinal(0));
+
+  if (reader->is_array()) {
+    // Dispatch on the type of array elements: only 1D arrays of scalar
+    // types are supported yet.
+    const auto* elem_type_info = GetArrayElementTypeInfo(*reader->type_info());
+    DCHECK_NE(NESTED, elem_type_info->type());
+    return TimeReadFileForArrays(elem_type_info->type(),
+                                 iter.get(),
+                                 count_ret,
+                                 total_array_element_count_ret,
+                                 total_elem_str_size_ret);
+  }
+
+  // Scalar data: dispatch on the physical type of the values.
+  return TimeReadFileForScalars(reader->type_info()->physical_type(),
+                                iter.get(),
+                                count_ret);
+}
+
class TestCFile : public CFileTestBase {
protected:
template <class DataGeneratorType>
@@ -196,7 +312,7 @@ class TestCFile : public CFileTestBase {
out[i] = 0;
}
- TimeReadFile(fs_manager_.get(), block_id, &n);
+ ASSERT_OK(TimeReadFile(fs_manager_.get(), block_id, &n));
ASSERT_EQ(10000, n);
}
@@ -255,7 +371,7 @@ class TestCFile : public CFileTestBase {
WriteTestFile(generator, encoding, compression, 10000, SMALL_BLOCKSIZE,
&block_id);
size_t n;
- NO_FATALS(TimeReadFile(fs_manager_.get(), block_id, &n));
+ ASSERT_OK(TimeReadFile(fs_manager_.get(), block_id, &n));
ASSERT_EQ(n, 10000);
generator->Reset();
@@ -346,7 +462,7 @@ class TestCFile : public CFileTestBase {
LOG_TIMING(INFO, "reading 100M strings") {
LOG(INFO) << "Starting readfile";
size_t n;
- TimeReadFile(fs_manager_.get(), block_id, &n);
+ ASSERT_OK(TimeReadFile(fs_manager_.get(), block_id, &n));
ASSERT_EQ(100000000, n);
LOG(INFO) << "End readfile";
}
@@ -366,7 +482,7 @@ class TestCFile : public CFileTestBase {
LOG_TIMING(INFO, Substitute("reading $0 strings with dupes", num_rows)) {
LOG(INFO) << "Starting readfile";
size_t n;
- TimeReadFile(fs_manager_.get(), block_id, &n);
+ ASSERT_OK(TimeReadFile(fs_manager_.get(), block_id, &n));
ASSERT_EQ(num_rows, n);
LOG(INFO) << "End readfile";
}
@@ -406,7 +522,7 @@ class TestCFile : public CFileTestBase {
// once for each cache memory type (DRAM, NVM).
class TestCFileBothCacheMemoryTypes :
public TestCFile,
- public ::testing::WithParamInterface<std::pair<Cache::MemoryType,
Cache::EvictionPolicy>> {
+ public ::testing::WithParamInterface<std::pair<Cache::MemoryType,
Cache::EvictionPolicy>> {
public:
void SetUp() override {
// The NVM cache can run using any directory as its path -- it doesn't have
@@ -486,7 +602,7 @@ TEST_P(TestCFileBothCacheMemoryTypes,
TestWrite100MFileInts) {
LOG_TIMING(INFO, "reading 100M ints") {
LOG(INFO) << "Starting readfile";
size_t n;
- TimeReadFile(fs_manager_.get(), block_id, &n);
+ ASSERT_OK(TimeReadFile(fs_manager_.get(), block_id, &n));
ASSERT_EQ(100000000, n);
LOG(INFO) << "End readfile";
}
@@ -505,7 +621,7 @@ TEST_P(TestCFileBothCacheMemoryTypes,
TestWrite100MFileNullableInts) {
LOG_TIMING(INFO, "reading 100M nullable ints") {
LOG(INFO) << "Starting readfile";
size_t n;
- TimeReadFile(fs_manager_.get(), block_id, &n);
+ ASSERT_OK(TimeReadFile(fs_manager_.get(), block_id, &n));
ASSERT_EQ(100000000, n);
LOG(INFO) << "End readfile";
}
@@ -547,7 +663,7 @@ TEST_P(TestCFileBothCacheMemoryTypes,
TestWrite1MUniqueFileStringsDictEncoding)
LOG_TIMING(INFO, "reading 1M strings") {
LOG(INFO) << "Starting readfile";
size_t n;
- TimeReadFile(fs_manager_.get(), block_id, &n);
+ ASSERT_OK(TimeReadFile(fs_manager_.get(), block_id, &n));
ASSERT_EQ(1000000, n);
LOG(INFO) << "End readfile";
}
@@ -626,7 +742,8 @@ TYPED_TEST(BitShuffleTest,
TestFixedSizeReadWriteBitShuffle) {
this->TestBitShuffle();
}
-void EncodeStringKey(const Schema& schema,
+
+static void EncodeStringKey(const Schema& schema,
const Slice& key,
Arena* arena,
EncodedKey** encoded_key) {
@@ -1144,10 +1261,166 @@ TEST_P(TestCFileDifferentCodecs, TestUncompressible) {
RandomInt32DataGenerator int_gen;
WriteTestFile(&int_gen, PLAIN_ENCODING, codec, nrows,
NO_FLAGS, &block_id);
- TimeReadFile(fs_manager_.get(), block_id, &rdrows);
+ ASSERT_OK(TimeReadFile(fs_manager_.get(), block_id, &rdrows));
ASSERT_EQ(nrows, rdrows);
}
}
+// Base fixture for array data block scenarios, parameterized by the encoding
+// and the compression of the CFile which is written and then read back.
+class TestCFileArrayValues : public TestCFile,
+                             public testing::WithParamInterface<
+                                 std::tuple<EncodingType, CompressionType>> {
+ protected:
+  void SetUp() override {
+    // Array data blocks are gated by a flag which is off by default.
+    FLAGS_cfile_support_arrays = true;
+    TestCFile::SetUp();
+  }
+
+  // Writes 'nrows' array cells produced by a generator of type 'GenType',
+  // reads the file back, and verifies the number of rows read, the total
+  // number of array elements, and the total size of string elements.
+  template<typename GenType>
+  void Run(size_t nrows) {
+    const auto encoding = std::get<0>(GetParam());
+    const auto compression = std::get<1>(GetParam());
+    // Seed via the test framework (kudu/util/test_util.h) instead of the
+    // current time, so a failing run can be reproduced with the same seed
+    // via --test_random_seed.
+    SeedRandom();
+
+    BlockId block_id;
+    GenType array_gen;
+    NO_FATALS(WriteTestFile(&array_gen, encoding, compression, nrows,
+                            NO_FLAGS, &block_id));
+
+    size_t rdrows = 0;
+    size_t total_elem_count = 0;
+    size_t total_elem_str_size = 0;
+    ASSERT_OK(TimeReadFile(fs_manager_.get(), block_id, &rdrows,
+                           &total_elem_count, &total_elem_str_size));
+    ASSERT_EQ(nrows, rdrows);
+    ASSERT_EQ(array_gen.total_elem_count(), total_elem_count);
+    ASSERT_EQ(array_gen.str_values_total_size(), total_elem_str_size);
+    // TODO(aserbin): add assertion on array_gen.values_total_sum()
+  }
+};
+
+// Scenarios for writing and reading back arrays of integer elements. Under
+// TSAN, the amount of generated data is reduced to keep the test runtime
+// reasonable: these scenarios involve no concurrency, so running them on
+// less data doesn't reduce the test coverage.
+class TestCFileIntegerArrayValues : public TestCFileArrayValues {};
+
+INSTANTIATE_TEST_SUITE_P(
+    ArrayBasics, TestCFileIntegerArrayValues,
+    ::testing::Combine(::testing::Values(PLAIN_ENCODING, RLE, BIT_SHUFFLE),
+                       ::testing::Values(NO_COMPRESSION, SNAPPY, LZ4, ZLIB)));
+
+// Write/read a file with random int8 values in array cells, where both the
+// array cells themselves and the elements of the arrays might be null.
+TEST_P(TestCFileIntegerArrayValues, ReadWriteInt8WithNulls) {
+#if defined(THREAD_SANITIZER)
+  constexpr size_t kMaxElemsInArray = 100;
+  constexpr size_t kNumRows = 1000;
+#else
+  constexpr size_t kMaxElemsInArray = 10000;
+  constexpr size_t kNumRows = 10000;
+#endif // #if defined(THREAD_SANITIZER)
+  Run<Int8ArrayRandomDataGenerator</* HAS_NULLS */true,
+                                   /* HAS_NULLS_IN_ARRAY */true,
+                                   kMaxElemsInArray>>(kNumRows);
+}
+
+// Similar to ReadWriteInt8WithNulls above, but each array has at most two
+// (nullable) elements, while there are many more array cells.
+TEST_P(TestCFileIntegerArrayValues, ReadWriteInt8WithNullShortArrays) {
+  constexpr size_t kMaxElemsInArray = 2;
+#if defined(THREAD_SANITIZER)
+  constexpr size_t kNumRows = 1000;
+#else
+  constexpr size_t kNumRows = 100000;
+#endif // #if defined(THREAD_SANITIZER)
+  Run<Int8ArrayRandomDataGenerator</* HAS_NULLS */true,
+                                   /* HAS_NULLS_IN_ARRAY */true,
+                                   kMaxElemsInArray>>(kNumRows);
+}
+
+// Write/read a file with random int32 values in array cells, where neither
+// the array cells nor the elements of the arrays are ever null.
+TEST_P(TestCFileIntegerArrayValues, ReadWriteInt32) {
+#if defined(THREAD_SANITIZER)
+  constexpr size_t kMaxElemsInArray = 10;
+  constexpr size_t kNumRows = 100;
+#else
+  constexpr size_t kMaxElemsInArray = 100;
+  constexpr size_t kNumRows = 10000;
+#endif // #if defined(THREAD_SANITIZER)
+  Run<Int32ArrayRandomDataGenerator</* HAS_NULLS */false,
+                                    /* HAS_NULLS_IN_ARRAY */false,
+                                    kMaxElemsInArray>>(kNumRows);
+}
+
+// Write/read a file with random int32 values in array cells, where both the
+// array cells themselves and the elements of the arrays might be null, and
+// an array might contain quite a lot of elements.
+TEST_P(TestCFileIntegerArrayValues, ReadWriteInt32WithNulls) {
+  constexpr size_t kNumRows = 50;
+#if defined(THREAD_SANITIZER)
+  constexpr size_t kMaxElemsInArray = 1024;
+#else
+  constexpr size_t kMaxElemsInArray = 65535;
+#endif // #if defined(THREAD_SANITIZER)
+  Run<Int32ArrayRandomDataGenerator</* HAS_NULLS */true,
+                                    /* HAS_NULLS_IN_ARRAY */true,
+                                    kMaxElemsInArray>>(kNumRows);
+}
+
+// Same as ReadWriteInt32WithNulls above, but the maximum number of elements
+// in an array is small, while there is a big number of array cells.
+TEST_P(TestCFileIntegerArrayValues, ReadWriteInt32WithNullsShortArrays) {
+  constexpr size_t kMaxElemsInArray = 5;
+#if defined(THREAD_SANITIZER)
+  constexpr size_t kNumRows = 1000;
+#else
+  constexpr size_t kNumRows = 100000;
+#endif // #if defined(THREAD_SANITIZER)
+  Run<Int32ArrayRandomDataGenerator</* HAS_NULLS */true,
+                                    /* HAS_NULLS_IN_ARRAY */true,
+                                    kMaxElemsInArray>>(kNumRows);
+}
+
+// Scenarios for writing and reading back arrays of string/binary elements.
+class TestCFileBinaryArrayValues : public TestCFileArrayValues {};
+
+INSTANTIATE_TEST_SUITE_P(
+    ArrayBasics, TestCFileBinaryArrayValues,
+    ::testing::Combine(
+        // TODO(aserbin): address DICT_ENCODING issues
+        //::testing::Values(PLAIN_ENCODING, PREFIX_ENCODING, DICT_ENCODING),
+        ::testing::Values(PLAIN_ENCODING, PREFIX_ENCODING),
+        ::testing::Values(NO_COMPRESSION, SNAPPY, LZ4, ZLIB)));
+
+// Write/read a file with random string values in array cells, where neither
+// the array cells nor their elements are ever null.
+TEST_P(TestCFileBinaryArrayValues, ReadWriteStrings) {
+  constexpr size_t kMaxElemsInArray = 10;
+  constexpr size_t kNumRows = 1000;
+  Run<StringArrayRandomDataGenerator</* HAS_NULLS */false,
+                                     /* HAS_NULLS_IN_ARRAY */false,
+                                     kMaxElemsInArray>>(kNumRows);
+}
+
+// Write/read a file with random string values in array cells, where the
+// array cells themselves are never null but their elements might be, and
+// an array might contain quite a lot of elements.
+TEST_P(TestCFileBinaryArrayValues, ReadWriteStringsWithNulls) {
+#if defined(THREAD_SANITIZER)
+  constexpr size_t kMaxElemsInArray = 500;
+  constexpr size_t kNumRows = 100;
+#else
+  constexpr size_t kMaxElemsInArray = 5000;
+  constexpr size_t kNumRows = 1000;
+#endif // #if defined(THREAD_SANITIZER)
+  Run<StringArrayRandomDataGenerator</* HAS_NULLS */false,
+                                     /* HAS_NULLS_IN_ARRAY */true,
+                                     kMaxElemsInArray>>(kNumRows);
+}
+
+// Same as ReadWriteStringsWithNulls above, but the array elements are
+// arbitrary binary data rather than just strings.
+TEST_P(TestCFileBinaryArrayValues, ReadWriteBinaryWithNulls) {
+#if defined(THREAD_SANITIZER)
+  constexpr size_t kMaxElemsInArray = 500;
+  constexpr size_t kNumRows = 100;
+#else
+  constexpr size_t kMaxElemsInArray = 5000;
+  constexpr size_t kNumRows = 1000;
+#endif // #if defined(THREAD_SANITIZER)
+  Run<BinaryArrayRandomDataGenerator</* HAS_NULLS */false,
+                                     /* HAS_NULLS_IN_ARRAY */true,
+                                     kMaxElemsInArray>>(kNumRows);
+}
+
+// Same as ReadWriteBinaryWithNulls above, but the maximum number of elements
+// in an array is small, while there is a big number of array cells.
+TEST_P(TestCFileBinaryArrayValues, ReadWriteBinaryWithNullsShortArrays) {
+  constexpr size_t kMaxElemsInArray = 3;
+#if defined(THREAD_SANITIZER)
+  constexpr size_t kNumRows = 1000;
+#else
+  constexpr size_t kNumRows = 100000;
+#endif // #if defined(THREAD_SANITIZER)
+  Run<BinaryArrayRandomDataGenerator</* HAS_NULLS */true,
+                                     /* HAS_NULLS_IN_ARRAY */true,
+                                     kMaxElemsInArray>>(kNumRows);
+}
+
} // namespace cfile
} // namespace kudu
diff --git a/src/kudu/cfile/cfile_reader.cc b/src/kudu/cfile/cfile_reader.cc
index ffd24bd5f..9fde2219f 100644
--- a/src/kudu/cfile/cfile_reader.cc
+++ b/src/kudu/cfile/cfile_reader.cc
@@ -17,8 +17,11 @@
#include "kudu/cfile/cfile_reader.h"
+#include <sys/types.h>
+
#include <algorithm>
#include <cstring>
+#include <limits>
#include <memory>
#include <ostream>
#include <utility>
@@ -36,6 +39,7 @@
#include "kudu/cfile/cfile_writer.h" // for kMagicString
#include "kudu/cfile/index_btree.h"
#include "kudu/cfile/type_encodings.h"
+#include "kudu/common/array_type_serdes.h"
#include "kudu/common/column_materialization_context.h"
#include "kudu/common/column_predicate.h"
#include "kudu/common/columnblock.h"
@@ -43,6 +47,7 @@
#include "kudu/common/encoded_key.h"
#include "kudu/common/key_encoder.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/types.h"
#include "kudu/fs/error_manager.h"
#include "kudu/fs/io_context.h"
@@ -85,6 +90,8 @@ DEFINE_double(cfile_inject_corruption, 0,
"with a corruption status");
TAG_FLAG(cfile_inject_corruption, hidden);
+DECLARE_bool(cfile_support_arrays);
+
using kudu::fault_injection::MaybeTrue;
using kudu::fs::ErrorHandlerType;
using kudu::fs::IOContext;
@@ -188,6 +195,12 @@ Status CFileReader::InitOnce(const IOContext* io_context) {
footer_->incompatible_features(),
IncompatibleFeatures::SUPPORTED));
}
+ if (PREDICT_FALSE(footer_->is_type_array() && !FLAGS_cfile_support_arrays)) {
+ static const auto kErrStatus = Status::ConfigurationError(
+ "support for array data blocks is disabled");
+ LOG(DFATAL) << kErrStatus.ToString();
+ return kErrStatus;
+ }
type_info_ = footer_->is_type_array() ?
GetArrayTypeInfo(footer_->data_type())
: GetTypeInfo(footer_->data_type());
@@ -777,7 +790,31 @@ void CFileIterator::SeekToPositionInBlock(PreparedBlock*
pb, uint32_t idx_in_blo
// we need to translate from 'ord_idx' (the absolute row id)
// to the index within the non-null entries.
uint32_t index_within_nonnulls;
- if (reader_->is_nullable()) {
+ if (reader_->is_array()) {
+ DCHECK_LT(idx_in_block, pb->num_rows_in_block_);
+ if (PREDICT_TRUE(pb->idx_in_block_ <= idx_in_block)) {
+ // Seeking forward.
+ pb->array_rle_decoder_.Skip(idx_in_block - pb->idx_in_block_);
+ DCHECK_LT(pb->idx_in_block_, pb->array_start_indices_.size());
+ // Indices in the flattened value sequence (including null elements).
+ ssize_t flattened_idx_to_seek = pb->array_start_indices_[idx_in_block];
+ ssize_t flattened_idx_cur = pb->array_start_indices_[pb->idx_in_block_];
+ DCHECK_LE(flattened_idx_cur, flattened_idx_to_seek);
+ ssize_t nskip = flattened_idx_to_seek - flattened_idx_cur;
+ index_within_nonnulls = pb->dblk_->GetCurrentIndex() +
pb->rle_decoder_.Skip(nskip);
+ } else {
+ // Seeking backward: need to reset the positions in both array and
+ // flattened values bitmaps and rewind them forward to required offsets.
+ pb->array_rle_decoder_ = RleDecoder<bool,
1>(pb->array_rle_bitmap_.data(),
+
pb->array_rle_bitmap_.size());
+ pb->array_rle_decoder_.Skip(idx_in_block);
+ DCHECK_LE(idx_in_block, pb->array_start_indices_.size());
+ ssize_t flattened_idx_to_seek = pb->array_start_indices_[idx_in_block];
+ pb->rle_decoder_ = RleDecoder<bool, 1>(pb->rle_bitmap.data(),
+ pb->rle_bitmap.size());
+ index_within_nonnulls = pb->rle_decoder_.Skip(flattened_idx_to_seek);
+ }
+ } else if (reader_->is_nullable()) {
if (PREDICT_TRUE(pb->idx_in_block_ <= idx_in_block)) {
// We are seeking forward. Skip from the current position in the RLE
decoder
// instead of going back to the beginning of the block.
@@ -964,6 +1001,47 @@ Status DecodeNullInfo(scoped_refptr<BlockHandle>*
data_block_handle,
return Status::OK();
}
+// Parses the header of an array data block and narrows '*data_block_handle'
+// to the encoded element values which follow the header.
+//
+// The header consists of, in this order:
+//   * varint32: number of elements in the flattened sequence of all the
+//     arrays in the block (null elements included)
+//   * varint32: number of array cells in the block
+//   * varint32 size + bytes: non-null bitmap of the flattened sequence
+//   * varint32 size + bytes: sequence of per-array element counts
+//   * varint32 size + bytes: non-null bitmap of the array cells
+//
+// The output Slices point into the memory of the original block. Returns
+// Status::Corruption if a header field can't be parsed or if any of the
+// length-prefixed sections extends past the end of the block.
+Status DecodeArrayInfo(scoped_refptr<BlockHandle>* data_block_handle,
+                       uint32_t* num_flattened_elements_in_block,
+                       uint32_t* num_arrays_in_block,
+                       Slice* flattened_non_null_bitmap,
+                       Slice* array_non_null_bitmap,
+                       Slice* array_elem_num_seq) {
+  Slice data_block = (*data_block_handle)->data();
+
+  if (PREDICT_FALSE(!GetVarint32(&data_block, num_flattened_elements_in_block))) {
+    return Status::Corruption("bad array header, num flattened elements");
+  }
+  if (PREDICT_FALSE(!GetVarint32(&data_block, num_arrays_in_block))) {
+    return Status::Corruption("bad array header, array count");
+  }
+
+  // Extracts a varint32-size-prefixed section from the front of 'data_block'.
+  // The size comes from on-disk data, so it's validated against the remaining
+  // size of the block: otherwise a corrupted size would make the output Slice
+  // (and remove_prefix()) reach past the end of the block's memory.
+  const auto read_section = [&data_block](const char* bad_size_msg,
+                                          const char* truncated_msg,
+                                          Slice* section) {
+    uint32_t section_size;
+    if (PREDICT_FALSE(!GetVarint32(&data_block, &section_size))) {
+      return Status::Corruption(bad_size_msg);
+    }
+    if (PREDICT_FALSE(section_size > data_block.size())) {
+      return Status::Corruption(truncated_msg);
+    }
+    *section = Slice(data_block.data(), section_size);
+    data_block.remove_prefix(section_size);
+    return Status::OK();
+  };
+
+  RETURN_NOT_OK(read_section("bad array header, non-null bitmap size",
+                             "bad array header, truncated non-null bitmap",
+                             flattened_non_null_bitmap));
+  RETURN_NOT_OK(read_section("bad array header, array element numbers size",
+                             "bad array header, truncated array element numbers",
+                             array_elem_num_seq));
+  RETURN_NOT_OK(read_section("bad array header, non-null array bitmap size",
+                             "bad array header, truncated non-null array bitmap",
+                             array_non_null_bitmap));
+
+  // Narrow the handle to the encoded values which follow the header.
+  const auto offset = data_block.data() - (*data_block_handle)->data().data();
+  *data_block_handle = (*data_block_handle)->SubrangeBlock(offset,
+                                                           data_block.size());
+  return Status::OK();
+}
+
Status CFileIterator::ReadCurrentDataBlock(const IndexTreeIterator& idx_iter,
PreparedBlock* prep_block) {
prep_block->dblk_ptr_ = idx_iter.GetCurrentBlockPointer();
@@ -972,8 +1050,44 @@ Status CFileIterator::ReadCurrentDataBlock(const
IndexTreeIterator& idx_iter,
uint32_t num_rows_in_block = 0;
scoped_refptr<BlockHandle> data_block = prep_block->dblk_handle_;
- size_t total_size_read = data_block->data().size();
- if (reader_->is_nullable()) {
+ const size_t total_size_read = data_block->data().size();
+ if (reader_->is_array()) {
+ Slice array_elem_num_seq;
+ RETURN_NOT_OK(DecodeArrayInfo(&data_block,
+ &(prep_block->num_flattened_elems_in_block_),
+ &num_rows_in_block,
+ &(prep_block->rle_bitmap),
+ &(prep_block->array_rle_bitmap_),
+ &array_elem_num_seq));
+ prep_block->rle_decoder_ = RleDecoder<bool, 1>(
+ prep_block->rle_bitmap.data(),
+ prep_block->rle_bitmap.size());
+ prep_block->array_rle_decoder_ = RleDecoder<bool, 1>(
+ prep_block->array_rle_bitmap_.data(),
+ prep_block->array_rle_bitmap_.size());
+
+ // Build array start indices given array element number sequence.
+ {
+ auto& start_indices = prep_block->array_start_indices_;
+ start_indices.reserve(num_rows_in_block);
+
+ RleDecoder<uint16_t, 16> dec(array_elem_num_seq.data(),
+ array_elem_num_seq.size());
+ size_t idx = 0;
+ uint16_t elem_num;
+ while (const size_t run_len = dec.GetNextRun(&elem_num,
num_rows_in_block)) {
+ for (size_t i = 0; i < run_len; ++i) {
+ start_indices.emplace_back(idx);
+ idx += elem_num;
+ }
+ }
+ if (PREDICT_FALSE(num_rows_in_block != start_indices.size())) {
+ return Status::Corruption(
+ "bad array header, number of arrays mismatch");
+ }
+ DCHECK_LE(start_indices.size(), std::numeric_limits<uint32_t>::max());
+ }
+ } else if (reader_->is_nullable()) {
RETURN_NOT_OK(DecodeNullInfo(&data_block, &num_rows_in_block,
&(prep_block->rle_bitmap)));
prep_block->rle_decoder_ = RleDecoder<bool,
1>(prep_block->rle_bitmap.data(),
prep_block->rle_bitmap.size());
@@ -987,10 +1101,11 @@ Status CFileIterator::ReadCurrentDataBlock(const
IndexTreeIterator& idx_iter,
reader_->block_id().ToString(),
prep_block->dblk_ptr_.ToString()));
- // For nullable blocks, we filled in the row count from the null information
above,
- // since the data block decoder only knows about the non-null values.
- // For non-nullable ones, we use the information from the block decoder.
- if (!reader_->is_nullable()) {
+ // For nullable and array data blocks, the row count information is set
+ // in the code above since the data block decoder only knows about the
+ // non-null values. For non-nullable, non-array data blocks, the
+ // corresponding information is provided by the block decoder.
+ if (!reader_->is_nullable() && !reader_->is_array()) {
num_rows_in_block = prep_block->dblk_->Count();
}
@@ -1116,6 +1231,10 @@ Status CFileIterator::FinishBatch() {
}
Status CFileIterator::Scan(ColumnMaterializationContext* ctx) {
+ // Utility block memory for reading in array data, if any.
+ unique_ptr<RowBlockMemory> array_rbm(
+ reader_->is_array() ? new RowBlockMemory : nullptr);
+
DCHECK(seeked_) << "not seeked";
// Use views to advance the block and selection vector as we read into them.
@@ -1147,12 +1266,137 @@ Status
CFileIterator::Scan(ColumnMaterializationContext* ctx) {
// that might be more efficient (allowing the decoder to save internal
state
// instead of having to reconstruct it)
}
- if (reader_->is_nullable()) {
+ if (reader_->is_array()) {
+ DCHECK(ctx->block()->type_info()->is_array());
+ DCHECK_GE(pb->num_rows_in_block_, pb->idx_in_block_);
+ const auto& start_indices = pb->array_start_indices_;
+ size_t cur_flattened_idx = 0;
+
+ ssize_t count = std::min(rem, pb->num_rows_in_block_ -
pb->idx_in_block_);
+ while (count > 0) {
+ // Processing one array at a time, peeking into the next array's start
+ // index in the flattened sequence to know how many elements to read
+ // from the flattened sequence.
+ const auto idx = pb->idx_in_block_;
+ DCHECK_LE(idx, start_indices.size());
+
+ const uint32_t flattened_idx = start_indices[idx];
+ const uint32_t flattened_next_idx =
+ (idx + 1) == start_indices.size() ?
pb->num_flattened_elems_in_block_
+ : start_indices[idx + 1];
+ if (PREDICT_FALSE(flattened_next_idx < flattened_idx)) {
+ static constexpr const char* const kErrMsg =
+ "non-monotonous array start indices";
+ LOG(DFATAL) << kErrMsg;
+ return Status::Corruption(kErrMsg);
+ }
+ if (PREDICT_FALSE(flattened_idx > pb->num_flattened_elems_in_block_ ||
+ flattened_next_idx >
pb->num_flattened_elems_in_block_)) {
+ // Trailing elements of the array start indices may have been set
+ // to pb->num_flattened_elems_in_block_ if the corresponding trailing
+ // arrays are empty or null, but they should never be greater than
+ // the total number of elements in the flattened sequence.
+ static constexpr const char* const kErrMsg =
+ "out-of-bounds array start index";
+ LOG(DFATAL) << kErrMsg;
+ return Status::Corruption(kErrMsg);
+ }
+
+ bool array_not_null = false;
+ const size_t array_run_len = pb->array_rle_decoder_.GetNextRun(
+ &array_not_null, 1);
+ if (PREDICT_FALSE(array_run_len != 1)) {
+ static constexpr const char* const kErrMsg =
+ "unexpected end of array non-null bitmap";
+ LOG(DFATAL) << kErrMsg;
+ return Status::Corruption(kErrMsg);
+ }
+ ColumnDataView* dst = &remaining_dst;
+ if (array_not_null) {
+ const auto* ati = reader_->type_info();
+ DCHECK(ati->is_array());
+ const TypeInfo* elem_type_info =
ati->nested_type_info()->array().elem_type_info();
+ DCHECK(elem_type_info);
+
+ if (flattened_idx == flattened_next_idx) {
+ // An empty array cell: no need to fetch any data.
+ memset(dst->data(), 0, sizeof(Slice));
+ } else {
+ // A non-empty array cell: need to fetch its elements from
+ // the flattened sequence.
+
+ const size_t array_elems_to_fetch = flattened_next_idx -
flattened_idx;
+ // TODO(aserbin): is it possible to reuse array_rbm or the column
+ // block's memory for this?
+ // TODO(aserbin): allocate it once for the whole block?
+ unique_ptr<uint8_t[]> cblock_data(
+ new uint8_t[array_elems_to_fetch * elem_type_info->size()]);
+ unique_ptr<uint8_t[]> cblock_not_null_bitmap(
+ new uint8_t[BitmapSize(array_elems_to_fetch)]);
+
+ // Column block to help with reading in array elements for current
+ // array cell.
+ ColumnBlock cblock(
+ elem_type_info,
+ cblock_not_null_bitmap.get(),
+ cblock_data.get(),
+ array_elems_to_fetch,
+ array_rbm.get());
+ ColumnDataView cdv(&cblock);
+
+ bool not_null = false;
+ size_t num_fetched = 0;
+ while (num_fetched + flattened_idx < flattened_next_idx) {
+ size_t run_len = pb->rle_decoder_.GetNextRun(
+ ¬_null, flattened_next_idx - flattened_idx - num_fetched);
+ if (PREDICT_FALSE(run_len == 0)) {
+ static constexpr const char* const kErrMsg =
+ "unexpected end of flattened non-null bitmap";
+ LOG(DFATAL) << kErrMsg;
+ return Status::Corruption(kErrMsg);
+ }
+
+ size_t this_batch = run_len;
+ if (not_null) {
+ RETURN_NOT_OK(pb->dblk_->CopyNextValues(&this_batch, &cdv));
+ DCHECK_EQ(run_len, this_batch);
+ pb->needs_rewind_ = true;
+ }
+ cdv.SetNonNullBits(this_batch, not_null);
+ cdv.Advance(this_batch);
+
+ num_fetched += run_len;
+ // Flattened element count includes null elements.
+ cur_flattened_idx += run_len;
+ }
+
+ // Serialize the read data into dst->arena().
+ Slice cell_out;
+ RETURN_NOT_OK(SerializeIntoArena(
+ elem_type_info,
+ cblock_data.get(),
+ cblock_not_null_bitmap.get(),
+ cblock.nrows(),
+ dst->arena(),
+ &cell_out));
+ DCHECK_EQ(ati->size(), sizeof(Slice));
+ DCHECK_GT(dst->nrows(), 0);
+ memcpy(dst->data(), &cell_out, sizeof(Slice));
+ }
+ }
+ dst->SetNonNullBits(1, array_not_null);
+ dst->Advance(1);
+ remaining_sel.Advance(1);
+ pb->idx_in_block_++;
+ --rem;
+ --count;
+ }
+ DCHECK_LE(cur_flattened_idx, pb->num_flattened_elems_in_block_);
+ } else if (reader_->is_nullable()) {
DCHECK(ctx->block()->is_nullable());
+ DCHECK_GE(pb->num_rows_in_block_, pb->idx_in_block_);
- size_t nrows = std::min(rem, pb->num_rows_in_block_ - pb->idx_in_block_);
- // Fill column bitmap
- size_t count = nrows;
+ ssize_t count = std::min(rem, pb->num_rows_in_block_ -
pb->idx_in_block_);
while (count > 0) {
bool not_null = false;
size_t nblock = pb->rle_decoder_.GetNextRun(¬_null, count);
diff --git a/src/kudu/cfile/cfile_reader.h b/src/kudu/cfile/cfile_reader.h
index f6906b2f7..eafa01c49 100644
--- a/src/kudu/cfile/cfile_reader.h
+++ b/src/kudu/cfile/cfile_reader.h
@@ -144,6 +144,10 @@ class CFileReader {
return footer().is_type_nullable();
}
+  // Whether this CFile stores array data, as recorded in its footer.
+  bool is_array() const {
+    const auto& f = footer();
+    return f.is_type_array();
+  }
+
const CFileHeaderPB& header() const {
DCHECK(init_once_.init_succeeded());
return *DCHECK_NOTNULL(header_.get());
@@ -439,13 +443,25 @@ class CFileIterator final : public ColumnIterator {
bool needs_rewind_;
uint32_t rewind_idx_;
- // Total number of rows in the block (nulls + not nulls)
+ // Total number of rows in the block (nulls + not nulls).
uint32_t num_rows_in_block_;
- // Null bitmap and bitmap (RLE) decoder
+ // Non-null bitmap and bitmap (RLE) decoder.
+ // This is a bitmap for the flattened/concatenated element sequence
+ // for array data blocks.
Slice rle_bitmap;
RleDecoder<bool, 1> rle_decoder_;
+ // Array non-null bitmap and bitmap (RLE) decoder
+ Slice array_rle_bitmap_;
+ RleDecoder<bool, 1> array_rle_decoder_;
+
+ // Array start indices (used only for array data blocks).
+ std::vector<uint32_t> array_start_indices_;
+
+ // Total number of values in the 'flattened' sequence in an array block.
+ uint32_t num_flattened_elems_in_block_ = 0;
+
rowid_t last_row_idx() const {
return first_row_idx() + num_rows_in_block_ - 1;
}
diff --git a/src/kudu/cfile/cfile_util.h b/src/kudu/cfile/cfile_util.h
index 74b6e8f7b..2ec873bc3 100644
--- a/src/kudu/cfile/cfile_util.h
+++ b/src/kudu/cfile/cfile_util.h
@@ -47,7 +47,10 @@ enum IncompatibleFeatures {
// Write a crc32 checksum at the end of each cfile block
CHECKSUM = 1 << 0,
- SUPPORTED = NONE | CHECKSUM
+ // Support for reading/writing array data blocks
+ ARRAY_DATA_BLOCK = 1 << 1,
+
+ SUPPORTED = NONE | CHECKSUM | ARRAY_DATA_BLOCK
};
typedef std::function<void(const void*, faststring*)> ValidxKeyEncoder;
diff --git a/src/kudu/cfile/cfile_writer.cc b/src/kudu/cfile/cfile_writer.cc
index 433d70f3e..c23d5b3a4 100644
--- a/src/kudu/cfile/cfile_writer.cc
+++ b/src/kudu/cfile/cfile_writer.cc
@@ -17,6 +17,8 @@
#include "kudu/cfile/cfile_writer.h"
+#include <sys/types.h>
+
#include <functional>
#include <iterator>
#include <numeric>
@@ -34,6 +36,7 @@
#include "kudu/cfile/cfile_util.h"
#include "kudu/cfile/index_btree.h"
#include "kudu/cfile/type_encodings.h"
+#include "kudu/common/array_cell_view.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/key_encoder.h"
#include "kudu/common/schema.h"
@@ -63,6 +66,10 @@ DEFINE_bool(cfile_write_checksums, true,
"Write CRC32 checksums for each block");
TAG_FLAG(cfile_write_checksums, evolving);
+// Gate for array data blocks: while this flag is off, CFileWriter::Start()
+// for an array type and CFileReader::InitOnce() for a CFile whose footer
+// marks it as an array both fail with Status::ConfigurationError (and
+// LOG(DFATAL)).
+DEFINE_bool(cfile_support_arrays, false,
+ "Support encoding/decoding of arrays in CFile data blocks");
+TAG_FLAG(cfile_support_arrays, evolving);
+
using google::protobuf::RepeatedPtrField;
using kudu::fs::BlockCreationTransaction;
using kudu::fs::BlockManager;
@@ -82,6 +89,9 @@ const size_t kChecksumSize = sizeof(uint32_t);
static const size_t kMinBlockSize = 512;
+// Presumably the maximum number of elements in a single array cell; its uses
+// are outside this view -- NOTE(review): confirm, and note that the per-array
+// element counts are encoded as 16-bit values by ArrayElemNumBuilder.
+// TODO(aserbin): source this from a flag?
+static constexpr size_t kMaxElementsInArray = 1024;
+
class NonNullBitmapBuilder {
public:
explicit NonNullBitmapBuilder(size_t initial_row_capacity)
@@ -117,6 +127,49 @@ class NonNullBitmapBuilder {
RleEncoder<bool, 1> rle_encoder_;
};
+// Accumulates the number of elements of each array cell in a data block
+// and RLE-encodes them as a sequence of 16-bit unsigned integers: that's
+// the 'array element numbers' field of an array data block.
+class ArrayElemNumBuilder {
+ public:
+ explicit ArrayElemNumBuilder(size_t initial_row_capacity)
+ : nitems_(0),
+ // TODO(aserbin): improve or remove the estimate for the buffer size
+ buffer_(initial_row_capacity * sizeof(uint32_t)),
+ rle_encoder_(&buffer_) {
+ }
+
+ // Number of array cells added since construction or the last Reset().
+ size_t nitems() const {
+ return nitems_;
+ }
+
+ // Add the number of elements of the next array cell. The numbers are encoded
+ // as 16-bit integers, so 'elem_num' must not exceed UINT16_MAX; callers
+ // record NULL array cells as 0 elements.
+ void Add(uint32_t elem_num) {
+ DCHECK_LE(elem_num, static_cast<uint32_t>(UINT16_MAX));
+ ++nitems_;
+ rle_encoder_.Put(static_cast<uint16_t>(elem_num), 1);
+ }
+
+ // NOTE: the returned Slice is only valid until this Builder is destroyed or Reset
+ Slice Finish() {
+ const int len = rle_encoder_.Flush();
+ return Slice(buffer_.data(), len);
+ }
+
+ void Reset() {
+ nitems_ = 0;
+ rle_encoder_.Clear();
+ }
+
+ private:
+ size_t nitems_;
+ faststring buffer_;
+ RleEncoder<uint16_t, 16> rle_encoder_;
+};
+
////////////////////////////////////////////////////////////
// CFileWriter
////////////////////////////////////////////////////////////
@@ -132,6 +178,7 @@ CFileWriter::CFileWriter(WriterOptions options,
value_count_(0),
is_nullable_(is_nullable),
typeinfo_(typeinfo),
+ is_array_(typeinfo->is_array()),
state_(kWriterInitialized) {
const EncodingType encoding = options_.storage_attributes.encoding;
if (auto s = TypeEncodingInfo::Get(typeinfo_, encoding, &type_encoding_info_);
@@ -183,6 +230,14 @@ Status CFileWriter::Start() {
TRACE_EVENT0("cfile", "CFileWriter::Start");
DCHECK(state_ == kWriterInitialized) << "bad state for Start(): " << state_;
+ // Writing array data blocks is gated by the --cfile_support_arrays flag.
+ if (PREDICT_FALSE(is_array_ && !FLAGS_cfile_support_arrays)) {
+ static const auto kErrStatus = Status::ConfigurationError(
+ "support for array data blocks is disabled");
+ LOG(DFATAL) << kErrStatus.ToString();
+ return kErrStatus;
+ }
+
if (compression_ != NO_COMPRESSION) {
const CompressionCodec* codec;
RETURN_NOT_OK(GetCompressionCodec(compression_, &codec));
@@ -214,9 +268,20 @@ Status CFileWriter::Start() {
RETURN_NOT_OK_PREPEND(WriteRawData(header_slices), "Couldn't write header");
data_block_ = type_encoding_info_->CreateBlockBuilder(&options_);
- if (is_nullable_) {
- size_t nrows = ((options_.storage_attributes.cfile_block_size + typeinfo_->size() - 1) /
- typeinfo_->size());
+ if (is_array_) {
+ // Array data blocks allow for nulls both among the arrays' elements and
+ // among the array cells themselves, so all these builders are needed even
+ // if the column itself isn't nullable.
+ const size_t nrows =
+ (options_.storage_attributes.cfile_block_size + typeinfo_->size() - 1) /
+ typeinfo_->size();
+ array_non_null_bitmap_builder_.reset(new NonNullBitmapBuilder(nrows));
+ non_null_bitmap_builder_.reset(new NonNullBitmapBuilder(nrows * kMaxElementsInArray));
+ array_elem_num_builder_.reset(new ArrayElemNumBuilder(nrows));
+ } else if (is_nullable_) {
+ const size_t nrows =
+ (options_.storage_attributes.cfile_block_size + typeinfo_->size() - 1) /
+ typeinfo_->size();
non_null_bitmap_builder_.reset(new NonNullBitmapBuilder(nrows * 8));
}
@@ -237,19 +301,40 @@ Status CFileWriter::FinishAndReleaseBlock(BlockCreationTransaction* transaction)
TRACE_EVENT0("cfile", "CFileWriter::FinishAndReleaseBlock");
DCHECK(state_ == kWriterWriting) << "Bad state for Finish(): " << state_;
- // Write out any pending values as the last data block.
- RETURN_NOT_OK(FinishCurDataBlock());
-
- state_ = kWriterFinished;
-
- uint32_t incompatible_features = 0;
+ uint32_t incompatible_features = IncompatibleFeatures::NONE;
if (FLAGS_cfile_write_checksums) {
incompatible_features |= IncompatibleFeatures::CHECKSUM;
}
+ if (is_array_) {
+ incompatible_features |= IncompatibleFeatures::ARRAY_DATA_BLOCK;
+ }
+
+ // Write out any pending values as the last data block.
+ if (is_array_) {
+ RETURN_NOT_OK(FinishCurArrayDataBlock());
+ } else {
+ RETURN_NOT_OK(FinishCurDataBlock());
+ }
+
+ state_ = kWriterFinished;
// Start preparing the footer.
CFileFooterPB footer;
- footer.set_data_type(typeinfo_->type());
+ if (is_array_) {
+ // For 1D arrays, the 'data_type' field is set to reflect the type
+ // of the array's elements. Elements of an array can be nullable.
+ const auto* desc = typeinfo_->nested_type_info();
+ DCHECK(desc);
+ DCHECK(desc->is_array());
+ DCHECK(desc->array().elem_type_info());
+ footer.set_data_type(desc->array().elem_type_info()->type());
+ footer.set_is_type_array(true);
+ } else {
+ footer.set_data_type(typeinfo_->type());
+ // NOTE: leaving the 'is_type_array' field unset: semantically it's the
+ // same as if it were set to 'false', but the resulting CFile footer
+ // would be a few bytes larger if the field were set explicitly.
+ }
footer.set_is_type_nullable(is_nullable_);
footer.set_encoding(type_encoding_info_->encoding_type());
footer.set_num_values(value_count_);
@@ -327,6 +412,7 @@ void CFileWriter::FlushMetadataToPB(RepeatedPtrField<FileMetadataPairPB>* field)
Status CFileWriter::AppendEntries(const void* entries, size_t count) {
DCHECK(!is_nullable_);
+ DCHECK(!is_array_);
int rem = count;
@@ -353,6 +439,7 @@ Status CFileWriter::AppendNullableEntries(const uint8_t* bitmap,
const void* entries,
size_t count) {
DCHECK(is_nullable_ && bitmap != nullptr);
+ DCHECK(!is_array_);
const uint8_t* ptr = reinterpret_cast<const uint8_t*>(entries);
@@ -386,7 +473,143 @@ Status CFileWriter::AppendNullableEntries(const uint8_t* bitmap,
return Status::OK();
}
+Status CFileWriter::AppendNullableArrayEntries(const uint8_t* bitmap,
+ const void* entries,
+ size_t count) {
+ DCHECK_EQ(DataType::NESTED, typeinfo_->type());
+ DCHECK_EQ(sizeof(Slice), typeinfo_->size());
+
+ // The number of elements in each array cell is RLE-encoded as a 16-bit
+ // unsigned integer (see ArrayElemNumBuilder), so cells with more elements
+ // cannot be represented in an array data block.
+ constexpr size_t kMaxEncodableElemNum = UINT16_MAX;
+
+ // For 1D arrays, get information on the elements.
+ const auto* desc = typeinfo_->nested_type_info();
+ DCHECK(desc);
+ DCHECK(desc->is_array());
+ // For 1D arrays, the encoder is chosen based on the type of the array's
+ // elements.
+ const auto* const elem_type_info = desc->array().elem_type_info();
+ DCHECK(elem_type_info);
+ const size_t elem_size = elem_type_info->size();
+
+ const Slice* cells_ptr = reinterpret_cast<const Slice*>(entries);
+ BitmapIterator cell_bitmap_it(bitmap, count);
+ size_t cur_cell_idx = 0;
+
+ size_t cells_num = 0;
+ bool cell_is_not_null = false;
+ while ((cells_num = cell_bitmap_it.Next(&cell_is_not_null)) > 0) {
+ if (!cell_is_not_null) {
+ // This is a run of null array-type cells: they are accounted for in
+ // the arrays' non-null bitmap and as zero-length arrays, but nothing
+ // is added into the data block for them.
+ value_count_ += cells_num;
+ array_non_null_bitmap_builder_->AddRun(false, cells_num);
+ for (size_t i = 0; i < cells_num; ++i) {
+ array_elem_num_builder_->Add(0);
+ }
+
+#if DCHECK_IS_ON()
+ // TODO(aserbin): re-enable this extra validation after SetNull()/set_null()
+ // updates the underlying Slice for previously stored
+ // non-null cell's data in ContiguousRow and elsewhere
+ // if DCHECK_IS_ON()
+ //const Slice* cell = cells_ptr;
+ //DCHECK(cell->empty());
+#endif // if DCHECK_IS_ON() ...
+
+ cells_ptr += cells_num;
+ cur_cell_idx += cells_num;
+ } else {
+ // This is a run of non-null array-type cells.
+ for (size_t i = 0; i < cells_num; ++i, ++cur_cell_idx, ++cells_ptr) {
+ const Slice* cell = cells_ptr;
+ DCHECK(cell);
+ ArrayCellMetadataView view(cell->data(), cell->size());
+ RETURN_NOT_OK(view.Init());
+
+ // Validate the cell before updating any per-block state, so an invalid
+ // cell doesn't leave the builders out of sync with each other.
+ const size_t cell_elem_num = view.elem_num();
+ if (PREDICT_FALSE(cell_elem_num > kMaxEncodableElemNum)) {
+ return Status::InvalidArgument(
+ "too many elements in an array cell", std::to_string(cell_elem_num));
+ }
+
+ ++value_count_;
+ array_non_null_bitmap_builder_->AddRun(true, 1);
+
+ // Add information on the array's elements boundary
+ // in the flattened sequence.
+ array_elem_num_builder_->Add(cell_elem_num);
+ if (cell_elem_num == 0) {
+ // Current cell contains an empty array.
+ continue;
+ }
+
+ // Information on validity of the elements in the in-cell array.
+ const uint8_t* cell_non_null_bitmap = view.not_null_bitmap();
+ DCHECK(cell_non_null_bitmap);
+ BitmapIterator elem_bitmap_iter(cell_non_null_bitmap,
+ cell_elem_num);
+ const uint8_t* data = view.data_as(elem_type_info->type());
+ DCHECK(data);
+
+ // Mask the 'block is full' while writing a single array.
+ data_block_->SetBlockFullMasked(true);
+ size_t elem_num = 0;
+ bool elem_is_non_null = false;
+ while ((elem_num = elem_bitmap_iter.Next(&elem_is_non_null)) > 0) {
+ if (!elem_is_non_null) {
+ // Add info on the run of 'elem_num' null elements in the array.
+ non_null_bitmap_builder_->AddRun(false, elem_num);
+ // Skip over the null elements in the input data.
+ data += elem_num * elem_size;
+ continue;
+ }
+
+ // A run of 'elem_num' non-null elements in the array.
+ ssize_t elem_rem = elem_num;
+ do {
+ const int n = data_block_->Add(data, elem_rem);
+ if (PREDICT_FALSE(n <= 0)) {
+ // Every Add() must accept at least one element while 'block is full'
+ // is masked: otherwise this loop would never terminate.
+ data_block_->SetBlockFullMasked(false);
+ const auto s = Status::IllegalState(
+ "data block did not accept array elements");
+ LOG(DFATAL) << s.ToString();
+ return s;
+ }
+
+ non_null_bitmap_builder_->AddRun(true, n);
+ data += n * elem_size;
+ elem_rem -= n;
+ } while (elem_rem > 0);
+ }
+ // Unmask the 'block is full' logic, so the logic below works
+ // as necessary.
+ data_block_->SetBlockFullMasked(false);
+
+ // If the current block is full, switch to a new one.
+ if (data_block_->IsBlockFull()) {
+ // NOTE: with long arrays the block may get quite beyond the size
+ // threshold before switching to the next one.
+ RETURN_NOT_OK(FinishCurArrayDataBlock());
+ }
+ }
+ }
+ }
+
+ DCHECK_EQ(count, cur_cell_idx);
+
+ return Status::OK();
+}
+
Status CFileWriter::FinishCurDataBlock() {
+ DCHECK(!is_array_);
const uint32_t num_elems_in_block =
is_nullable_ ? non_null_bitmap_builder_->nitems() : data_block_->Count();
if (PREDICT_FALSE(num_elems_in_block == 0)) {
@@ -444,6 +648,105 @@ Status CFileWriter::FinishCurDataBlock() {
return s;
}
+// Append the accumulated array data block, if it's not empty, to the file
+// and reset the per-block state. Called when the current data block is full
+// and when finishing the file, to flush the last data block.
+Status CFileWriter::FinishCurArrayDataBlock() {
+ DCHECK(!validx_builder_); // array-type column cannot be a part of primary key
+
+ // Number of array cells in the block.
+ const uint32_t num_arrays_in_block = array_non_null_bitmap_builder_->nitems();
+ const uint32_t num_elems_in_block = non_null_bitmap_builder_->nitems();
+ if (PREDICT_FALSE(num_arrays_in_block == 0)) {
+ DCHECK_EQ(0, num_elems_in_block);
+ return Status::OK();
+ }
+ // Every array cell, NULL or not, has an entry both in the arrays' non-null
+ // bitmap and in the sequence of array element numbers.
+ DCHECK_EQ(num_arrays_in_block, array_elem_num_builder_->nitems());
+
+ DCHECK_GE(value_count_, num_arrays_in_block);
+ rowid_t first_array_ord = value_count_ - num_arrays_in_block;
+ VLOG(1) << "Appending nullable array data block for values "
+ << first_array_ord << "-" << value_count_;
+
+ Status s;
+ {
+ vector<Slice> data_slices;
+ data_block_->Finish(first_array_ord, &data_slices);
+
+ // A nullable array data block has the following layout
+ // (see docs/design-docs/cfile.md for more details):
+ //
+ // flattened value count : unsigned [LEB128] encoded count of values
+ //
+ // array count : unsigned [LEB128] encoded count of arrays
+ //
+ // flattened non-null bitmap size : unsigned [LEB128] encoded size
+ // of the following non-null bitmap
+ //
+ // flattened non-null bitmap : [RLE] encoded bitmap
+ //
+ // array element numbers size : unsigned [LEB128] encoded size of the
+ // following field
+ //
+ // array element numbers : [RLE] encoded sequence of 16-bit
+ // unsigned integers
+ //
+ // array non-null bitmap size : unsigned [LEB128] encoded size of the
+ // following non-null bitmap
+ //
+ // array non-null bitmap : [RLE] encoded bitmap of non-nullness
+ // of array cells
+ //
+ // data : encoded non-null data values
+ // (a.k.a. 'flattened sequence of elements')
+ vector<Slice> v;
+ v.reserve(data_slices.size() + 6);
+
+ const Slice flattened_non_null_bitmap = non_null_bitmap_builder_->Finish();
+ faststring array_headers_0;
+ PutVarint32(&array_headers_0, num_elems_in_block);
+ PutVarint32(&array_headers_0, num_arrays_in_block);
+ PutVarint32(&array_headers_0, static_cast<uint32_t>(flattened_non_null_bitmap.size()));
+ v.emplace_back(array_headers_0.data(), array_headers_0.size());
+ if (!flattened_non_null_bitmap.empty()) {
+ v.emplace_back(flattened_non_null_bitmap);
+ }
+
+ const Slice array_elem_num_encoded = array_elem_num_builder_->Finish();
+ faststring array_headers_1;
+ PutVarint32(&array_headers_1, static_cast<uint32_t>(array_elem_num_encoded.size()));
+ v.emplace_back(array_headers_1.data(), array_headers_1.size());
+ if (!array_elem_num_encoded.empty()) {
+ v.emplace_back(array_elem_num_encoded);
+ }
+
+ const Slice array_non_null_bitmap = array_non_null_bitmap_builder_->Finish();
+ DCHECK(!array_non_null_bitmap.empty());
+ faststring array_headers_2;
+ PutVarint32(&array_headers_2, static_cast<uint32_t>(array_non_null_bitmap.size()));
+ v.emplace_back(array_headers_2.data(), array_headers_2.size());
+ v.emplace_back(array_non_null_bitmap);
+
+ std::move(data_slices.begin(), data_slices.end(), std::back_inserter(v));
+ s = AppendRawBlock(std::move(v),
+ first_array_ord,
+ nullptr,
+ Slice(last_key_),
+ "array data block");
+ }
+
+ // Reset per-block state.
+ non_null_bitmap_builder_->Reset();
+ array_non_null_bitmap_builder_->Reset();
+ array_elem_num_builder_->Reset();
+
+ data_block_->Reset();
+
+ return s;
+}
+
Status CFileWriter::AppendRawBlock(vector<Slice> data_slices,
size_t ordinal_pos,
const void* validx_curr,
diff --git a/src/kudu/cfile/cfile_writer.h b/src/kudu/cfile/cfile_writer.h
index f96ec27cf..e3fe23eda 100644
--- a/src/kudu/cfile/cfile_writer.h
+++ b/src/kudu/cfile/cfile_writer.h
@@ -52,6 +52,7 @@ extern const char kMagicStringV2[];
extern const int kMagicLength;
extern const size_t kChecksumSize;
+class ArrayElemNumBuilder;
class NonNullBitmapBuilder;
// Main class used to write a CFile.
@@ -97,6 +98,23 @@ class CFileWriter final {
// 'entries' still will have 10 elements in it
Status AppendNullableEntries(const uint8_t* bitmap, const void* entries,
size_t count);
+ // Similar to AppendNullableEntries above, but for appending array-type
+ // column blocks.
+ //
+ // The 'entries' is a pointer to a C-style array of Slice elements,
+ // where each Slice element represents a cell of an array-type column.
+ // The 'entries' may contain NULL array cells as well, and the validity
+ // of a cell (i.e. whether it's a non-NULL cell) is determined by the
+ // corresponding bit in 'bitmap': 1 means that the cell contains an array
+ // (NOTE: the array may be empty, i.e. contain no elements), and 0 means
+ // the cell is NULL.
+ //
+ // The information on the validity of elements in each of the array cells
+ // is encoded in the cell's data.
+ Status AppendNullableArrayEntries(const uint8_t* bitmap,
+ const void* entries,
+ size_t count);
+
// Append a raw block to the file, adding it to the various indexes.
//
// The Slices in 'data_slices' are concatenated to form the block.
@@ -153,6 +171,7 @@ class CFileWriter final {
Status WriteRawData(const std::vector<Slice>& data);
Status FinishCurDataBlock();
+ Status FinishCurArrayDataBlock();  // counterpart of FinishCurDataBlock() for arrays
// Flush the current unflushed_metadata_ entries into the given protobuf
// field, clearing the buffer.
@@ -166,7 +185,8 @@ class CFileWriter final {
// Current file offset.
uint64_t off_;
- // Current number of values that have been appended.
+ // Current number of values (array cells for array columns) appended so far.
+ // It's accumulated across all the blocks written into this CFile.
rowid_t value_count_;
// Type of data being written
@@ -174,6 +194,7 @@ class CFileWriter final {
CompressionType compression_;
const TypeInfo* typeinfo_;
const TypeEncodingInfo* type_encoding_info_;
+ const bool is_array_;  // whether the column is of an array type
// The last key written to the block.
// Only set if the writer is writing an embedded value index.
@@ -189,6 +210,8 @@ class CFileWriter final {
std::unique_ptr<IndexTreeBuilder> posidx_builder_;
std::unique_ptr<IndexTreeBuilder> validx_builder_;
std::unique_ptr<NonNullBitmapBuilder> non_null_bitmap_builder_;
+ std::unique_ptr<NonNullBitmapBuilder> array_non_null_bitmap_builder_;  // array cells
+ std::unique_ptr<ArrayElemNumBuilder> array_elem_num_builder_;  // elements per array cell
std::unique_ptr<CompressedBlockBuilder> block_compressor_;
enum State {
diff --git a/src/kudu/cfile/plain_bitmap_block.h b/src/kudu/cfile/plain_bitmap_block.h
index 561c852d7..61fe64bf9 100644
--- a/src/kudu/cfile/plain_bitmap_block.h
+++ b/src/kudu/cfile/plain_bitmap_block.h
@@ -50,7 +50,7 @@ class PlainBitMapBlockBuilder final : public BlockBuilder {
Reset();
}
- bool IsBlockFull() const override {
+ bool IsBlockFullImpl() const override {  // written more than cfile_block_size bytes?
return writer_.bytes_written() >
options_->storage_attributes.cfile_block_size;
}
diff --git a/src/kudu/cfile/plain_block.h b/src/kudu/cfile/plain_block.h
index b8a5e0fff..84e2d1779 100644
--- a/src/kudu/cfile/plain_block.h
+++ b/src/kudu/cfile/plain_block.h
@@ -64,7 +64,7 @@ class PlainBlockBuilder final : public BlockBuilder {
return count;
}
- bool IsBlockFull() const override {
+ bool IsBlockFullImpl() const override {  // buffered data exceeds cfile_block_size?
return buffer_.size() > options_->storage_attributes.cfile_block_size;
}
diff --git a/src/kudu/cfile/rle_block.h b/src/kudu/cfile/rle_block.h
index 8930abcaa..6872e49ac 100644
--- a/src/kudu/cfile/rle_block.h
+++ b/src/kudu/cfile/rle_block.h
@@ -67,7 +67,7 @@ class RleBitMapBlockBuilder final : public BlockBuilder {
return count;
}
- bool IsBlockFull() const override {
+ bool IsBlockFullImpl() const override {  // encoded data exceeds cfile_block_size?
return encoder_.len() > options_->storage_attributes.cfile_block_size;
}
@@ -232,7 +232,7 @@ class RleIntBlockBuilder final : public BlockBuilder {
Reset();
}
- bool IsBlockFull() const override {
+ bool IsBlockFullImpl() const override {  // encoded data exceeds cfile_block_size?
return rle_encoder_.len() > options_->storage_attributes.cfile_block_size;
}
diff --git a/src/kudu/common/columnblock-test-util.h b/src/kudu/common/columnblock-test-util.h
index d67a6bff4..7430ccbef 100644
--- a/src/kudu/common/columnblock-test-util.h
+++ b/src/kudu/common/columnblock-test-util.h
@@ -16,7 +16,9 @@
// under the License.
#pragma once
+#include <cstdint>
#include <memory>
+#include <type_traits>
#include "kudu/gutil/macros.h"
@@ -31,17 +33,18 @@ namespace kudu {
//
// This is more useful in test code than production code,
// since it doesn't allocate from an arena, etc.
-template<DataType type>
+template<DataType type, bool IS_ARRAY = false>  // IS_ARRAY: cells are arrays of 'type' (Slice each)
class ScopedColumnBlock : public ColumnBlock {
public:
- typedef typename TypeTraits<type>::cpp_type cpp_type;
+ typedef typename std::conditional<
+ IS_ARRAY, Slice, typename TypeTraits<type>::cpp_type>::type cpp_type;
explicit ScopedColumnBlock(size_t n_rows, bool allow_nulls = true)
- : ColumnBlock(GetTypeInfo(type),
+ : ColumnBlock(IS_ARRAY ? GetArrayTypeInfo(type) : GetTypeInfo(type),
allow_nulls ? new uint8_t[BitmapSize(n_rows)] : nullptr,
new cpp_type[n_rows],
n_rows,
- new RowBlockMemory()),
+ new RowBlockMemory),
non_null_bitmap_buf_(non_null_bitmap_),
data_buf_(reinterpret_cast<cpp_type*>(data_)),
memory_buf_(memory_) {
diff --git a/src/kudu/common/columnblock.h b/src/kudu/common/columnblock.h
index 7130e94df..75a8b6b5f 100644
--- a/src/kudu/common/columnblock.h
+++ b/src/kudu/common/columnblock.h
@@ -248,6 +248,7 @@ class ColumnDataView {
// Check <= here, not <, since you can skip to
// the very end of the data (leaving an empty block)
DCHECK_LE(skip, column_block_->nrows());
+ DCHECK_LE(row_offset_ + skip, column_block_->nrows());  // never past the end of the block
row_offset_ += skip;
}
diff --git a/src/kudu/common/rowblock.h b/src/kudu/common/rowblock.h
index 42edfb877..9d741b08b 100644
--- a/src/kudu/common/rowblock.h
+++ b/src/kudu/common/rowblock.h
@@ -451,6 +451,8 @@ class RowBlock final {
// nrows_ <= row_capacity_
size_t nrows_;
+ // Memory to store data of variable-length columns (e.g. BINARY/STRING or
+ // array columns).
RowBlockMemory* memory_;
// The bitmap indicating which rows are valid in this block.
diff --git a/src/kudu/common/rowblock_memory.h b/src/kudu/common/rowblock_memory.h
index 12fc5978c..3ad26eb37 100644
--- a/src/kudu/common/rowblock_memory.h
+++ b/src/kudu/common/rowblock_memory.h
@@ -31,7 +31,7 @@ class RowBlockRefCounted;
// cells.
//
// When scanning rows into a RowBlock, the rows may contain variable-length
-// data (eg BINARY columns). In this case, the data cannot be inlined directly
+// data (e.g. BINARY columns or array columns). In this case, the data cannot be inlined directly
// into the columnar data arrays that are part of the RowBlock and instead need
// to be allocated out of a separate Arena. This class wraps that Arena.
//
diff --git a/src/kudu/tablet/multi_column_writer.cc b/src/kudu/tablet/multi_column_writer.cc
index 1c12b6490..241f01cff 100644
--- a/src/kudu/tablet/multi_column_writer.cc
+++ b/src/kudu/tablet/multi_column_writer.cc
@@ -27,6 +27,7 @@
#include "kudu/common/columnblock.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/schema.h"
+#include "kudu/common/types.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/fs_manager.h"
@@ -88,8 +89,10 @@ Status MultiColumnWriter::Open() {
BlockId block_id(block->id());
// Create the CFile writer itself.
- unique_ptr<CFileWriter> writer(new CFileWriter(
- std::move(opts), col.type_info(), col.is_nullable(), std::move(block)));
+ unique_ptr<CFileWriter> writer(new CFileWriter(std::move(opts),
+ col.type_info(),
+ col.is_nullable(),
+ std::move(block)));
RETURN_NOT_OK_PREPEND(writer->Start(),
Substitute("tablet $0: unable to start writer for column $1",
tablet_id_, col.ToString()));
@@ -107,11 +110,20 @@ Status MultiColumnWriter::AppendBlock(const RowBlock& block) {
DCHECK(open_);
for (auto i = 0; i < schema_->num_columns(); ++i) {
ColumnBlock column = block.column_block(i);
- if (column.is_nullable()) {
- RETURN_NOT_OK(cfile_writers_[i]->AppendNullableEntries(column.non_null_bitmap(),
- column.data(), column.nrows()));
- } else {
+ if (column.type_info()->is_array()) {
+ // CFileWriter::AppendEntries() doesn't handle array cells, while
+ // AppendNullableArrayEntries() needs the cells' non-null bitmap.
+ if (!column.is_nullable()) {
+ return Status::NotSupported("non-nullable array columns are not supported",
+ schema_->column(i).ToString());
+ }
+ RETURN_NOT_OK(cfile_writers_[i]->AppendNullableArrayEntries(
+ column.non_null_bitmap(), column.data(), column.nrows()));
+ } else if (!column.is_nullable()) {
RETURN_NOT_OK(cfile_writers_[i]->AppendEntries(column.data(), column.nrows()));
+ } else {
+ RETURN_NOT_OK(cfile_writers_[i]->AppendNullableEntries(
+ column.non_null_bitmap(), column.data(), column.nrows()));
}
}
return Status::OK();