This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 4b3e884c09f0b4e8871273a29590e8c56f3c5df7
Author: Todd Lipcon <[email protected]>
AuthorDate: Thu Apr 23 23:52:57 2020 -0700

    KUDU-2844 (3/3): avoid copying plain/dict strings to RowBlock Arena

    This changes Dictionary and Plain binary blocks to no longer copy string
    values into the destination RowBlock's Arena. Instead, the dictionary
    block (or plain block) is attached to the RowBlockMemory's list of
    reference-counted block handles, and the cell directly refers to the
    underlying block handle.

    I modified full_stack-insert-scan-test to have it do some scans with
    fault tolerance and no caching. If I comment out the code path that
    reference-counts the blocks during a scan, the newly added test fails.

    I performance-tested this by loading a lineitem table using
    tpch_real_world:

      $ tpch_real_world -tpch-path-to-dbgen-dir /data/2/todd/dbgen/ \
        -tpch_num_inserters=8 -tpch_scaling_factor=100 \
        --tpch_mini_cluster_base_dir /data/1/todd/tpch-kudu-data \

    For the numbers reported below, I accidentally cancelled the load after
    222M rows were present, but I used the same dataset for both "before"
    and "after" so the relative comparison is still valid.

    I started a tserver and master with the same data directories, with the
    tserver running inside perf stat (or perf record to look at profile):

      $ kudu-master \
        -fs-wal-dir /data/1/todd/tpch-kudu-data/master-0/wal/ \
        -fs-data-dirs /data/1/todd/tpch-kudu-data/master-0/data/

      $ perf stat kudu-tserver \
        -fs-wal-dir /data/1/todd/tpch-kudu-data/ts-0/wal/ \
        -fs-data-dirs /data/1/todd/tpch-kudu-data/ts-0/data/

    I waited until the data had been fully flushed from MRS and compacted
    before running the read workloads.

    To test the reads I ran the following 10 times:

      $ kudu perf table_scan localhost tpch_real_world \
        --columns l_shipdate,l_shipmode,l_comment --num_threads=16

    The results of the first test were a bit noisy due to NUMA placement
    issues -- some runs were 30-40% faster than other runs, even on the same
    build, which made it hard to compare results, even though it was clear
    that the optimized version used fewer cycles on average. So, I ran both
    the tserver and the client using 'numactl -m 0 -N 0' to force everything
    to a single NUMA node. This made results much more consistent.

    Before:

             255870.36 msec task-clock                #    3.058 CPUs utilized
                244847      context-switches          #    0.957 K/sec
                  3322      cpu-migrations            #    0.013 K/sec
                245814      page-faults               #    0.961 K/sec
         1066864136000      cycles                    #    4.170 GHz                      (83.46%)
           84410991344      stalled-cycles-frontend   #    7.91% frontend cycles idle     (83.37%)
          340913242391      stalled-cycles-backend    #   31.95% backend cycles idle      (83.25%)
         1131564485394      instructions              #    1.06  insn per cycle
                                                      #    0.30  stalled cycles per insn  (83.34%)
          187879069908      branches                  #  734.274 M/sec                    (83.32%)
            8550168935      branch-misses             #    4.55% of all branches          (83.26%)

         191.262870000 seconds user
          64.765755000 seconds sys

    After:

             214131.49 msec task-clock                #    2.750 CPUs utilized
                245357      context-switches          #    0.001 M/sec
                  2734      cpu-migrations            #    0.013 K/sec
                248108      page-faults               #    0.001 M/sec
          893270854012      cycles                    #    4.172 GHz                      (83.45%)
           83805641687      stalled-cycles-frontend   #    9.38% frontend cycles idle     (83.25%)
          345166097238      stalled-cycles-backend    #   38.64% backend cycles idle      (83.29%)
          913435059189      instructions              #    1.02  insn per cycle
                                                      #    0.38  stalled cycles per insn  (83.36%)
          142198832288      branches                  #  664.072 M/sec                    (83.36%)
            4819907752      branch-misses             #    3.39% of all branches          (83.29%)

          77.876854360 seconds time elapsed

         146.195821000 seconds user
          68.113598000 seconds sys

    To summarize, the change gives about 1.30x reduction in CPU cycles on
    the tserver. The wall clock reported by the perf scan tool showed a
    15-20% reduction in wall-clock.
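
    As a cross-check on the summary above, the before/after ratios can be
    recomputed from the quoted perf stat totals. This is only an
    illustrative sketch: the struct, constant, and function names are
    invented here (they are not part of Kudu or perf), and the values are
    copied by hand from the two runs.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical container for the totals quoted from the two `perf stat` runs.
struct PerfTotals {
  double task_clock_msec;
  uint64_t cycles;
  uint64_t instructions;
  double user_sec;
};

// Values transcribed from the "Before" and "After" blocks of the commit message.
constexpr PerfTotals kBefore{255870.36, 1066864136000ULL, 1131564485394ULL, 191.262870};
constexpr PerfTotals kAfter{214131.49, 893270854012ULL, 913435059189ULL, 146.195821};

// Returns before/after, i.e. how many times cheaper the "after" run is
// for the given counter.
inline double Ratio(double before, double after) {
  assert(after > 0);
  return before / after;
}
```

    With these totals, Ratio(kBefore.cycles, kAfter.cycles) comes out near
    1.19, the instruction count near 1.24, and user CPU seconds near 1.31;
    the last of these is the figure closest to the 1.30x quoted in the
    summary.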

    Testing just scanning a dictionary-encoded column shows even better
    results.

    Looking at 'perf diff -c ratio -o 0 --sort=sym' between two profiles
    collected by perf-record, we can see that the BinaryDictBlockDecoder
    code path is much cheaper, most of the memcpy calls are removed, and
    slightly more CPU is spent serializing the result (probably due to
    reduced locality of reference copying the string data to the wire).

      Orig % CPU     Ratio      Symbol
    -------------------------------------------------------------------------------------
          26.16%        0.437272  [.] kudu::cfile::BinaryDictBlockDecoder::CopyNextDecodeStrings(unsigned long*, kudu::ColumnDataView*)
          19.63%        1.135304  [.] kudu::SerializeRowBlock(kudu::RowBlock const&, kudu::Schema const*, kudu::faststring*, kudu::faststring*, bool)
          11.49%        1.002845  [k] copy_user_enhanced_fast_string
          10.29%        0.068955  [.] __memcpy_ssse3_back

    Change-Id: I93fa1f9fd401814a42dc5a1f3fd2ffb1286ac441
    Reviewed-on: http://gerrit.cloudera.org:8080/15802
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <[email protected]>
---
 src/kudu/cfile/binary_dict_block.cc                | 23 +++++++++-----
 src/kudu/cfile/binary_plain_block.cc               | 28 +++++++++++------
 src/kudu/cfile/binary_plain_block.h                |  6 +++-
 src/kudu/cfile/block_handle.h                      |  4 +--
 src/kudu/common/rowblock_memory.h                  | 36 +++++++++++++++++++++-
 .../full_stack-insert-scan-test.cc                 | 28 +++++++++++++++--
 6 files changed, 101 insertions(+), 24 deletions(-)

diff --git a/src/kudu/cfile/binary_dict_block.cc b/src/kudu/cfile/binary_dict_block.cc
index 4000e61..194915a 100644
--- a/src/kudu/cfile/binary_dict_block.cc
+++ b/src/kudu/cfile/binary_dict_block.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/cfile/binary_dict_block.h"
 
+#include <functional>
 #include <limits>
 #include <ostream>
 #include <utility>
@@ -35,6 +36,7 @@
 #include "kudu/common/columnblock.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/map-util.h"
@@ -293,12 +295,13 @@ Status BinaryDictBlockDecoder::CopyNextAndEval(size_t* n,
     return CopyNextDecodeStrings(n, dst);
   }
 
+  bool retain_dict = false;
+
   // Load the rows' codeword values into a buffer for scanning.
   BShufBlockDecoder<UINT32>* d_bptr = down_cast<BShufBlockDecoder<UINT32>*>(data_decoder_.get());
   codeword_buf_.resize(*n * sizeof(uint32_t));
   d_bptr->CopyNextValuesToArray(n, codeword_buf_.data());
   Slice* out = reinterpret_cast<Slice*>(dst->data());
-  Arena* out_arena = dst->arena();
   for (size_t i = 0; i < *n; i++, out++) {
     // Check with the SelectionVectorView to see whether the data has already
     // been cleared, in which case we can skip evaluation.
@@ -307,13 +310,18 @@ Status BinaryDictBlockDecoder::CopyNextAndEval(size_t* n,
     }
     uint32_t codeword = *reinterpret_cast<uint32_t*>(&codeword_buf_[i*sizeof(uint32_t)]);
     if (BitmapTest(codewords_matching_pred->bitmap(), codeword)) {
-      // Row is included in predicate, copy data to block.
-      CHECK(out_arena->RelocateSlice(dict_decoder_->string_at_index(codeword), out));
+      // Row is included in predicate: point the cell in the block
+      // to the entry in the dictionary.
+      *out = dict_decoder_->string_at_index(codeword);
+      retain_dict = true;
     } else {
       // Mark that the row will not be returned.
       sel->ClearBit(i);
     }
   }
+  if (retain_dict) {
+    dst->memory()->RetainReference(dict_decoder_->block_handle());
+  }
   return Status::OK();
 }
 
@@ -323,22 +331,21 @@ Status BinaryDictBlockDecoder::CopyNextDecodeStrings(size_t* n, ColumnDataView*
   DCHECK_LE(*n, dst->nrows());
   DCHECK_EQ(dst->stride(), sizeof(Slice));
 
-  Arena* out_arena = dst->arena();
   Slice* out = reinterpret_cast<Slice*>(dst->data());
   codeword_buf_.resize((*n)*sizeof(uint32_t));
 
   // Copy the codewords into a temporary buffer first.
-  // And then Copy the strings corresponding to the codewords to the destination buffer.
   BShufBlockDecoder<UINT32>* d_bptr = down_cast<BShufBlockDecoder<UINT32>*>(data_decoder_.get());
   RETURN_NOT_OK(d_bptr->CopyNextValuesToArray(n, codeword_buf_.data()));
 
+  // Now point the cells in the destination block to the string data in the dictionary
+  // block.
   for (int i = 0; i < *n; i++) {
     uint32_t codeword = *reinterpret_cast<uint32_t*>(&codeword_buf_[i*sizeof(uint32_t)]);
-    Slice elem = dict_decoder_->string_at_index(codeword);
-    CHECK(out_arena->RelocateSlice(elem, out));
-    out++;
+    *out++ = dict_decoder_->string_at_index(codeword);
   }
 
+  dst->memory()->RetainReference(dict_decoder_->block_handle());
   return Status::OK();
 }
 
diff --git a/src/kudu/cfile/binary_plain_block.cc b/src/kudu/cfile/binary_plain_block.cc
index 9ab00ed..9782d24 100644
--- a/src/kudu/cfile/binary_plain_block.cc
+++ b/src/kudu/cfile/binary_plain_block.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <cstdint>
+#include <functional>
 #include <ostream>
 #include <vector>
 
@@ -31,6 +32,7 @@
 #include "kudu/common/columnblock.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/stringprintf.h"
@@ -39,7 +41,6 @@
 #include "kudu/util/coding-inl.h"
 #include "kudu/util/group_varint-inl.h"
 #include "kudu/util/hexdump.h"
-#include "kudu/util/memory/arena.h"
 
 using std::vector;
 
@@ -163,8 +164,8 @@ Status BinaryPlainBlockBuilder::GetLastKey(void *key_void) const {
 ////////////////////////////////////////////////////////////
 
 BinaryPlainBlockDecoder::BinaryPlainBlockDecoder(scoped_refptr<BlockHandle> block)
-    : block_handle_(std::move(block)),
-      data_(block_handle_->data()),
+    : block_(std::move(block)),
+      data_(block_->data()),
       parsed_(false),
       num_elems_(0),
       ordinal_pos_base_(0),
@@ -301,7 +302,6 @@ Status BinaryPlainBlockDecoder::HandleBatch(size_t* n, ColumnDataView* dst, Cell
   CHECK_EQ(dst->type_info()->physical_type(), BINARY);
   DCHECK_LE(*n, dst->nrows());
   DCHECK_EQ(dst->stride(), sizeof(Slice));
-  Arena *out_arena = dst->arena();
   if (PREDICT_FALSE(*n == 0 || cur_idx_ >= num_elems_)) {
     *n = 0;
     return Status::OK();
@@ -311,15 +311,16 @@ Status BinaryPlainBlockDecoder::HandleBatch(size_t* n, ColumnDataView* dst, Cell
   Slice *out = reinterpret_cast<Slice*>(dst->data());
   for (size_t i = 0; i < max_fetch; i++, out++, cur_idx_++) {
     Slice elem(string_at_index(cur_idx_));
-    c(i, elem, out, out_arena);
+    c(i, elem, out);
   }
   *n = max_fetch;
   return Status::OK();
 }
 
 Status BinaryPlainBlockDecoder::CopyNextValues(size_t* n, ColumnDataView* dst) {
-  return HandleBatch(n, dst, [&](size_t i, Slice elem, Slice* out, Arena* out_arena) {
-    CHECK(out_arena->RelocateSlice(elem, out));
+  dst->memory()->RetainReference(block_);
+  return HandleBatch(n, dst, [&](size_t /*i*/, Slice elem, Slice* out) {
+    *out = elem;
   });
 }
 
@@ -327,16 +328,23 @@ Status BinaryPlainBlockDecoder::CopyNextAndEval(size_t* n,
                                                 ColumnMaterializationContext* ctx,
                                                 SelectionVectorView* sel,
                                                 ColumnDataView* dst) {
+  bool retain_block = false;
   ctx->SetDecoderEvalSupported();
-  return HandleBatch(n, dst, [&](size_t i, Slice elem, Slice* out, Arena* out_arena) {
+  Status s = HandleBatch(n, dst, [&](size_t i, Slice elem, Slice* out) {
     if (!sel->TestBit(i)) {
       return;
-    } else if (ctx->pred()->EvaluateCell<BINARY>(static_cast<const void*>(&elem))) {
-      CHECK(out_arena->RelocateSlice(elem, out));
+    }
+    if (ctx->pred()->EvaluateCell<BINARY>(static_cast<const void*>(&elem))) {
+      *out = elem;
+      retain_block = true;
     } else {
       sel->ClearBit(i);
     }
   });
+  if (PREDICT_TRUE(s.ok() && retain_block)) {
+    dst->memory()->RetainReference(block_);
+  }
+  return s;
 }
 
 
diff --git a/src/kudu/cfile/binary_plain_block.h b/src/kudu/cfile/binary_plain_block.h
index 953c4ae..edd4784 100644
--- a/src/kudu/cfile/binary_plain_block.h
+++ b/src/kudu/cfile/binary_plain_block.h
@@ -142,6 +142,10 @@ class BinaryPlainBlockDecoder final : public BlockDecoder {
     return Slice(&data_[str_offset], len);
   }
 
+  const scoped_refptr<BlockHandle>& block_handle() {
+    return block_;
+  }
+
   // Minimum length of a header.
   static const size_t kMinHeaderSize = sizeof(uint32_t) * 3;
 
@@ -163,7 +167,7 @@ class BinaryPlainBlockDecoder final : public BlockDecoder {
     return ret;
   }
 
-  scoped_refptr<BlockHandle> block_handle_;
+  scoped_refptr<BlockHandle> block_;
   Slice data_;
   bool parsed_;
 
diff --git a/src/kudu/cfile/block_handle.h b/src/kudu/cfile/block_handle.h
index d1bb62e..b88d844 100644
--- a/src/kudu/cfile/block_handle.h
+++ b/src/kudu/cfile/block_handle.h
@@ -24,8 +24,8 @@
 #include <boost/variant/variant.hpp>
 
 #include "kudu/cfile/block_cache.h"
-#include "kudu/gutil/ref_counted.h"
 #include "kudu/common/rowblock_memory.h"
+#include "kudu/gutil/ref_counted.h"
 
 namespace kudu {
 namespace cfile {
@@ -40,7 +40,7 @@ namespace cfile {
 // Note that the BlockHandle itself may refer to a BlockCacheHandle, which itself is
 // reference-counted. When all of the references to a BlockHandle go out of scope, it
 // results in decrementing the BlockCacheHandle's reference count.
-class BlockHandle : public RefCountedThreadSafe<BlockHandle> {
+class BlockHandle final : public RefCountedThreadSafe<BlockHandle> {
  public:
   static scoped_refptr<BlockHandle> WithOwnedData(const Slice& data) {
     return { new BlockHandle(data) };
diff --git a/src/kudu/common/rowblock_memory.h b/src/kudu/common/rowblock_memory.h
index 9117ebb..5a11b4b 100644
--- a/src/kudu/common/rowblock_memory.h
+++ b/src/kudu/common/rowblock_memory.h
@@ -16,10 +16,16 @@
 // under the License.
 #pragma once
 
+#include <functional>
+#include <vector>
+
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/util/memory/arena.h"
 
 namespace kudu {
 
+class RowBlockRefCounted;
+
 // Handles the memory allocated alongside a RowBlock for variable-length
 // cells.
 //
@@ -27,11 +33,39 @@ namespace kudu {
 // data (eg BINARY columns). In this case, the data cannot be inlined directly
 // into the columnar data arrays that are part of the RowBlock and instead need
 // to be allocated out of a separate Arena. This class wraps that Arena.
+//
+// In some cases (eg "plain" or "dictionary" encodings), the underlying blocks may contain
+// string data in a non-encoded form. In that case, instead of copying strings, we can
+// refer to the strings within those data blocks themselves, and hold a reference to
+// the underlying block. This class holds those reference counts as well.
 struct RowBlockMemory {
   Arena arena;
 
   explicit RowBlockMemory(int arena_size = 32 * 1024) : arena(arena_size) {}
-  void Reset() { arena.Reset(); }
+  ~RowBlockMemory() { Reset(); }
+
+  void Reset() {
+    arena.Reset();
+    for (auto& f : to_release_) {
+      f();
+    }
+    to_release_.clear();
+  }
+
+  // Retain a reference, typically to a BlockHandle. This is templatized to avoid
+  // a circular dependency between kudu/common/ and kudu/cfile/
+  template<class T>
+  void RetainReference(const scoped_refptr<T>& item) {
+    // TODO(todd) if this ever ends up being a hot code path, we could
+    // probably optimize by having a small hashset of pointers. If an
+    // element is already in the set, we don't need to add a second copy.
+    T* raw = item.get();
+    raw->AddRef();
+    to_release_.emplace_back([=]() { raw->Release(); });
+  }
+
+ private:
+  std::vector<std::function<void()>> to_release_;
 };
 
 }  // namespace kudu
diff --git a/src/kudu/integration-tests/full_stack-insert-scan-test.cc b/src/kudu/integration-tests/full_stack-insert-scan-test.cc
index b91949e..7d3134e 100644
--- a/src/kudu/integration-tests/full_stack-insert-scan-test.cc
+++ b/src/kudu/integration-tests/full_stack-insert-scan-test.cc
@@ -21,6 +21,7 @@
 #include <cstdint>
 #include <memory>
 #include <ostream>
+#include <set>
 #include <string>
 #include <thread>
 #include <utility>
@@ -40,6 +41,7 @@
 #include "kudu/client/write_op.h"
 #include "kudu/codegen/compilation_manager.h"
 #include "kudu/common/partial_row.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/split.h"
@@ -98,6 +100,7 @@ using kudu::client::KuduTable;
 using kudu::client::KuduTableCreator;
 using kudu::cluster::InternalMiniCluster;
 using kudu::cluster::InternalMiniClusterOptions;
+using std::set;
 using std::string;
 using std::thread;
 using std::unique_ptr;
@@ -196,9 +199,17 @@ class FullStackInsertScanTest : public KuduTest {
   // Insert the rows that are associated with that ID.
   void InsertRows(CountDownLatch* start_latch, int id, uint32_t seed);
 
+  enum class ScanFlag {
+    // Disable the block cache for the scan.
+    kDontCacheBlocks,
+    // Enable fault tolerance. This triggers different iterator code paths.
+    kFaultTolerant,
+  };
+
   // Run a scan from the reader_client_ with the projection schema schema
   // and LOG_TIMING message msg.
-  void ScanProjection(const vector<string>& cols, const string& msg);
+  void ScanProjection(const vector<string>& cols, const string& msg,
+                      const set<ScanFlag>& flags = {});
 
   vector<string> AllColumnNames() const;
   vector<string> StringColumnNames() const;
@@ -325,6 +336,11 @@ void FullStackInsertScanTest::DoTestScans() {
 
   NO_FATALS(ScanProjection({}, "empty projection, 0 col"));
   NO_FATALS(ScanProjection({ "key" }, "key scan, 1 col"));
+  NO_FATALS(ScanProjection(AllColumnNames(), "full schema scan, no cache, 10 col",
+                           { ScanFlag::kDontCacheBlocks }));
+  NO_FATALS(ScanProjection(AllColumnNames(),
+                           "fault-tolerant full schema scan, no cache, 10 col",
+                           { ScanFlag::kDontCacheBlocks, ScanFlag::kFaultTolerant }));
   NO_FATALS(ScanProjection(AllColumnNames(), "full schema scan, 10 col"));
   NO_FATALS(ScanProjection(StringColumnNames(), "String projection, 1 col"));
   NO_FATALS(ScanProjection(Int32ColumnNames(), "Int32 projection, 4 col"));
@@ -394,8 +410,10 @@ void FullStackInsertScanTest::InsertRows(CountDownLatch* start_latch, int id,
   FlushSessionOrDie(session);
 }
 
+
 void FullStackInsertScanTest::ScanProjection(const vector<string>& cols,
-                                             const string& msg) {
+                                             const string& msg,
+                                             const set<ScanFlag>& flags) {
   {
     // Warmup codegen cache
     KuduScanner scanner(reader_table_.get());
@@ -404,6 +422,12 @@ void FullStackInsertScanTest::ScanProjection(const vector<string>& cols,
     codegen::CompilationManager::GetSingleton()->Wait();
   }
   KuduScanner scanner(reader_table_.get());
+  if (ContainsKey(flags, ScanFlag::kDontCacheBlocks)) {
+    CHECK_OK(scanner.SetCacheBlocks(false));
+  }
+  if (ContainsKey(flags, ScanFlag::kFaultTolerant)) {
+    CHECK_OK(scanner.SetFaultTolerant());
+  }
   ASSERT_OK(scanner.SetProjectedColumnNames(cols));
   uint64_t nrows = 0;
   LOG_TIMING(INFO, msg) {
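
The reference-retention idea in the rowblock_memory.h hunk above can be tried in isolation. The following is a minimal standalone sketch, not Kudu code: ToyBlock and ToyRowBlockMemory are hypothetical stand-ins for BlockHandle and RowBlockMemory, the reference count is not thread-safe, and RetainReference takes a raw pointer rather than a scoped_refptr to keep the example self-contained. It shows a cell pointing straight into block memory and staying valid after the decoder drops its own reference, until Reset() releases the batch's reference.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for a reference-counted data block (NOT Kudu's BlockHandle).
// live_count lets the caller observe when the block is actually destroyed.
class ToyBlock {
 public:
  ToyBlock(std::string data, int* live_count)
      : data_(std::move(data)), live_count_(live_count) { ++*live_count_; }
  void AddRef() { ++refs_; }
  void Release() { if (--refs_ == 0) delete this; }
  const std::string& data() const { return data_; }

 private:
  ~ToyBlock() { --*live_count_; }  // only reachable through Release()
  std::string data_;
  int* live_count_;
  int refs_ = 0;
};

// Toy stand-in for RowBlockMemory: it keeps a list of deferred Release()
// calls, mirroring the to_release_ vector in the diff.
class ToyRowBlockMemory {
 public:
  ~ToyRowBlockMemory() { Reset(); }

  template <class T>
  void RetainReference(T* item) {
    item->AddRef();
    to_release_.emplace_back([item]() { item->Release(); });
  }

  void Reset() {
    for (auto& f : to_release_) f();
    to_release_.clear();
  }

 private:
  std::vector<std::function<void()>> to_release_;
};

// Simulates one scan batch. Returns true if the cell stayed readable after
// the decoder released its reference and the block was freed by Reset().
inline bool CellOutlivesDecoder() {
  int live = 0;
  ToyBlock* block = new ToyBlock("shipmode=AIR", &live);
  block->AddRef();                          // the decoder's own reference
  ToyRowBlockMemory mem;
  const char* cell = block->data().data();  // cell points into the block, no copy
  mem.RetainReference(block);               // the batch keeps the block alive
  block->Release();                         // decoder goes away
  bool valid_before_reset = (live == 1) && (std::string(cell, 8) == "shipmode");
  mem.Reset();                              // batch finished: last reference dropped
  return valid_before_reset && live == 0;
}
```

Storing type-erased release callbacks, as the patch does, lets the holder live in one library without naming the block type from another; the cost is one std::function per retained reference, which the TODO in the diff notes could be reduced by deduplicating pointers.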
