This is an automated email from the ASF dual-hosted git repository. adar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 1d28104cac0866c83b3022e3ddf3b7237962c1da Author: Todd Lipcon <[email protected]> AuthorDate: Wed Mar 25 11:38:56 2020 -0700 Add functionality to serialize a RowBlock into columnar format This adds the core functionality to take a RowBlock and convert it into a set of buffers for each column: the cell data, the null bitmap, and the indirect (string) data. This also updates wire_protocol-test to add coverage for nulls and unselected rows, and adds a comparison benchmark. The columnar code path is 4-5x faster in the best case, and in the worst case only 30% slower than the existing row-wise code path. Some follow-up commits will add further optimizations. Change-Id: I287a8aa6736f19816b0edbe16409c01f35c0319e Reviewed-on: http://gerrit.cloudera.org:8080/15560 Reviewed-by: Andrew Wong <[email protected]> Tested-by: Kudu Jenkins --- src/kudu/common/CMakeLists.txt | 2 +- src/kudu/common/columnar_serialization.cc | 134 ++++++++- src/kudu/common/columnar_serialization.h | 38 ++- src/kudu/common/wire_protocol-test.cc | 442 ++++++++++++++++++++++-------- src/kudu/util/faststring.h | 10 + 5 files changed, 510 insertions(+), 116 deletions(-) diff --git a/src/kudu/common/CMakeLists.txt b/src/kudu/common/CMakeLists.txt index 0a1814e..4ffe9c4 100644 --- a/src/kudu/common/CMakeLists.txt +++ b/src/kudu/common/CMakeLists.txt @@ -99,4 +99,4 @@ ADD_KUDU_TEST(scan_spec-test) ADD_KUDU_TEST(schema-test) ADD_KUDU_TEST(table_util-test) ADD_KUDU_TEST(types-test) -ADD_KUDU_TEST(wire_protocol-test NUM_SHARDS 4) +ADD_KUDU_TEST(wire_protocol-test NUM_SHARDS 10) diff --git a/src/kudu/common/columnar_serialization.cc b/src/kudu/common/columnar_serialization.cc index 69e8da9..5e647d6 100644 --- a/src/kudu/common/columnar_serialization.cc +++ b/src/kudu/common/columnar_serialization.cc @@ -19,7 +19,6 @@ #include <immintrin.h> -#include <cstdint> #include <cstring> #include <ostream> #include <string> @@ -27,11 +26,17 @@ #include <glog/logging.h> +#include "kudu/common/columnblock.h" +#include 
"kudu/common/common.pb.h" +#include "kudu/common/rowblock.h" +#include "kudu/common/schema.h" +#include "kudu/common/types.h" #include "kudu/common/zp7.h" #include "kudu/gutil/cpu.h" #include "kudu/gutil/port.h" #include "kudu/util/alignment.h" #include "kudu/util/bitmap.h" +#include "kudu/util/slice.h" using std::vector; @@ -360,6 +365,133 @@ void CopySelectedRows(const vector<uint16_t>& sel_rows, } } +namespace { +// For each of the Slices in 'cells_buf', copy the pointed-to data into 'indirect' and +// modify the Slice so that its 'pointer' field is not an actual memory pointer, but +// rather an offset within the indirect data buffer. +void RelocateSlicesToIndirect(uint8_t* __restrict__ cells_buf, int n_rows, + faststring* indirect) { + Slice* cell_slices = reinterpret_cast<Slice*>(cells_buf); + size_t total_size = 0; + for (int i = 0; i < n_rows; i++) { + total_size += cell_slices[i].size(); + } + + int old_size = indirect->size(); + indirect->resize_with_extra_capacity(old_size + total_size); + + uint8_t* dst_base = indirect->data(); + uint8_t* dst = dst_base + old_size; + + for (int i = 0; i < n_rows; i++) { + Slice* s = &cell_slices[i]; + if (!s->empty()) { + memcpy(dst, s->data(), s->size()); + } + *s = Slice(reinterpret_cast<const uint8_t*>(dst - dst_base), s->size()); + dst += s->size(); + } +} + +// Specialized division for the known type sizes. Despite having some branching here, +// this is faster than a 'div' instruction which has a 20+ cycle latency. +size_t div_type_size(size_t s, size_t divisor) { + switch (divisor) { + case 1: return s; + case 2: return s/2; + case 4: return s/4; + case 8: return s/8; + case 16: return s/16; + default: return s/divisor; + } +} + + +// Copy the selected cells (and non-null-bitmap bits) from 'cblock' into 'dst' according to +// the given 'sel_rows'. 
+void CopySelectedCellsFromColumn(const ColumnBlock& cblock, + const SelectedRows& sel_rows, + ColumnarSerializedBatch::Column* dst) { + size_t type_size = cblock.type_info()->size(); + int n_sel = sel_rows.num_selected(); + + // Number of initial rows in the dst values and null_bitmap. + DCHECK_EQ(dst->data.size() % type_size, 0); + size_t initial_rows = div_type_size(dst->data.size(), type_size); + size_t new_num_rows = initial_rows + n_sel; + + dst->data.resize_with_extra_capacity(type_size * new_num_rows); + uint8_t* dst_buf = dst->data.data() + type_size * initial_rows; + const uint8_t* src_buf = cblock.cell_ptr(0); + + if (sel_rows.all_selected()) { + memcpy(dst_buf, src_buf, type_size * n_sel); + } else { + CopySelectedRows(sel_rows.indexes(), type_size, src_buf, dst_buf); + } + + if (cblock.is_nullable()) { + DCHECK_EQ(dst->non_null_bitmap->size(), BitmapSize(initial_rows)); + dst->non_null_bitmap->resize_with_extra_capacity(BitmapSize(new_num_rows)); + CopyNonNullBitmap(cblock.non_null_bitmap(), + sel_rows.bitmap(), + initial_rows, cblock.nrows(), + dst->non_null_bitmap->data()); + ZeroNullValues(type_size, initial_rows, n_sel, dst->data.data(), dst->non_null_bitmap->data()); + } + + if (cblock.type_info()->physical_type() == BINARY) { + RelocateSlicesToIndirect(dst_buf, n_sel, boost::get_pointer(dst->indirect_data)); + } +} +} // anonymous namespace } // namespace internal +int SerializeRowBlockColumnar( + const RowBlock& block, + const Schema* projection_schema, + ColumnarSerializedBatch* out) { + DCHECK_GT(block.nrows(), 0); + const Schema* tablet_schema = block.schema(); + + if (projection_schema == nullptr) { + projection_schema = tablet_schema; + } + + // Initialize buffers for the columns. + // TODO(todd) don't pre-size these to 1MB per column -- quite + // expensive if there are a lot of columns! 
+ if (out->columns.size() != projection_schema->num_columns()) { + CHECK_EQ(out->columns.size(), 0); + out->columns.reserve(projection_schema->num_columns()); + for (const auto& col : projection_schema->columns()) { + out->columns.emplace_back(); + out->columns.back().data.reserve(1024 * 1024); + if (col.type_info()->physical_type() == BINARY) { + out->columns.back().indirect_data.emplace(); + } + if (col.is_nullable()) { + out->columns.back().non_null_bitmap.emplace(); + } + } + } + + SelectedRows sel = block.selection_vector()->GetSelectedRows(); + int col_idx = 0; + for (const auto& col : projection_schema->columns()) { + int t_schema_idx = tablet_schema->find_column(col.name()); + CHECK_NE(t_schema_idx, -1); + const ColumnBlock& column_block = block.column_block(t_schema_idx); + + internal::CopySelectedCellsFromColumn( + column_block, + sel, + &out->columns[col_idx]); + col_idx++; + } + + return sel.num_selected(); +} + + } // namespace kudu diff --git a/src/kudu/common/columnar_serialization.h b/src/kudu/common/columnar_serialization.h index 04025b7..d7ad494 100644 --- a/src/kudu/common/columnar_serialization.h +++ b/src/kudu/common/columnar_serialization.h @@ -19,15 +19,48 @@ #include <cstdint> #include <vector> +#include <boost/optional/optional.hpp> + +#include "kudu/util/faststring.h" + namespace kudu { +class RowBlock; +class Schema; + +// A pending batch of serialized rows, suitable for easy conversion +// into the protobuf representation and a set of sidecars. +struct ColumnarSerializedBatch { + struct Column { + // Underlying column data. + faststring data; + + // Data for varlen columns (BINARY) + boost::optional<faststring> indirect_data; + + // Each bit is set when a value is non-null + boost::optional<faststring> non_null_bitmap; + }; + std::vector<Column> columns; +}; + +// Serialize the data in 'block' into the columnar batch 'out', appending to +// any data already serialized to the same batch. 
+// +// Returns the number of selected rows serialized. +int SerializeRowBlockColumnar( + const RowBlock& block, + const Schema* projection_schema, + ColumnarSerializedBatch* out); + + //////////////////////////////////////////////////////////// // Expose these internal functions for unit testing. // Do not call them outside of tests! // See .cc file for docs. //////////////////////////////////////////////////////////// namespace internal { -void ZeroNullValues(int type_size, +void ZeroNullValues(int sizeof_type, int dst_idx, int n_rows, uint8_t* dst_values_buf, @@ -40,7 +73,7 @@ void CopyNonNullBitmap(const uint8_t* non_null_bitmap, uint8_t* dst_non_null_bitmap); void CopySelectedRows(const std::vector<uint16_t>& sel_rows, - int type_size, + int sizeof_type, const uint8_t* __restrict__ src_buf, uint8_t* __restrict__ dst_buf); @@ -52,6 +85,7 @@ enum class PextMethod { #endif kSimple }; + extern PextMethod g_pext_method; std::vector<PextMethod> GetAvailablePextMethods(); diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc index 4c9ac0d..ae18f1e 100644 --- a/src/kudu/common/wire_protocol-test.cc +++ b/src/kudu/common/wire_protocol-test.cc @@ -20,6 +20,7 @@ #include <algorithm> #include <cstdint> #include <cstring> +#include <list> #include <numeric> #include <ostream> #include <string> @@ -30,12 +31,14 @@ #include <gtest/gtest.h> #include "kudu/common/column_predicate.h" +#include "kudu/common/columnar_serialization.h" #include "kudu/common/common.pb.h" #include "kudu/common/row.h" #include "kudu/common/rowblock.h" #include "kudu/common/schema.h" #include "kudu/common/types.h" #include "kudu/common/wire_protocol.pb.h" +#include "kudu/gutil/port.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/gutil/walltime.h" #include "kudu/util/bitmap.h" @@ -43,7 +46,9 @@ #include "kudu/util/faststring.h" #include "kudu/util/hash.pb.h" #include "kudu/util/hexdump.h" +#include "kudu/util/int128.h" #include "kudu/util/memory/arena.h" 
+#include "kudu/util/random.h" #include "kudu/util/slice.h" #include "kudu/util/status.h" #include "kudu/util/stopwatch.h" // IWYU pragma: keep @@ -57,121 +62,58 @@ using strings::Substitute; namespace kudu { -class WireProtocolTest : public KuduTest, - // Used for benchmark, int corresponds to the number of columns, - // double corresponds to the selection rate. - public testing::WithParamInterface<tuple<int, double>> { +class WireProtocolTest : public KuduTest { public: WireProtocolTest() - : schema_({ ColumnSchema("col1", STRING), - ColumnSchema("col2", STRING), - ColumnSchema("col3", UINT32, true /* nullable */) }, + : schema_({ ColumnSchema("string", STRING), + ColumnSchema("nullable_string", STRING, /* is_nullable=*/true), + ColumnSchema("int", INT32), + ColumnSchema("nullable_int", INT32, /* is_nullable=*/true), + ColumnSchema("int64", INT64) }, 1), test_data_arena_(4096) { } - void FillRowBlockWithTestRows(RowBlock* block) { - test_data_arena_.Reset(); + static void FillRowBlockWithTestRows(RowBlock* block) { + Random rng(SeedRandom()); + block->selection_vector()->SetAllTrue(); for (int i = 0; i < block->nrows(); i++) { + if (rng.OneIn(10)) { + block->selection_vector()->SetRowUnselected(i); + continue; + } + RowBlockRow row = block->row(i); // We make new copies of these strings into the Arena for each row so that // the workload is more realistic. If we just re-use the same Slice object // for each row, the memory accesses fit entirely into a smaller number of // cache lines and we may micro-optimize for the wrong thing. 
- Slice col1, col2; - CHECK(test_data_arena_.RelocateSlice("hello world col1", &col1)); - CHECK(test_data_arena_.RelocateSlice("hello world col2", &col2)); - *reinterpret_cast<Slice*>(row.mutable_cell_ptr(0)) = col1; - *reinterpret_cast<Slice*>(row.mutable_cell_ptr(1)) = col2; - *reinterpret_cast<uint32_t*>(row.mutable_cell_ptr(2)) = i; - row.cell(2).set_null(false); - } - } - - void ResetBenchmarkSchema(int num_columns) { - vector<ColumnSchema> column_schemas; - column_schemas.reserve(num_columns); - for (int i = 0; i < num_columns; i++) { - column_schemas.emplace_back(Substitute("col$0", i), i % 2 ? STRING : INT32); - } - benchmark_schema_.Reset(column_schemas, 1); - } - - void FillRowBlockForBenchmark(RowBlock* block) { - test_data_arena_.Reset(); - for (int i = 0; i < block->nrows(); i++) { - RowBlockRow row = block->row(i); - for (int j = 0; j < benchmark_schema_.num_columns(); j++) { - const ColumnSchema& column_schema = benchmark_schema_.column(j); - DataType type = column_schema.type_info()->type(); - if (type == STRING) { - Slice col; - CHECK(test_data_arena_.RelocateSlice(Substitute("hello world $0", - column_schema.name()), &col)); - memcpy(row.mutable_cell_ptr(j), &col, sizeof(Slice)); - } else if (type == INT32) { - memcpy(row.mutable_cell_ptr(j), &i, sizeof(int32_t)); - } else { - LOG(FATAL) << "Unexpected type."; - } + CHECK(block->arena()->RelocateSlice( + "hello world col0", + reinterpret_cast<Slice*>(row.mutable_cell_ptr(0)))); + + if (rng.OneIn(3)) { + row.cell(1).set_null(true); + } else { + row.cell(1).set_null(false); + CHECK(block->arena()->RelocateSlice( + "hello world col1", + reinterpret_cast<Slice*>(row.mutable_cell_ptr(1)))); } - } - } - - void SelectRandomRowsWithRate(RowBlock* block, double rate) { - CHECK_LE(rate, 1.0); - CHECK_GE(rate, 0.0); - int select_count = block->nrows() * rate; - SelectionVector* select_vector = block->selection_vector(); - if (rate == 1.0) { - select_vector->SetAllTrue(); - } else if (rate == 0.0) { - 
select_vector->SetAllFalse(); - } else { - vector<int> indexes(block->nrows()); - std::iota(indexes.begin(), indexes.end(), 0); - std::random_shuffle(indexes.begin(), indexes.end()); - indexes.resize(select_count); - select_vector->SetAllFalse(); - for (auto index : indexes) { - select_vector->SetRowSelected(index); - } - } - CHECK_EQ(select_vector->CountSelected(), select_count); - } - // Use column_count to control the schema scale. - // Use select_rate to control the number of selected rows. - void RunBenchmark(int column_count, double select_rate) { - ResetBenchmarkSchema(column_count); - Arena arena(1024); - RowBlock block(&benchmark_schema_, 1000, &arena); - // Regardless of the config, use a constant number of cells for the test by - // looping the conversion an appropriate number of times. - const int64_t kNumCellsToConvert = AllowSlowTests() ? 100000000 : 1000000; - const int kNumTrials = kNumCellsToConvert / select_rate / column_count / block.nrows(); - FillRowBlockForBenchmark(&block); - SelectRandomRowsWithRate(&block, select_rate); + *reinterpret_cast<int32_t*>(row.mutable_cell_ptr(2)) = i; + *reinterpret_cast<int32_t*>(row.mutable_cell_ptr(3)) = i; + row.cell(3).set_null(rng.OneIn(7)); - faststring direct, indirect; - int64_t cycle_start = CycleClock::Now(); - for (int i = 0; i < kNumTrials; ++i) { - direct.clear(); - indirect.clear(); - SerializeRowBlock(block, nullptr, &direct, &indirect); + *reinterpret_cast<int64_t*>(row.mutable_cell_ptr(4)) = i; } - int64_t cycle_end = CycleClock::Now(); - LOG(INFO) << Substitute( - "Converting to PB with column count $0 and row select rate $1: $2 cycles/cell", - column_count, select_rate, - static_cast<double>(cycle_end - cycle_start) / kNumCellsToConvert); } + protected: Schema schema_; - Schema benchmark_schema_; Arena test_data_arena_; }; @@ -221,25 +163,38 @@ TEST_F(WireProtocolTest, TestSchemaRoundTrip) { google::protobuf::RepeatedPtrField<ColumnSchemaPB> pbs; ASSERT_OK(SchemaToColumnPBs(schema_, &pbs)); - 
ASSERT_EQ(3, pbs.size()); + ASSERT_EQ(5, pbs.size()); // Column 0. EXPECT_TRUE(pbs.Get(0).is_key()); - EXPECT_EQ("col1", pbs.Get(0).name()); + EXPECT_EQ("string", pbs.Get(0).name()); EXPECT_EQ(STRING, pbs.Get(0).type()); EXPECT_FALSE(pbs.Get(0).is_nullable()); // Column 1. EXPECT_FALSE(pbs.Get(1).is_key()); - EXPECT_EQ("col2", pbs.Get(1).name()); + EXPECT_EQ("nullable_string", pbs.Get(1).name()); EXPECT_EQ(STRING, pbs.Get(1).type()); - EXPECT_FALSE(pbs.Get(1).is_nullable()); + EXPECT_TRUE(pbs.Get(1).is_nullable()); // Column 2. EXPECT_FALSE(pbs.Get(2).is_key()); - EXPECT_EQ("col3", pbs.Get(2).name()); - EXPECT_EQ(UINT32, pbs.Get(2).type()); - EXPECT_TRUE(pbs.Get(2).is_nullable()); + EXPECT_EQ("int", pbs.Get(2).name()); + EXPECT_EQ(INT32, pbs.Get(2).type()); + EXPECT_FALSE(pbs.Get(2).is_nullable()); + + + // Column 3. + EXPECT_FALSE(pbs.Get(3).is_key()); + EXPECT_EQ("nullable_int", pbs.Get(3).name()); + EXPECT_EQ(INT32, pbs.Get(3).type()); + EXPECT_TRUE(pbs.Get(3).is_nullable()); + + // Column 4. + EXPECT_FALSE(pbs.Get(4).is_key()); + EXPECT_EQ("int64", pbs.Get(4).name()); + EXPECT_EQ(INT64, pbs.Get(4).type()); + EXPECT_FALSE(pbs.Get(4).is_nullable()); // Convert back to a Schema object and verify they're identical. Schema schema2; @@ -304,11 +259,10 @@ TEST_F(WireProtocolTest, TestBadSchema_DuplicateColumnName) { ASSERT_EQ("Invalid argument: Duplicate column name: c0", s.ToString()); } -// Create a block of rows in columnar layout and ensure that it can be -// converted to and from protobuf. -TEST_F(WireProtocolTest, TestColumnarRowBlockToPB) { +// Create a block of rows and ensure that it can be converted to and from protobuf. +TEST_F(WireProtocolTest, TestRowBlockToRowwisePB) { Arena arena(1024); - RowBlock block(&schema_, 10, &arena); + RowBlock block(&schema_, 30, &arena); FillRowBlockWithTestRows(&block); // Convert to PB. 
@@ -326,14 +280,82 @@ TEST_F(WireProtocolTest, TestColumnarRowBlockToPB) { Slice direct_sidecar = direct; ASSERT_OK(ExtractRowsFromRowBlockPB(schema_, pb, indirect, &direct_sidecar, &row_ptrs)); - ASSERT_EQ(block.nrows(), row_ptrs.size()); + ASSERT_EQ(block.selection_vector()->CountSelected(), row_ptrs.size()); + int dst_row_idx = 0; for (int i = 0; i < block.nrows(); ++i) { - ConstContiguousRow row_roundtripped(&schema_, row_ptrs[i]); + if (!block.selection_vector()->IsRowSelected(i)) { + continue; + } + ConstContiguousRow row_roundtripped(&schema_, row_ptrs[dst_row_idx]); EXPECT_EQ(schema_.DebugRow(block.row(i)), schema_.DebugRow(row_roundtripped)); + dst_row_idx++; + } +} + +// Create blocks of rows and ensure that they can be converted to the columnar serialized +// layout. +TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) { + // Generate several blocks of random data. + static constexpr int kNumBlocks = 3; + Arena arena(1024); + std::list<RowBlock> blocks; + for (int i = 0; i < kNumBlocks; i++) { + blocks.emplace_back(&schema_, 30, &arena); + FillRowBlockWithTestRows(&blocks.back()); + } + + // Convert all of the RowBlocks to a single serialized (concatenated) columnar format. + ColumnarSerializedBatch batch; + for (const auto& block : blocks) { + SerializeRowBlockColumnar(block, nullptr, &batch); + } + + // Verify that the resulting serialized data matches the concatenated original data blocks. 
+ ASSERT_EQ(5, batch.columns.size()); + int dst_row_idx = 0; + for (const auto& block : blocks) { + for (int src_row_idx = 0; src_row_idx < block.nrows(); src_row_idx++) { + if (!block.selection_vector()->IsRowSelected(src_row_idx)) { + continue; + } + SCOPED_TRACE(src_row_idx); + SCOPED_TRACE(dst_row_idx); + const auto& row = block.row(src_row_idx); + for (int c = 0; c < schema_.num_columns(); c++) { + SCOPED_TRACE(c); + const auto& col = schema_.column(c); + const auto& serialized_col = batch.columns[c]; + if (col.is_nullable()) { + bool expect_null = row.is_null(c);; + EXPECT_EQ(!BitmapTest(serialized_col.non_null_bitmap->data(), dst_row_idx), + expect_null); + if (expect_null) { + continue; + } + } + int type_size = col.type_info()->size(); + Slice serialized_val(serialized_col.data.data() + type_size * dst_row_idx, + type_size); + Slice orig_val(row.cell_ptr(c), type_size); + + if (col.type_info()->physical_type() == BINARY) { + orig_val = *reinterpret_cast<const Slice*>(orig_val.data()); + serialized_val = *reinterpret_cast<const Slice*>(serialized_val.data()); + + uintptr_t indirect_offset = reinterpret_cast<uintptr_t>(serialized_val.data()); + serialized_val = Slice(serialized_col.indirect_data->data() + indirect_offset, + serialized_val.size()); + } + + EXPECT_EQ(orig_val, serialized_val); + } + dst_row_idx++; + } } } + // Create a block of rows in columnar layout and ensure that it can be // converted to and from protobuf. 
TEST_F(WireProtocolTest, TestColumnarRowBlockToPBWithPadding) { @@ -427,15 +449,211 @@ TEST_F(WireProtocolTest, TestColumnarRowBlockToPBWithPadding) { } } -TEST_P(WireProtocolTest, TestColumnarRowBlockToPBBenchmark) { - int column_count = std::get<0>(GetParam()); +struct RowwiseConverter { + static void Run(const RowBlock& block) { + faststring direct; + faststring indirect; + SerializeRowBlock(block, nullptr, &direct, &indirect); + } + + static constexpr const char* kName = "row-wise"; +}; + + +struct ColumnarConverter { + static void Run(const RowBlock& block) { + ColumnarSerializedBatch batch; + SerializeRowBlockColumnar(block, nullptr, &batch); + } + + static constexpr const char* kName = "columnar"; +}; + +struct BenchmarkColumnsSpec { + struct Col { + DataType type; + double null_fraction; // negative for non-null + }; + vector<Col> columns; + string name; +}; + +class WireProtocolBenchmark : + public WireProtocolTest, + public testing::WithParamInterface<tuple<BenchmarkColumnsSpec, double>> { + public: + + void ResetBenchmarkSchema(const BenchmarkColumnsSpec& spec) { + vector<ColumnSchema> column_schemas; + int i = 0; + for (const auto& c : spec.columns) { + column_schemas.emplace_back(Substitute("col$0", i++), + c.type, + /*nullable=*/c.null_fraction >= 0); + } + CHECK_OK(benchmark_schema_.Reset(std::move(column_schemas), 0)); + } + + void FillRowBlockForBenchmark(const BenchmarkColumnsSpec& spec, + RowBlock* block) { + Random rng(SeedRandom()); + + test_data_arena_.Reset(); + for (int i = 0; i < block->nrows(); i++) { + RowBlockRow row = block->row(i); + for (int j = 0; j < benchmark_schema_.num_columns(); j++) { + const ColumnSchema& column_schema = benchmark_schema_.column(j); + DataType type = spec.columns[j].type; + bool is_null = rng.NextDoubleFraction() <= spec.columns[j].null_fraction; + if (column_schema.is_nullable()) { + row.cell(j).set_null(is_null); + } + if (!is_null) { + switch (type) { + case STRING: { + Slice col; + 
CHECK(test_data_arena_.RelocateSlice(Substitute("hello world $0", + column_schema.name()), &col)); + memcpy(row.mutable_cell_ptr(j), &col, sizeof(Slice)); + break; + } + case INT128: + UnalignedStore<int128_t>(row.mutable_cell_ptr(j), i); + break; + case INT64: + UnalignedStore<int64_t>(row.mutable_cell_ptr(j), i); + break; + case INT32: + UnalignedStore<int32_t>(row.mutable_cell_ptr(j), i); + break; + case INT16: + UnalignedStore<int16_t>(row.mutable_cell_ptr(j), i); + break; + case INT8: + UnalignedStore<int8_t>(row.mutable_cell_ptr(j), i); + break; + default: + LOG(FATAL) << "Unexpected type: " << type; + } + } + } + } + } + + static void SelectRandomRowsWithRate(RowBlock* block, double rate) { + CHECK_LE(rate, 1.0); + CHECK_GE(rate, 0.0); + auto select_count = block->nrows() * rate; + SelectionVector* select_vector = block->selection_vector(); + if (rate == 1.0) { + select_vector->SetAllTrue(); + } else if (rate == 0.0) { + select_vector->SetAllFalse(); + } else { + vector<int> indexes(block->nrows()); + std::iota(indexes.begin(), indexes.end(), 0); + std::random_shuffle(indexes.begin(), indexes.end()); + indexes.resize(select_count); + select_vector->SetAllFalse(); + for (auto index : indexes) { + select_vector->SetRowSelected(index); + } + } + CHECK_EQ(select_vector->CountSelected(), select_count); + } + + + // Use column_count to control the schema scale. + // Use select_rate to control the number of selected rows. + template<class Converter> + double RunBenchmark(const BenchmarkColumnsSpec& spec, + double select_rate) { + ResetBenchmarkSchema(spec); + Arena arena(1024); + RowBlock block(&benchmark_schema_, 1000, &arena); + // Regardless of the config, use a constant number of selected cells for the test by + // looping the conversion an appropriate number of times. + const int64_t kNumCellsToConvert = AllowSlowTests() ? 
100000000 : 1000000; + const int64_t kCellsPerBlock = block.nrows() * spec.columns.size(); + const double kSelectedCellsPerBlock = kCellsPerBlock * select_rate; + const int kNumTrials = static_cast<int>(kNumCellsToConvert / kSelectedCellsPerBlock); + FillRowBlockForBenchmark(spec, &block); + SelectRandomRowsWithRate(&block, select_rate); + + int64_t cycle_start = CycleClock::Now(); + for (int i = 0; i < kNumTrials; ++i) { + Converter::Run(block); + } + int64_t cycle_end = CycleClock::Now(); + double cycles_per_cell = static_cast<double>(cycle_end - cycle_start) / kNumCellsToConvert; + LOG(INFO) << Substitute( + "Converting $0 to PB (method $3) row select rate $1: $2 cycles/cell", + spec.name, select_rate, cycles_per_cell, + Converter::kName); + return cycles_per_cell; + } + + protected: + Schema benchmark_schema_; +}; + +TEST_P(WireProtocolBenchmark, TestRowBlockToPBBenchmark) { + const auto& spec = std::get<0>(GetParam()); double select_rate = std::get<1>(GetParam()); - RunBenchmark(column_count, select_rate); + double cycles_per_cell_rowwise = RunBenchmark<RowwiseConverter>(spec, select_rate); + double cycles_per_cell_columnar = RunBenchmark<ColumnarConverter>(spec, select_rate); + double ratio = cycles_per_cell_rowwise / cycles_per_cell_columnar; + LOG(INFO) << Substitute( + "Converting $0 to PB row select rate $1: columnar/rowwise throughput ratio: $2x", + spec.name, select_rate, ratio); } -INSTANTIATE_TEST_CASE_P(ColumnarRowBlockToPBBenchmarkParams, WireProtocolTest, - testing::Combine(testing::Values(3, 30, 300), - testing::Values(1.0, 0.8, 0.5, 0.2))); +BenchmarkColumnsSpec UniformColumns(int n_cols, DataType type, double null_fraction) { + vector<BenchmarkColumnsSpec::Col> cols(n_cols); + for (int i = 0; i < n_cols; i++) { + cols[i] = {type, null_fraction}; + } + string null_str; + if (null_fraction >= 0) { + null_str = Substitute("$0pct_null", static_cast<int>(null_fraction * 100)); + } else { + null_str = "non_null"; + } + return {cols, 
Substitute("$0_$1_$2", + n_cols, + GetTypeInfo(type)->name(), + null_str) }; +} + +INSTANTIATE_TEST_CASE_P( + ColumnarRowBlockToPBBenchmarkParams, WireProtocolBenchmark, + testing::Combine( + testing::Values( + UniformColumns(10, INT64, -1), + UniformColumns(10, INT32, -1), + UniformColumns(10, STRING, -1), + + UniformColumns(10, INT128, 0), + UniformColumns(10, INT64, 0), + UniformColumns(10, INT32, 0), + UniformColumns(10, INT16, 0), + UniformColumns(10, INT8, 0), + UniformColumns(10, STRING, 0), + + UniformColumns(10, INT128, 0.1), + UniformColumns(10, INT64, 0.1), + UniformColumns(10, INT32, 0.1), + UniformColumns(10, INT16, 0.1), + UniformColumns(10, INT8, 0.1), + UniformColumns(10, STRING, 0.1)), + // Selection rates. + testing::Values(1.0, 0.8, 0.5, 0.2)), + [](const testing::TestParamInfo<WireProtocolBenchmark::ParamType>& info) { + return Substitute("$0_sel_$1pct", + std::get<0>(info.param).name, + static_cast<int>(std::get<1>(info.param)*100)); + }); + // Test that trying to extract rows from an invalid block correctly returns // Corruption statuses. diff --git a/src/kudu/util/faststring.h b/src/kudu/util/faststring.h index 1357eab..c7215f7 100644 --- a/src/kudu/util/faststring.h +++ b/src/kudu/util/faststring.h @@ -106,6 +106,16 @@ class faststring { ASAN_UNPOISON_MEMORY_REGION(data_, len_); } + // Resize to 'newsize'. In contrast to 'resize()', if this requires allocating a new + // backing array, the new capacity is rounded up in the same manner as if data had been + // appended to the buffer. + void resize_with_extra_capacity(size_t newsize) { + if (newsize > capacity_) { + GrowToAtLeast(newsize); + } + resize(newsize); + } + // Releases the underlying array; after this, the buffer is left empty. // // NOTE: the data pointer returned by release() always points to dynamically
