This is an automated email from the ASF dual-hosted git repository. awong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit fb0f4bc3bf63614a831fedc6cc29cf860dddaf49 Author: Todd Lipcon <[email protected]> AuthorDate: Thu Apr 23 14:30:49 2020 -0700 KUDU-2844 (2/3): move RowBlock memory into a new RowBlockMemory struct This takes the Arena* member of RowBlock and moves it into a new RowBlockMemory structure. The RowBlockMemory structure will later be extended to include a list of reference-counted block handles. Change-Id: I17a21f33f44988795ffe064b3ba41055e1a19e90 Reviewed-on: http://gerrit.cloudera.org:8080/15801 Reviewed-by: Andrew Wong <[email protected]> Tested-by: Kudu Jenkins --- src/kudu/cfile/block_handle.h | 1 + src/kudu/cfile/cfile-test-base.h | 5 +- src/kudu/cfile/cfile-test.cc | 57 +++++++++------- src/kudu/cfile/cfile_util.cc | 9 ++- src/kudu/cfile/encoding-test.cc | 17 ++--- src/kudu/codegen/codegen-test.cc | 17 ++--- src/kudu/common/column_predicate-test.cc | 2 +- src/kudu/common/columnblock-test-util.h | 67 ++++++++++++++++++ src/kudu/common/columnblock-test.cc | 3 +- src/kudu/common/columnblock.cc | 4 +- src/kudu/common/columnblock.h | 79 ++++++---------------- src/kudu/common/generic_iterators-test.cc | 22 +++--- src/kudu/common/generic_iterators.cc | 29 ++++---- src/kudu/common/rowblock.cc | 4 +- src/kudu/common/rowblock.h | 12 ++-- src/kudu/common/rowblock_memory.h | 37 ++++++++++ src/kudu/common/wire_protocol-test.cc | 35 +++++----- src/kudu/integration-tests/linked_list-test-util.h | 5 +- src/kudu/master/sys_catalog.cc | 6 +- src/kudu/tablet/cfile_set-test.cc | 16 +++-- src/kudu/tablet/compaction-test.cc | 8 ++- src/kudu/tablet/compaction.cc | 11 +-- src/kudu/tablet/delta_compaction.cc | 19 +++--- src/kudu/tablet/deltafile-test.cc | 17 +++-- src/kudu/tablet/diskrowset-test-base.h | 12 ++-- src/kudu/tablet/memrowset-test.cc | 6 +- src/kudu/tablet/mt-rowset_delta_compaction-test.cc | 7 +- src/kudu/tablet/mt-tablet-test.cc | 19 +++--- src/kudu/tablet/tablet-decoder-eval-test.cc | 8 ++- src/kudu/tablet/tablet-test-base.h | 10 ++- src/kudu/tablet/tablet-test-util.h | 9 +-- src/kudu/tablet/tablet-test.cc | 13 ++-- src/kudu/tablet/tablet_random_access-test.cc | 8 +-- src/kudu/tools/tool_action_local_replica.cc | 7 +- src/kudu/tools/tool_action_perf.cc | 8 +-- src/kudu/transactions/txn_status_tablet.cc | 6 +- src/kudu/tserver/tablet_server-test-base.cc | 7 +- src/kudu/tserver/tablet_service.cc | 6 +- 38 files changed, 351 insertions(+), 257 deletions(-) diff --git a/src/kudu/cfile/block_handle.h b/src/kudu/cfile/block_handle.h index 2bd3544..d1bb62e 100644 --- a/src/kudu/cfile/block_handle.h +++ b/src/kudu/cfile/block_handle.h @@ -25,6 +25,7 @@ #include "kudu/cfile/block_cache.h" #include "kudu/gutil/ref_counted.h" +#include "kudu/common/rowblock_memory.h" namespace kudu { namespace cfile { diff --git a/src/kudu/cfile/cfile-test-base.h b/src/kudu/cfile/cfile-test-base.h index 64552b3..d79e7b5 100644 --- a/src/kudu/cfile/cfile-test-base.h +++ b/src/kudu/cfile/cfile-test-base.h @@ -29,6 +29,7 @@ #include "kudu/cfile/cfile_reader.h" #include "kudu/cfile/cfile_writer.h" #include "kudu/common/columnblock.h" +#include "kudu/common/columnblock-test-util.h" #include "kudu/fs/fs_manager.h" #include "kudu/gutil/port.h" #include "kudu/gutil/stringprintf.h" @@ -447,7 +448,7 @@ void TimeReadFileForDataType(CFileIterator* iter, int* count) { ASSERT_OK_FAST(iter->CopyNextValues(&n, &ctx)); sum += FastSum<ScopedColumnBlock<Type>, SumType>(cb, n); *count += n; - cb.arena()->Reset(); + cb.memory()->Reset(); } LOG(INFO)<< "Sum: " << sum; LOG(INFO)<< "Count: " << *count; @@ -469,7 +470,7 @@ void ReadBinaryFile(CFileIterator* iter, int* count) { } } *count += n; - cb.arena()->Reset(); + cb.memory()->Reset(); } LOG(INFO) << "Sum of value lengths: " << sum_lens; LOG(INFO) << "Count: " << *count; diff --git a/src/kudu/cfile/cfile-test.cc b/src/kudu/cfile/cfile-test.cc index 39a89c3..6e64f1f 100644 --- a/src/kudu/cfile/cfile-test.cc +++ b/src/kudu/cfile/cfile-test.cc @@ -42,10 +42,12 @@ #include "kudu/cfile/index_btree.h" #include "kudu/cfile/type_encodings.h" #include "kudu/common/column_materialization_context.h" +#include "kudu/common/columnblock-test-util.h" #include "kudu/common/columnblock.h" #include "kudu/common/common.pb.h" #include "kudu/common/encoded_key.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/rowid.h" #include "kudu/common/schema.h" #include "kudu/common/types.h" @@ -67,7 +69,6 @@ #include "kudu/util/int128.h" #include "kudu/util/int128_util.h" #include "kudu/util/mem_tracker.h" -#include "kudu/util/memory/arena.h" #include "kudu/util/metrics.h" #include "kudu/util/nvm_cache.h" #include "kudu/util/slice.h" @@ -76,6 +77,10 @@ #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" +namespace kudu { +class Arena; +} // namespace kudu + DECLARE_bool(cfile_write_checksums); DECLARE_bool(cfile_verify_checksums); DECLARE_string(block_cache_type); @@ -163,9 +168,11 @@ class TestCFile : public CFileTestBase { ASSERT_OK(iter->SeekToOrdinal(0)); size_t fetched = 0; while (fetched < 10000) { - ColumnBlock advancing_block(out.type_info(), nullptr, + ColumnBlock advancing_block(out.type_info(), + nullptr, out.data() + (fetched * out.stride()), - out.nrows() - fetched, out.arena()); + out.nrows() - fetched, + out.memory()); ColumnMaterializationContext adv_ctx = CreateNonDecoderEvalContext(&advancing_block, &sel); ASSERT_TRUE(iter->HasNext()); size_t batch_size = random() % 5 + 1; @@ -204,7 +211,7 @@ class TestCFile : public CFileTestBase { unique_ptr<CFileIterator> iter; ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr)); - Arena arena(8192); + RowBlockMemory mem; ScopedColumnBlock<DataGeneratorType::kDataType> cb(10); SelectionVector sel(10); @@ -234,7 +241,7 @@ class TestCFile : public CFileTestBase { ASSERT_EQ((*generator)[j], cb[j]); } } - cb.arena()->Reset(); + cb.memory()->Reset(); read_offset += n; } } @@ -431,11 +438,9 @@ INSTANTIATE_TEST_CASE_P(CacheMemoryTypes, TestCFileBothCacheMemoryTypes, ::testing::Values(Cache::MemoryType::DRAM, Cache::MemoryType::NVM)); -template<DataType type> -void CopyOne(CFileIterator *it, - typename TypeTraits<type>::cpp_type *ret, - Arena *arena) { - ColumnBlock cb(GetTypeInfo(type), nullptr, ret, 1, arena); +template <DataType type> +void CopyOne(CFileIterator* it, typename TypeTraits<type>::cpp_type* ret, RowBlockMemory* mem) { + ColumnBlock cb(GetTypeInfo(type), nullptr, ret, 1, mem); SelectionVector sel(1); ColumnMaterializationContext ctx(0, nullptr, &cb, &sel); ctx.SetDecoderEvalNotSupported(); @@ -634,18 +639,18 @@ void TestCFile::TestReadWriteStrings(EncodingType encoding, unique_ptr<CFileIterator> iter; ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr)); - Arena arena(1024); + RowBlockMemory mem; ASSERT_OK(iter->SeekToOrdinal(5000)); - ASSERT_EQ(5000u, iter->GetCurrentOrdinal()); + ASSERT_EQ(5000, iter->GetCurrentOrdinal()); Slice s; - CopyOne<STRING>(iter.get(), &s, &arena); + CopyOne<STRING>(iter.get(), &s, &mem); ASSERT_EQ(formatter(5000), s.ToString()); // Seek to last key exactly, should succeed ASSERT_OK(iter->SeekToOrdinal(9999)); - ASSERT_EQ(9999u, iter->GetCurrentOrdinal()); + ASSERT_EQ(9999, iter->GetCurrentOrdinal()); // Seek to after last key. Should result in not found. ASSERT_TRUE(iter->SeekToOrdinal(10000).IsNotFound()); @@ -662,30 +667,30 @@ void TestCFile::TestReadWriteStrings(EncodingType encoding, // (seek to "hello 0000.5" through "hello 9999.5") string buf; for (int i = 1; i < 10000; i++) { - arena.Reset(); + mem.Reset(); buf = formatter(i - 1); buf.append(".5"); s = Slice(buf); - EncodeStringKey(schema, s, &arena, &encoded_key); + EncodeStringKey(schema, s, &mem.arena, &encoded_key); ASSERT_OK(iter->SeekAtOrAfter(*encoded_key, &exact)); ASSERT_FALSE(exact); ASSERT_EQ(i, iter->GetCurrentOrdinal()); - CopyOne<STRING>(iter.get(), &s, &arena); + CopyOne<STRING>(iter.get(), &s, &mem); ASSERT_EQ(formatter(i), s.ToString()); } // Seek exactly to each key // (seek to "hello 0000" through "hello 9999") for (int i = 0; i < 9999; i++) { - arena.Reset(); + mem.Reset(); buf = formatter(i); s = Slice(buf); - EncodeStringKey(schema, s, &arena, &encoded_key); + EncodeStringKey(schema, s, &mem.arena, &encoded_key); ASSERT_OK(iter->SeekAtOrAfter(*encoded_key, &exact)); ASSERT_TRUE(exact); ASSERT_EQ(i, iter->GetCurrentOrdinal()); Slice read_back; - CopyOne<STRING>(iter.get(), &read_back, &arena); + CopyOne<STRING>(iter.get(), &read_back, &mem); ASSERT_EQ(read_back.ToString(), s.ToString()); } @@ -693,7 +698,7 @@ void TestCFile::TestReadWriteStrings(EncodingType encoding, // (seek to "hello 9999.x") buf = formatter(9999) + ".x"; s = Slice(buf); - EncodeStringKey(schema, s, &arena, &encoded_key); + EncodeStringKey(schema, s, &mem.arena, &encoded_key); EXPECT_TRUE(iter->SeekAtOrAfter(*encoded_key, &exact).IsNotFound()); // before first entry @@ -701,17 +706,17 @@ void TestCFile::TestReadWriteStrings(EncodingType encoding, buf = formatter(0); buf.resize(buf.size() - 1); s = Slice(buf); - EncodeStringKey(schema, s, &arena, &encoded_key); + EncodeStringKey(schema, s, &mem.arena, &encoded_key); ASSERT_OK(iter->SeekAtOrAfter(*encoded_key, &exact)); EXPECT_FALSE(exact); EXPECT_EQ(0, iter->GetCurrentOrdinal()); - CopyOne<STRING>(iter.get(), &s, &arena); + CopyOne<STRING>(iter.get(), &s, &mem); EXPECT_EQ(formatter(0), s.ToString()); // Seek to start of file by ordinal ASSERT_OK(iter->SeekToFirst()); ASSERT_EQ(0, iter->GetCurrentOrdinal()); - CopyOne<STRING>(iter.get(), &s, &arena); + CopyOne<STRING>(iter.get(), &s, &mem); ASSERT_EQ(formatter(0), s.ToString()); // Reseek to start and fetch all data. @@ -850,9 +855,9 @@ TEST_P(TestCFileBothCacheMemoryTypes, TestDefaultColumnIter) { // Test String Default Value Slice str_data[kNumItems]; Slice str_value("Hello"); - Arena arena(32*1024); + RowBlockMemory mem; DefaultColumnValueIterator str_iter(GetTypeInfo(STRING), &str_value); - ColumnBlock str_col(GetTypeInfo(STRING), nullptr, str_data, kNumItems, &arena); + ColumnBlock str_col(GetTypeInfo(STRING), nullptr, str_data, kNumItems, &mem); ColumnMaterializationContext str_ctx = CreateNonDecoderEvalContext(&str_col, &sel); ASSERT_OK(str_iter.Scan(&str_ctx)); for (size_t i = 0; i < str_col.nrows(); ++i) { diff --git a/src/kudu/cfile/cfile_util.cc b/src/kudu/cfile/cfile_util.cc index 3bdb9a7..c700e4e 100644 --- a/src/kudu/cfile/cfile_util.cc +++ b/src/kudu/cfile/cfile_util.cc @@ -28,11 +28,11 @@ #include "kudu/common/column_materialization_context.h" #include "kudu/common/columnblock.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/types.h" #include "kudu/gutil/port.h" #include "kudu/util/bitmap.h" #include "kudu/util/mem_tracker.h" -#include "kudu/util/memory/arena.h" namespace kudu { namespace cfile { @@ -55,13 +55,12 @@ Status DumpIterator(const CFileReader& reader, std::ostream* out, int num_rows, int indent) { - - Arena arena(8192); + RowBlockMemory mem(8192); uint8_t buf[kBufSize]; const TypeInfo *type = reader.type_info(); size_t max_rows = kBufSize/type->size(); uint8_t nulls[BitmapSize(max_rows)]; - ColumnBlock cb(type, reader.is_nullable() ? nulls : nullptr, buf, max_rows, &arena); + ColumnBlock cb(type, reader.is_nullable() ? nulls : nullptr, buf, max_rows, &mem); SelectionVector sel(max_rows); ColumnMaterializationContext ctx(0, nullptr, &cb, &sel); string strbuf; @@ -93,7 +92,7 @@ Status DumpIterator(const CFileReader& reader, *out << strbuf; strbuf.clear(); - arena.Reset(); + mem.Reset(); count += n; } diff --git a/src/kudu/cfile/encoding-test.cc b/src/kudu/cfile/encoding-test.cc index cf5147c..c889175 100644 --- a/src/kudu/cfile/encoding-test.cc +++ b/src/kudu/cfile/encoding-test.cc @@ -40,8 +40,10 @@ #include "kudu/cfile/block_handle.h" #include "kudu/cfile/cfile_util.h" #include "kudu/cfile/type_encodings.h" +#include "kudu/common/columnblock-test-util.h" #include "kudu/common/columnblock.h" #include "kudu/common/common.pb.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/schema.h" #include "kudu/common/types.h" #include "kudu/gutil/port.h" @@ -73,20 +75,20 @@ namespace cfile { class TestEncoding : public KuduTest { public: TestEncoding() - : arena_(1024) { + : memory_(1024) { } protected: virtual void SetUp() OVERRIDE { KuduTest::SetUp(); - arena_.Reset(); + memory_.Reset(); default_write_options_.storage_attributes.cfile_block_size = 256 * 1024; } template<DataType type> void CopyOne(BlockDecoder *decoder, typename TypeTraits<type>::cpp_type *ret) { - ColumnBlock cb(GetTypeInfo(type), nullptr, ret, 1, &arena_); + ColumnBlock cb(GetTypeInfo(type), nullptr, ret, 1, &memory_); ColumnDataView cdv(&cb); size_t n = 1; ASSERT_OK(decoder->CopyNextValues(&n, &cdv)); @@ -461,7 +463,7 @@ class TestEncoding : public KuduTest { vector<CppType> decoded; decoded.resize(size); - ColumnBlock dst_block(GetTypeInfo(Type), nullptr, &decoded[0], size, &arena_); + ColumnBlock dst_block(GetTypeInfo(Type), nullptr, &decoded[0], size, &memory_); ColumnDataView view(&dst_block); int dec_count = 0; while (bd->HasNext()) { @@ -582,7 +584,7 @@ class TestEncoding : public KuduTest { ColumnBlock dst_block(GetTypeInfo(IntType), nullptr, &decoded[0], to_insert.size(), - &arena_); + &memory_); int dec_count = 0; while (ibd->HasNext()) { ASSERT_EQ((uint32_t)(dec_count), ibd->GetCurrentIndex()); @@ -666,7 +668,7 @@ class TestEncoding : public KuduTest { ColumnBlock dst_block(GetTypeInfo(BOOL), nullptr, &decoded[0], to_insert.size(), - &arena_); + &memory_); int dec_count = 0; while (bd->HasNext()) { @@ -704,8 +706,7 @@ class TestEncoding : public KuduTest { } } - Arena arena_; - faststring contiguous_buf_; + RowBlockMemory memory_; WriterOptions default_write_options_; }; diff --git a/src/kudu/codegen/codegen-test.cc b/src/kudu/codegen/codegen-test.cc index 2a91e25..b8ccead 100644 --- a/src/kudu/codegen/codegen-test.cc +++ b/src/kudu/codegen/codegen-test.cc @@ -35,6 +35,7 @@ #include "kudu/common/common.pb.h" #include "kudu/common/row.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/schema.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/singleton.h" @@ -66,7 +67,7 @@ class CodegenTest : public KuduTest { CodegenTest() : random_(SeedRandom()), // Set the initial Arena size as small as possible to catch errors during relocation. - projections_arena_(16) { + projections_mem_(16) { // Create the base schema. vector<ColumnSchema> cols = { ColumnSchema("key ", UINT64, false), ColumnSchema("int32 ", INT32, false), @@ -138,7 +139,7 @@ class CodegenTest : public KuduTest { private: // Projects the test rows into parameter rowblock using projector and - // member projections_arena_ (should be Reset() manually). + // member projections_mem_ (should be Reset() manually). template<bool READ, class RowProjectorType> void ProjectTestRows(RowProjectorType* rp, RowBlock* rb); void AddRandomString(RowBuilder* rb); @@ -153,7 +154,7 @@ class CodegenTest : public KuduTest { codegen::CodeGenerator generator_; Random random_; unique_ptr<ConstContiguousRow> test_rows_[kNumTestRows]; - Arena projections_arena_; + RowBlockMemory projections_mem_; unique_ptr<Arena> test_rows_arena_; }; @@ -203,9 +204,9 @@ void CodegenTest::ProjectTestRows(RowProjectorType* rp, RowBlock* rb) { ConstContiguousRow src = *test_rows_[i]; RowBlockRow dst = rb->row(i); if (READ) { - CHECK_OK(rp->ProjectRowForRead(src, &dst, &projections_arena_)); + CHECK_OK(rp->ProjectRowForRead(src, &dst, rb->arena())); } else { - CHECK_OK(rp->ProjectRowForWrite(src, &dst, &projections_arena_)); + CHECK_OK(rp->ProjectRowForWrite(src, &dst, rb->arena())); } } } @@ -220,10 +221,10 @@ void CodegenTest::TestProjection(const Schema* proj) { CHECK_EQ(with->base_schema(), &base_); CHECK_EQ(with->projection(), proj); - RowBlock rb_with(proj, kNumTestRows, &projections_arena_); - RowBlock rb_without(proj, kNumTestRows, &projections_arena_); + RowBlock rb_with(proj, kNumTestRows, &projections_mem_); + RowBlock rb_without(proj, kNumTestRows, &projections_mem_); - projections_arena_.Reset(); + projections_mem_.Reset(); ProjectTestRows<READ>(with.get(), &rb_with); ProjectTestRows<READ>(&without, &rb_without); CheckRowBlocksEqual(&rb_with, &rb_without, "Codegen", "Expected"); diff --git a/src/kudu/common/column_predicate-test.cc b/src/kudu/common/column_predicate-test.cc index 8b1a9b1..151ce8a 100644 --- a/src/kudu/common/column_predicate-test.cc +++ b/src/kudu/common/column_predicate-test.cc @@ -31,7 +31,7 @@ #include <glog/logging.h> #include <gtest/gtest.h> -#include "kudu/common/columnblock.h" +#include "kudu/common/columnblock-test-util.h" #include "kudu/common/common.pb.h" #include "kudu/common/rowblock.h" #include "kudu/common/schema.h" diff --git a/src/kudu/common/columnblock-test-util.h b/src/kudu/common/columnblock-test-util.h new file mode 100644 index 0000000..db3764b --- /dev/null +++ b/src/kudu/common/columnblock-test-util.h @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "kudu/gutil/macros.h" + +#include "kudu/common/columnblock.h" +#include "kudu/common/rowblock.h" + +namespace kudu { + +// Utility class which allocates temporary storage for a +// dense block of column data, freeing it when it goes +// out of scope. +// +// This is more useful in test code than production code, +// since it doesn't allocate from an arena, etc. +template<DataType type> +class ScopedColumnBlock : public ColumnBlock { + public: + typedef typename TypeTraits<type>::cpp_type cpp_type; + + explicit ScopedColumnBlock(size_t n_rows, bool allow_nulls = true) + : ColumnBlock(GetTypeInfo(type), + allow_nulls ? new uint8_t[BitmapSize(n_rows)] : nullptr, + new cpp_type[n_rows], + n_rows, + new RowBlockMemory()), + non_null_bitmap_(non_null_bitmap()), + data_(reinterpret_cast<cpp_type *>(data())), + memory_(memory()) { + if (allow_nulls) { + // All rows begin null. + BitmapChangeBits(non_null_bitmap(), /*offset=*/ 0, n_rows, /*value=*/ false); + } + } + + const cpp_type &operator[](size_t idx) const { + return data_[idx]; + } + + cpp_type &operator[](size_t idx) { + return data_[idx]; + } + + private: + std::unique_ptr<uint8_t[]> non_null_bitmap_; + std::unique_ptr<cpp_type[]> data_; + std::unique_ptr<RowBlockMemory> memory_; + +}; + +} // namespace kudu diff --git a/src/kudu/common/columnblock-test.cc b/src/kudu/common/columnblock-test.cc index 5bed5d4..37a49f5 100644 --- a/src/kudu/common/columnblock-test.cc +++ b/src/kudu/common/columnblock-test.cc @@ -15,12 +15,11 @@ // specific language governing permissions and limitations // under the License. -#include "kudu/common/columnblock.h" - #include <string> #include <gtest/gtest.h> +#include "kudu/common/columnblock-test-util.h" #include "kudu/common/common.pb.h" #include "kudu/common/rowblock.h" #include "kudu/common/types.h" diff --git a/src/kudu/common/columnblock.cc b/src/kudu/common/columnblock.cc index 63eda18..2f5bf8d 100644 --- a/src/kudu/common/columnblock.cc +++ b/src/kudu/common/columnblock.cc @@ -19,8 +19,10 @@ #include <cstring> +#include "kudu/common/common.pb.h" #include "kudu/common/row.h" #include "kudu/common/rowblock.h" +#include "kudu/util/memory/arena.h" namespace kudu { @@ -51,8 +53,8 @@ Status ColumnBlock::CopyTo(const SelectionVector& sel_vec, BitmapCopy(dst->non_null_bitmap_, dst_cell_off, non_null_bitmap_, src_cell_off, num_cells); + } } -} return Status::OK(); } diff --git a/src/kudu/common/columnblock.h b/src/kudu/common/columnblock.h index a91074c..a5c0461 100644 --- a/src/kudu/common/columnblock.h +++ b/src/kudu/common/columnblock.h @@ -18,23 +18,22 @@ #include <cstddef> #include <cstdint> -#include <memory> #include <ostream> #include <string> #include <glog/logging.h> -#include "kudu/common/common.pb.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/types.h" #include "kudu/gutil/strings/fastmem.h" #include "kudu/gutil/strings/stringpiece.h" #include "kudu/util/bitmap.h" -#include "kudu/util/memory/arena.h" #include "kudu/util/memory/overwrite.h" #include "kudu/util/status.h" namespace kudu { +class Arena; class ColumnBlockCell; class SelectionVector; @@ -47,15 +46,15 @@ class ColumnBlock { typedef ColumnBlockCell Cell; ColumnBlock(const TypeInfo* type, - uint8_t *non_null_bitmap, - void *data, + uint8_t* non_null_bitmap, + void* data, size_t nrows, - Arena *arena) - : type_(type), - non_null_bitmap_(non_null_bitmap), - data_(reinterpret_cast<uint8_t *>(data)), - nrows_(nrows), - arena_(arena) { + RowBlockMemory* memory) + : type_(type), + non_null_bitmap_(non_null_bitmap), + data_(reinterpret_cast<uint8_t*>(data)), + nrows_(nrows), + memory_(memory) { DCHECK(data_) << "null data"; } @@ -106,12 +105,13 @@ class ColumnBlock { return !BitmapTest(non_null_bitmap_, idx); } - const size_t stride() const { return type_->size(); } - const uint8_t * data() const { return data_; } - uint8_t *data() { return data_; } - const size_t nrows() const { return nrows_; } + size_t stride() const { return type_->size(); } + const uint8_t* data() const { return data_; } + uint8_t* data() { return data_; } + size_t nrows() const { return nrows_; } - Arena *arena() { return arena_; } + RowBlockMemory* memory() { return memory_; } + Arena* arena() { return &memory_->arena; } const TypeInfo* type_info() const { return type_; @@ -164,7 +164,7 @@ class ColumnBlock { uint8_t *data_; size_t nrows_; - Arena *arena_; + RowBlockMemory* memory_; }; inline bool operator==(const ColumnBlock& a, const ColumnBlock& b) { @@ -261,7 +261,9 @@ class ColumnDataView { return column_block_->cell_ptr(row_offset_); } - Arena *arena() { return column_block_->arena(); } + RowBlockMemory* memory() { return column_block_->memory(); } + + Arena* arena() { return &memory()->arena; } size_t nrows() const { return column_block_->nrows() - row_offset_; @@ -280,45 +282,4 @@ class ColumnDataView { size_t row_offset_; }; -// Utility class which allocates temporary storage for a -// dense block of column data, freeing it when it goes -// out of scope. -// -// This is more useful in test code than production code, -// since it doesn't allocate from an arena, etc. -template<DataType type> -class ScopedColumnBlock : public ColumnBlock { - public: - typedef typename TypeTraits<type>::cpp_type cpp_type; - - explicit ScopedColumnBlock(size_t n_rows, bool allow_nulls = true) - : ColumnBlock(GetTypeInfo(type), - allow_nulls ? new uint8_t[BitmapSize(n_rows)] : nullptr, - new cpp_type[n_rows], - n_rows, - new Arena(1024)), - non_null_bitmap_(non_null_bitmap()), - data_(reinterpret_cast<cpp_type *>(data())), - arena_(arena()) { - if (allow_nulls) { - // All rows begin null. - BitmapChangeBits(non_null_bitmap(), /*offset=*/ 0, n_rows, /*value=*/ false); - } - } - - const cpp_type &operator[](size_t idx) const { - return data_[idx]; - } - - cpp_type &operator[](size_t idx) { - return data_[idx]; - } - - private: - std::unique_ptr<uint8_t[]> non_null_bitmap_; - std::unique_ptr<cpp_type[]> data_; - std::unique_ptr<Arena> arena_; - -}; - } // namespace kudu diff --git a/src/kudu/common/generic_iterators-test.cc b/src/kudu/common/generic_iterators-test.cc index 6713a09..08b39ea 100644 --- a/src/kudu/common/generic_iterators-test.cc +++ b/src/kudu/common/generic_iterators-test.cc @@ -45,6 +45,7 @@ #include "kudu/common/key_encoder.h" #include "kudu/common/predicate_effectiveness.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/scan_spec.h" #include "kudu/common/schema.h" #include "kudu/common/types.h" @@ -54,7 +55,6 @@ #include "kudu/gutil/strings/substitute.h" #include "kudu/util/block_bloom_filter.h" #include "kudu/util/hash.pb.h" -#include "kudu/util/memory/arena.h" #include "kudu/util/random.h" #include "kudu/util/slice.h" #include "kudu/util/status.h" @@ -258,7 +258,8 @@ TEST(TestMergeIterator, TestNotConsumedCleanup) { ASSERT_OK(merger->Init(nullptr)); ASSERT_TRUE(merger->HasNext()); - RowBlock dst(&kIntSchema, 1, nullptr); + RowBlockMemory mem; + RowBlock dst(&kIntSchema, 1, &mem); ASSERT_OK(merger->NextBlock(&dst)); ASSERT_EQ(1, dst.nrows()); ASSERT_TRUE(merger->HasNext()); @@ -435,7 +436,8 @@ void TestMerge(const Schema& schema, // The RowBlock is sized to a power of 2 to improve BitmapCopy performance // when copying another RowBlock into it. - RowBlock dst(&schema, 128, nullptr); + RowBlockMemory mem; + RowBlock dst(&schema, 128, &mem); size_t total_idx = 0; auto expected_iter = expected.cbegin(); while (merger->HasNext()) { @@ -524,8 +526,8 @@ TEST(TestMaterializingIterator, TestMaterializingPredicatePushdown) { ASSERT_OK(materializing->Init(&spec)); ASSERT_EQ(0, spec.predicates().size()) << "Iterator should have pushed down predicate"; - Arena arena(1024); - RowBlock dst(&kIntSchema, 100, &arena); + RowBlockMemory mem(1024); + RowBlock dst(&kIntSchema, 100, &mem); ASSERT_OK(materializing->NextBlock(&dst)); ASSERT_EQ(dst.nrows(), 100); @@ -572,8 +574,8 @@ TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluation) { ASSERT_EQ(1, GetIteratorPredicatesForTests(outer_iter).size()) << "Predicate should be evaluated by the outer iterator"; - Arena arena(1024); - RowBlock dst(&kIntSchema, 100, &arena); + RowBlockMemory mem(1024); + RowBlock dst(&kIntSchema, 100, &mem); ASSERT_OK(outer_iter->NextBlock(&dst)); ASSERT_EQ(dst.nrows(), 100); @@ -726,11 +728,11 @@ class PredicateEffectivenessTest : ASSERT_TRUE(GetIteratorPredicateEffectivenessCtxForTests(iter)[0].enabled) << "Predicate must be enabled to begin with"; - Arena arena(1024); + RowBlockMemory mem; FLAGS_predicate_effectivess_num_skip_blocks = 4; if (all_values) { for (int i = 0; i < kNumRows / kBatchSize; i++) { - RowBlock dst(&kIntSchema, kBatchSize, &arena); + RowBlock dst(&kIntSchema, kBatchSize, &mem); ASSERT_OK(iter->NextBlock(&dst)); ASSERT_EQ(kBatchSize, dst.nrows()); ASSERT_EQ(kBatchSize, dst.selection_vector()->CountSelected()); @@ -741,7 +743,7 @@ class PredicateEffectivenessTest : } } else { for (int i = 0; i < kNumRows / kBatchSize; i++) { - RowBlock dst(&kIntSchema, kBatchSize, &arena); + RowBlock dst(&kIntSchema, kBatchSize, &mem); ASSERT_OK(iter->NextBlock(&dst)); ASSERT_EQ(kBatchSize, dst.nrows()); // For subset case, the predicate should never be disabled. diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc index d1301aa..390740a 100644 --- a/src/kudu/common/generic_iterators.cc +++ b/src/kudu/common/generic_iterators.cc @@ -49,6 +49,7 @@ #include "kudu/common/predicate_effectiveness.h" #include "kudu/common/row.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/scan_spec.h" #include "kudu/common/schema.h" #include "kudu/gutil/casts.h" @@ -115,7 +116,7 @@ class MergeIterState : public boost::intrusive::list_base_hook<> { public: explicit MergeIterState(IterWithBounds iwb) : iwb_(std::move(iwb)), - arena_(1024), + memory_(1024), next_row_idx_(0) {} @@ -148,18 +149,18 @@ class MergeIterState : public boost::intrusive::list_base_hook<> { // exist. If not, we have to pull a block immediately: after Init() is // finished it must be safe to call next_row() and last_row(). // - // Decoded bound allocations are done against 'decoded_bounds_arena'. - Status Init(Arena* decoded_bounds_arena) { + // Decoded bound allocations are done against the arena in 'decoded_bounds_memory'. + Status Init(RowBlockMemory* decoded_bounds_memory) { DCHECK(!read_block_); if (iwb_.encoded_bounds) { - decoded_bounds_.emplace(&schema(), decoded_bounds_arena); + decoded_bounds_.emplace(&schema(), decoded_bounds_memory); decoded_bounds_->lower = decoded_bounds_->block.row(0); decoded_bounds_->upper = decoded_bounds_->block.row(1); RETURN_NOT_OK(schema().DecodeRowKey( - iwb_.encoded_bounds->first, &decoded_bounds_->lower, decoded_bounds_arena)); + iwb_.encoded_bounds->first, &decoded_bounds_->lower, &decoded_bounds_memory->arena)); RETURN_NOT_OK(schema().DecodeRowKey( - iwb_.encoded_bounds->second, &decoded_bounds_->upper, decoded_bounds_arena)); + iwb_.encoded_bounds->second, &decoded_bounds_->upper, &decoded_bounds_memory->arena)); } else { RETURN_NOT_OK(PullNextBlock()); } @@ -223,14 +224,14 @@ class MergeIterState : public boost::intrusive::list_base_hook<> { IterWithBounds iwb_; // Allocates memory for read_block_. - Arena arena_; + RowBlockMemory memory_; // Optional rowset bounds, decoded during Init(). struct DecodedBounds { // 'block' must be constructed immediately; the bounds themselves can be // initialized later. - DecodedBounds(const Schema* schema, Arena* arena) - : block(schema, /*nrows=*/2, arena) {} + DecodedBounds(const Schema* schema, RowBlockMemory* mem) + : block(schema, /*nrows=*/2, mem) {} RowBlock block; RowBlockRow lower; @@ -274,7 +275,7 @@ Status MergeIterState::Advance(size_t num_rows, bool* pulled_new_block) { // We either exhausted the block outright, or all subsequent rows were // deselected. Either way, we need to pull the next block. next_row_idx_ = read_block_->nrows(); - arena_.Reset(); + memory_.Reset(); RETURN_NOT_OK(PullNextBlock()); *pulled_new_block = true; return Status::OK(); @@ -285,7 +286,7 @@ Status MergeIterState::PullNextBlock() { << "should not pull next block until current block is exhausted"; if (!read_block_) { - read_block_.reset(new RowBlock(&schema(), kMergeRowBuffer, &arena_)); + read_block_.reset(new RowBlock(&schema(), kMergeRowBuffer, &memory_)); } while (iwb_.iter->HasNext()) { RETURN_NOT_OK(iwb_.iter->NextBlock(read_block_.get())); @@ -540,7 +541,7 @@ class MergeIterator : public RowwiseIterator { // Each MergeIterState has an arena for buffered row data, but it is reset // every time a new block is pulled. This single arena ensures that a // MergeIterState's decoded bounds remain allocated for its lifetime. - Arena decoded_bounds_arena_; + RowBlockMemory decoded_bounds_memory_; // Min-heap that orders rows by their keys. A call to top() will yield the row // with the smallest key. @@ -595,7 +596,7 @@ MergeIterator::MergeIterator(MergeIteratorOptions opts, initted_(false), orig_iters_(std::move(iters)), num_orig_iters_(orig_iters_.size()), - decoded_bounds_arena_(1024) { + decoded_bounds_memory_(1024) { CHECK_GT(orig_iters_.size(), 0); } @@ -664,7 +665,7 @@ Status MergeIterator::InitSubIterators(ScanSpec *spec) { ScanSpec *spec_copy = spec != nullptr ? scan_spec_copies_.Construct(*spec) : nullptr; RETURN_NOT_OK(InitAndMaybeWrap(&i.iter, spec_copy)); unique_ptr<MergeIterState> state(new MergeIterState(std::move(i))); - RETURN_NOT_OK(state->Init(&decoded_bounds_arena_)); + RETURN_NOT_OK(state->Init(&decoded_bounds_memory_)); states_.push_back(*state.release()); } orig_iters_.clear(); diff --git a/src/kudu/common/rowblock.cc b/src/kudu/common/rowblock.cc index 85bd4ca..059fbf1 100644 --- a/src/kudu/common/rowblock.cc +++ b/src/kudu/common/rowblock.cc @@ -168,13 +168,13 @@ std::vector<uint16_t> SelectedRows::CreateRowIndexes() { ////////////////////////////// RowBlock::RowBlock(const Schema* schema, size_t nrows, - Arena *arena) + RowBlockMemory* memory) : schema_(schema), columns_data_(schema->num_columns()), column_non_null_bitmaps_(schema->num_columns()), row_capacity_(nrows), nrows_(nrows), - arena_(arena), + memory_(memory), sel_vec_(nrows) { CHECK_GT(row_capacity_, 0); diff --git a/src/kudu/common/rowblock.h b/src/kudu/common/rowblock.h index 0db69d2..5dd5827 100644 --- a/src/kudu/common/rowblock.h +++ b/src/kudu/common/rowblock.h @@ -26,6 +26,7 @@ #include <glog/logging.h> #include "kudu/common/columnblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/schema.h" #include "kudu/common/types.h" #include "kudu/gutil/macros.h" @@ -294,7 +295,6 @@ class SelectedRows { std::vector<uint16_t> indexes_; }; - // A block of decoded rows. // Wrapper around a buffer, which keeps the buffer's size, associated arena, // and schema. Provides convenience accessors for indexing by row, column, etc. @@ -313,9 +313,7 @@ class RowBlock { // Constructs a new RowBlock. // // The 'schema' and 'arena' objects must outlive this RowBlock. - RowBlock(const Schema* schema, - size_t nrows, - Arena* arena); + RowBlock(const Schema* schema, size_t nrows, RowBlockMemory* memory); ~RowBlock(); // Resize the block to the given number of rows. @@ -331,7 +329,7 @@ class RowBlock { RowBlockRow row(size_t idx) const; const Schema* schema() const { return schema_; } - Arena* arena() const { return arena_; } + Arena* arena() const { return &memory_->arena; } ColumnBlock column_block(size_t col_idx) const { return column_block(col_idx, nrows_); @@ -344,7 +342,7 @@ class RowBlock { uint8_t* col_data = columns_data_[col_idx]; uint8_t* nulls_bitmap = column_non_null_bitmaps_[col_idx]; - return ColumnBlock(col_schema.type_info(), nulls_bitmap, col_data, nrows, arena_); + return ColumnBlock(col_schema.type_info(), nulls_bitmap, col_data, nrows, memory_); } // Return the base pointer for the given column's data. @@ -447,7 +445,7 @@ class RowBlock { // nrows_ <= row_capacity_ size_t nrows_; - Arena* arena_; + RowBlockMemory* memory_; // The bitmap indicating which rows are valid in this block. // Deleted rows or rows which have failed to pass predicates will be zeroed diff --git a/src/kudu/common/rowblock_memory.h b/src/kudu/common/rowblock_memory.h new file mode 100644 index 0000000..9117ebb --- /dev/null +++ b/src/kudu/common/rowblock_memory.h @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "kudu/util/memory/arena.h" + +namespace kudu { + +// Handles the memory allocated alongside a RowBlock for variable-length +// cells. +// +// When scanning rows into a RowBlock, the rows may contain variable-length +// data (eg BINARY columns). In this case, the data cannot be inlined directly +// into the columnar data arrays that are part of the RowBlock and instead need +// to be allocated out of a separate Arena. This class wraps that Arena. +struct RowBlockMemory { + Arena arena; + + explicit RowBlockMemory(int arena_size = 32 * 1024) : arena(arena_size) {} + void Reset() { arena.Reset(); } +}; + +} // namespace kudu diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc index fb08521..ba391a8 100644 --- a/src/kudu/common/wire_protocol-test.cc +++ b/src/kudu/common/wire_protocol-test.cc @@ -35,6 +35,7 @@ #include "kudu/common/common.pb.h" #include "kudu/common/row.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/schema.h" #include "kudu/common/types.h" #include "kudu/common/wire_protocol.pb.h" @@ -261,8 +262,8 @@ TEST_F(WireProtocolTest, TestBadSchema_DuplicateColumnName) { // Create a block of rows and ensure that it can be converted to and from protobuf. TEST_F(WireProtocolTest, TestRowBlockToRowwisePB) { - Arena arena(1024); - RowBlock block(&schema_, 30, &arena); + RowBlockMemory mem(1024); + RowBlock block(&schema_, 30, &mem); FillRowBlockWithTestRows(&block); // Convert to PB. @@ -299,10 +300,10 @@ TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) { // Generate several blocks of random data. static constexpr int kNumBlocks = 3; static constexpr int kBatchSizeBytes = 8192 * 1024; - Arena arena(1024); + RowBlockMemory mem(1024); std::list<RowBlock> blocks; for (int i = 0; i < kNumBlocks; i++) { - blocks.emplace_back(&schema_, 30, &arena); + blocks.emplace_back(&schema_, 30, &mem); FillRowBlockWithTestRows(&blocks.back()); } @@ -363,7 +364,7 @@ TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) { // converted to and from protobuf. TEST_F(WireProtocolTest, TestColumnarRowBlockToPBWithPadding) { int kNumRows = 10; - Arena arena(1024); + RowBlockMemory mem(1024); // Create a schema with multiple UNIXTIME_MICROS columns in different // positions. Schema tablet_schema({ ColumnSchema("key", UNIXTIME_MICROS), @@ -371,7 +372,7 @@ TEST_F(WireProtocolTest, TestColumnarRowBlockToPBWithPadding) { ColumnSchema("col2", UNIXTIME_MICROS), ColumnSchema("col3", INT32, true /* nullable */), ColumnSchema("col4", UNIXTIME_MICROS, true /* nullable */)}, 1); - RowBlock block(&tablet_schema, kNumRows, &arena); + RowBlock block(&tablet_schema, kNumRows, &mem); block.selection_vector()->SetAllTrue(); for (int i = 0; i < block.nrows(); i++) { @@ -573,8 +574,8 @@ class WireProtocolBenchmark : double RunBenchmark(const BenchmarkColumnsSpec& spec, double select_rate) { ResetBenchmarkSchema(spec); - Arena arena(1024); - RowBlock block(&benchmark_schema_, 1000, &arena); + RowBlockMemory mem(1024); + RowBlock block(&benchmark_schema_, 1000, &mem); // Regardless of the config, use a constant number of selected cells for the test by // looping the conversion an appropriate number of times. const int64_t kNumCellsToConvert = AllowSlowTests() ? 100000000 : 1000000; @@ -687,8 +688,8 @@ TEST_F(WireProtocolTest, TestInvalidRowBlock) { // projection (a COUNT(*) query). TEST_F(WireProtocolTest, TestBlockWithNoColumns) { Schema empty(std::vector<ColumnSchema>(), 0); - Arena arena(1024); - RowBlock block(&empty, 1000, &arena); + RowBlockMemory mem(1024); + RowBlock block(&empty, 1000, &mem); block.selection_vector()->SetAllTrue(); // Unselect 100 rows for (int i = 0; i < 100; i++) { @@ -792,7 +793,7 @@ TEST_F(WireProtocolTest, TestColumnPredicateInList) { ColumnSchema col1("col1", INT32); vector<ColumnSchema> cols = { col1 }; Schema schema(cols, 1); - Arena arena(1024); + RowBlockMemory mem(1024); boost::optional<ColumnPredicate> predicate; { // col1 IN (5, 6, 10) @@ -805,7 +806,7 @@ TEST_F(WireProtocolTest, TestColumnPredicateInList) { ColumnPredicatePB pb; NO_FATALS(ColumnPredicateToPB(cp, &pb)); - ASSERT_OK(ColumnPredicateFromPB(schema, &arena, pb, &predicate)); + ASSERT_OK(ColumnPredicateFromPB(schema, &mem.arena, pb, &predicate)); ASSERT_EQ(predicate->predicate_type(), PredicateType::InList); ASSERT_EQ(3, predicate->raw_values().size()); } @@ -819,7 +820,7 @@ TEST_F(WireProtocolTest, TestColumnPredicateInList) { *pb.mutable_in_list()->mutable_values()->Add() = string("\0\0\0\0", 4); *pb.mutable_in_list()->mutable_values()->Add() = string("\0\0\0\0", 4); - ASSERT_OK(ColumnPredicateFromPB(schema, &arena, pb, &predicate)); + ASSERT_OK(ColumnPredicateFromPB(schema, &mem.arena, pb, &predicate)); ASSERT_EQ(PredicateType::Equality, predicate->predicate_type()); } @@ -828,9 +829,9 @@ TEST_F(WireProtocolTest, TestColumnPredicateInList) { pb.set_column("col1"); pb.mutable_in_list(); - Arena arena(1024); + RowBlockMemory mem(1024); boost::optional<ColumnPredicate> predicate; - ASSERT_OK(ColumnPredicateFromPB(schema, &arena, pb, &predicate)); + ASSERT_OK(ColumnPredicateFromPB(schema, &mem.arena, pb, &predicate)); ASSERT_EQ(PredicateType::None, predicate->predicate_type()); } @@ -840,9 +841,9 @@ TEST_F(WireProtocolTest, TestColumnPredicateInList) { pb.mutable_in_list(); *pb.mutable_in_list()->mutable_values()->Add() = string("\0", 1); - Arena arena(1024); + RowBlockMemory mem(1024); boost::optional<ColumnPredicate> predicate; - ASSERT_TRUE(ColumnPredicateFromPB(schema, &arena, pb, &predicate).IsInvalidArgument()); + ASSERT_TRUE(ColumnPredicateFromPB(schema, &mem.arena, pb, &predicate).IsInvalidArgument()); } } diff --git a/src/kudu/integration-tests/linked_list-test-util.h b/src/kudu/integration-tests/linked_list-test-util.h index 32bf71e..8e395df 100644 --- a/src/kudu/integration-tests/linked_list-test-util.h +++ b/src/kudu/integration-tests/linked_list-test-util.h @@ -572,9 +572,10 @@ inline Status LinkedListTester::VerifyLinkedListLocal( "Cannot create new row iterator"); RETURN_NOT_OK_PREPEND(iter->Init(nullptr), "Cannot initialize row iterator"); - Arena arena(1024); - RowBlock block(&projection, 100, &arena); + RowBlockMemory mem(1024); + RowBlock block(&projection, 100, &mem); while (iter->HasNext()) { + mem.Reset(); RETURN_NOT_OK(iter->NextBlock(&block)); for (int i = 0; i < block.nrows(); i++) { int64_t key; diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc index f19e5cf..b7f078a 100644 --- a/src/kudu/master/sys_catalog.cc +++ b/src/kudu/master/sys_catalog.cc @@ -40,6 +40,7 @@ #include "kudu/common/partition.h" #include "kudu/common/row_operations.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/scan_spec.h" #include "kudu/common/schema.h" #include "kudu/common/wire_protocol.h" @@ -77,7 +78,6 @@ #include "kudu/util/fault_injection.h" #include "kudu/util/flag_tags.h" #include "kudu/util/logging.h" -#include "kudu/util/memory/arena.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" #include "kudu/util/net/net_util.h" @@ -692,8 +692,8 @@ Status SysCatalogTable::ProcessRows( RETURN_NOT_OK(tablet_replica_->tablet()->NewRowIterator(schema_, &iter)); RETURN_NOT_OK(iter->Init(&spec)); - Arena arena(32 * 1024); - RowBlock block(&iter->schema(), 512, &arena); + RowBlockMemory mem(32 * 1024); + RowBlock block(&iter->schema(), 512, &mem); while (iter->HasNext()) { RETURN_NOT_OK(iter->NextBlock(&block)); const size_t nrows = block.nrows(); diff --git a/src/kudu/tablet/cfile_set-test.cc b/src/kudu/tablet/cfile_set-test.cc index 4b78835..d3f66c3 100644 --- a/src/kudu/tablet/cfile_set-test.cc +++ b/src/kudu/tablet/cfile_set-test.cc @@ -39,6 +39,7 @@ #include "kudu/common/iterator_stats.h" #include "kudu/common/row.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/rowid.h" #include "kudu/common/scan_spec.h" #include "kudu/common/schema.h" @@ -192,9 +193,10 @@ class TestCFileSet : public KuduRowSetTest { ASSERT_OK(iter->Init(&spec)); // Check that the range was respected on all the results. - Arena arena(1024); - RowBlock block(&schema_, 100, &arena); + RowBlockMemory mem(1024); + RowBlock block(&schema_, 100, &mem); while (iter->HasNext()) { + mem.Reset(); ASSERT_OK_FAST(iter->NextBlock(&block)); for (size_t i = 0; i < block.nrows(); i++) { if (block.selection_vector()->IsRowSelected(i)) { @@ -226,8 +228,8 @@ class TestCFileSet : public KuduRowSetTest { } ASSERT_OK(iter->Init(&spec)); // Check that the range was respected on all the results. - Arena arena(1024); - RowBlock block(&schema_, 100, &arena); + RowBlockMemory mem(1024); + RowBlock block(&schema_, 100, &mem); while (iter->HasNext()) { ASSERT_OK_FAST(iter->NextBlock(&block)); for (size_t i = 0; i < block.nrows(); i++) { @@ -288,11 +290,11 @@ TEST_F(TestCFileSet, TestPartiallyMaterialize) { unique_ptr<CFileSet::Iterator> iter(fileset->NewIterator(&schema_, nullptr)); ASSERT_OK(iter->Init(nullptr)); - Arena arena(4096); - RowBlock block(&schema_, 100, &arena); + RowBlockMemory mem(4096); + RowBlock block(&schema_, 100, &mem); rowid_t row_idx = 0; while (iter->HasNext()) { - arena.Reset(); + mem.Reset(); size_t n = block.nrows(); ASSERT_OK_FAST(iter->PrepareBatch(&n)); diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc index 0aff19d..07d4597 100644 --- a/src/kudu/tablet/compaction-test.cc +++ b/src/kudu/tablet/compaction-test.cc @@ -38,6 +38,7 @@ #include "kudu/common/row.h" #include "kudu/common/row_changelist.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/rowid.h" #include "kudu/common/schema.h" #include "kudu/common/timestamp.h" @@ -781,7 +782,8 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) { } - RowBlock block(&schema_, kBaseNumRowSets * kNumRowsPerRowSet, &arena_); + RowBlockMemory mem; + RowBlock block(&schema_, kBaseNumRowSets * kNumRowsPerRowSet, &mem); // Go through the expected compaction input rows, flip the last undo into a redo and // build the base. This will give us the final version that we'll expect the result // of the real compaction to match. @@ -791,13 +793,13 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) { row->undo_head = reinsert->next(); row->row = block.row(i); BuildRow(i, i); - CopyRow(row_builder_.row(), &row->row, &arena_); + CopyRow(row_builder_.row(), &row->row, &mem.arena); RowChangeListDecoder redo_decoder(reinsert->changelist()); CHECK_OK(redo_decoder.Init()); faststring buf; RowChangeListEncoder dummy(&buf); dummy.SetToUpdate(); - redo_decoder.MutateRowAndCaptureChanges(&row->row, &arena_, &dummy); + redo_decoder.MutateRowAndCaptureChanges(&row->row, &mem.arena, &dummy); AddExpectedDelete(&row->redo_head, reinsert->timestamp()); } diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc index fbd28e8..23eba03 100644 --- a/src/kudu/tablet/compaction.cc +++ b/src/kudu/tablet/compaction.cc @@ -33,6 +33,7 @@ #include "kudu/common/iterator.h" #include "kudu/common/row.h" #include "kudu/common/row_changelist.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/rowid.h" #include "kudu/common/scan_spec.h" #include "kudu/common/schema.h" @@ -202,8 +203,8 @@ class DiskRowSetCompactionInput : public CompactionInput { : base_iter_(std::move(base_iter)), redo_delta_iter_(std::move(redo_delta_iter)), undo_delta_iter_(std::move(undo_delta_iter)), - arena_(32 * 1024), - block_(&base_iter_->schema(), kRowsPerBlock, &arena_), + mem_(32 * 1024), + block_(&base_iter_->schema(), kRowsPerBlock, &mem_), redo_mutation_block_(kRowsPerBlock, static_cast<Mutation *>(nullptr)), undo_mutation_block_(kRowsPerBlock, static_cast<Mutation *>(nullptr)) {} @@ -248,7 +249,7 @@ class DiskRowSetCompactionInput : public CompactionInput { return Status::OK(); } - Arena* PreparedBlockArena() override { return &arena_; } + Arena* PreparedBlockArena() override { return &mem_.arena; } Status FinishBlock() override { return Status::OK(); @@ -264,7 +265,7 @@ class DiskRowSetCompactionInput : public CompactionInput { unique_ptr<DeltaIterator> redo_delta_iter_; unique_ptr<DeltaIterator> undo_delta_iter_; - Arena arena_; + RowBlockMemory mem_; // The current block of data which has come from the input iterator RowBlock block_; @@ -690,7 +691,7 @@ class MergeCompactionInput : public CompactionInput { num_dup_rows_++; if (row_idx == 0) { duplicated_rows_.push_back(std::unique_ptr<RowBlock>( - new RowBlock(schema_, kDuplicatedRowsPerBlock, static_cast<Arena*>(nullptr)))); + new RowBlock(schema_, kDuplicatedRowsPerBlock, static_cast<RowBlockMemory*>(nullptr)))); } return duplicated_rows_.back()->row(row_idx); } diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc index b97af61..66516d4 100644 --- a/src/kudu/tablet/delta_compaction.cc +++ b/src/kudu/tablet/delta_compaction.cc @@ -31,6 +31,7 @@ #include "kudu/common/row.h" #include "kudu/common/row_changelist.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/rowid.h" #include "kudu/common/scan_spec.h" #include "kudu/fs/block_manager.h" @@ -128,8 +129,8 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) { RETURN_NOT_OK(delta_iter_->Init(&spec)); RETURN_NOT_OK(delta_iter_->SeekToOrdinal(0)); - Arena arena(32 * 1024); - RowBlock block(&partial_schema_, kRowsPerBlock, &arena); + RowBlockMemory mem(32 * 1024); + RowBlock block(&partial_schema_, kRowsPerBlock, &mem); DVLOG(1) << "Applying deltas and rewriting columns (" << partial_schema_.ToString() << ")"; unique_ptr<DeltaStats> redo_stats(new DeltaStats); @@ -140,7 +141,7 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) { while (old_base_data_rwise->HasNext()) { // 1) Get the next batch of base data for the columns we're compacting. - arena.Reset(); + mem.Reset(); RETURN_NOT_OK(old_base_data_rwise->NextBlock(&block)); size_t n = block.nrows(); @@ -177,12 +178,8 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) { // NOTE: This is presently ignored. bool is_garbage_collected; - RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(snap, - *input_row, - &new_undos_head, - &new_redos_head, - &arena, - &dst_row)); + RETURN_NOT_OK(ApplyMutationsAndGenerateUndos( + snap, *input_row, &new_undos_head, &new_redos_head, &mem.arena, &dst_row)); RemoveAncientUndos(history_gc_opts_, &new_undos_head, @@ -210,9 +207,9 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) { // 5) Remove the columns that we've done our major REDO delta compaction on // from this delta flush, except keep all the delete and reinsert // mutations. - arena.Reset(); + mem.Reset(); vector<DeltaKeyAndUpdate> out; - RETURN_NOT_OK(delta_iter_->FilterColumnIdsAndCollectDeltas(column_ids_, &out, &arena)); + RETURN_NOT_OK(delta_iter_->FilterColumnIdsAndCollectDeltas(column_ids_, &out, &mem.arena)); // We only create a new redo delta file if we need to. if (!out.empty() && !new_redo_delta_writer_) { diff --git a/src/kudu/tablet/deltafile-test.cc b/src/kudu/tablet/deltafile-test.cc index 5b0ee12..b58b2fa 100644 --- a/src/kudu/tablet/deltafile-test.cc +++ b/src/kudu/tablet/deltafile-test.cc @@ -31,10 +31,12 @@ #include <gtest/gtest.h> #include "kudu/cfile/cfile_util.h" +#include "kudu/common/columnblock-test-util.h" #include "kudu/common/columnblock.h" #include "kudu/common/common.pb.h" #include "kudu/common/row_changelist.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/rowid.h" #include "kudu/common/schema.h" #include "kudu/common/timestamp.h" @@ -83,8 +85,7 @@ using fs::WritableBlock; class TestDeltaFile : public KuduTest { public: TestDeltaFile() : - schema_(CreateSchema()), - arena_(1024) { + schema_(CreateSchema()) { } public: @@ -175,7 +176,8 @@ class TestDeltaFile : public KuduTest { ASSERT_OK(s); ASSERT_OK(it->Init(nullptr)); - RowBlock block(&schema_, 100, &arena_); + RowBlockMemory mem; + RowBlock block(&schema_, 100, &mem); // Iterate through the faked table, starting with batches that // come before all of the updates, and extending a bit further @@ -185,7 +187,7 @@ class TestDeltaFile : public KuduTest { int start_row = 0; while (start_row < FLAGS_last_row_to_update + 10000) { block.ZeroMemory(); - arena_.Reset(); + mem.Reset(); ASSERT_OK_FAST(it->PrepareBatch(block.nrows(), DeltaIterator::PREPARE_FOR_APPLY)); SelectionVector sv(block.nrows()); @@ -218,7 +220,6 @@ class TestDeltaFile : public KuduTest { protected: unique_ptr<FsManager> fs_manager_; Schema schema_; - Arena arena_; BlockId test_block_; }; @@ -311,13 +312,15 @@ TEST_F(TestDeltaFile, TestCollectMutations) { vector<Mutation *> mutations; mutations.resize(100); + Arena arena(1024); + int start_row = 0; while (start_row < FLAGS_last_row_to_update + 10000) { + arena.Reset(); std::fill(mutations.begin(), mutations.end(), reinterpret_cast<Mutation *>(NULL)); - arena_.Reset(); ASSERT_OK_FAST(it->PrepareBatch(mutations.size(), DeltaIterator::PREPARE_FOR_COLLECT)); - ASSERT_OK(it->CollectMutations(&mutations, &arena_)); + ASSERT_OK(it->CollectMutations(&mutations, &arena)); for (int i = 0; i < mutations.size(); i++) { Mutation *mut_head = mutations[i]; diff --git a/src/kudu/tablet/diskrowset-test-base.h b/src/kudu/tablet/diskrowset-test-base.h index 70215b9..9eef818 100644 --- a/src/kudu/tablet/diskrowset-test-base.h +++ b/src/kudu/tablet/diskrowset-test-base.h @@ -221,13 +221,13 @@ class TestRowSet : public KuduRowSetTest { std::unique_ptr<RowwiseIterator> row_iter; CHECK_OK(rs.NewRowIterator(opts, &row_iter)); CHECK_OK(row_iter->Init(nullptr)); - Arena arena(1024); + RowBlockMemory mem(1024); int batch_size = 10000; - RowBlock dst(&proj_val, batch_size, &arena); + RowBlock dst(&proj_val, batch_size, &mem); int i = 0; while (row_iter->HasNext()) { - arena.Reset(); + mem.Reset(); CHECK_OK(row_iter->NextBlock(&dst)); VerifyUpdatedBlock(proj_val.ExtractColumnFromRow<UINT32>(dst.row(0), 0), i, dst.nrows(), updated); @@ -285,13 +285,13 @@ class TestRowSet : public KuduRowSetTest { CHECK_OK(row_iter->Init(nullptr)); int batch_size = 1000; - Arena arena(1024); - RowBlock dst(&schema, batch_size, &arena); + RowBlockMemory mem(1024); + RowBlock dst(&schema, batch_size, &mem); int i = 0; int log_interval = expected_rows/20 / batch_size; while (row_iter->HasNext()) { - arena.Reset(); + mem.Reset(); CHECK_OK(row_iter->NextBlock(&dst)); i += dst.nrows(); diff --git a/src/kudu/tablet/memrowset-test.cc b/src/kudu/tablet/memrowset-test.cc index c81d431..62afeb3 100644 --- a/src/kudu/tablet/memrowset-test.cc +++ b/src/kudu/tablet/memrowset-test.cc @@ -36,6 +36,7 @@ #include "kudu/common/row.h" #include "kudu/common/row_changelist.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/schema.h" #include "kudu/common/timestamp.h" #include "kudu/consensus/log_anchor_registry.h" @@ -212,10 +213,11 @@ class TestMemRowSet : public KuduTest { unique_ptr<MemRowSet::Iterator> iter(mrs->NewIterator(opts)); CHECK_OK(iter->Init(nullptr)); - Arena arena(1024); - RowBlock block(&schema_, 100, &arena); + RowBlockMemory mem(1024); + RowBlock block(&schema_, 100, &mem); int fetched = 0; while (iter->HasNext()) { + mem.Reset(); CHECK_OK(iter->NextBlock(&block)); fetched += block.selection_vector()->CountSelected(); } diff --git a/src/kudu/tablet/mt-rowset_delta_compaction-test.cc b/src/kudu/tablet/mt-rowset_delta_compaction-test.cc index 70201e5..141e396 100644 --- a/src/kudu/tablet/mt-rowset_delta_compaction-test.cc +++ b/src/kudu/tablet/mt-rowset_delta_compaction-test.cc @@ -28,13 +28,13 @@ #include "kudu/common/common.pb.h" #include "kudu/common/iterator.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/schema.h" #include "kudu/gutil/atomicops.h" #include "kudu/tablet/diskrowset-test-base.h" #include "kudu/tablet/diskrowset.h" #include "kudu/tablet/rowset.h" #include "kudu/tablet/tablet.pb.h" -#include "kudu/util/memory/arena.h" #include "kudu/util/monotime.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" @@ -106,8 +106,8 @@ class TestMultiThreadedRowSetDeltaCompaction : public TestRowSet { } void ReadVerify(DiskRowSet *rs) { - Arena arena(1024); - RowBlock dst(&schema_, 1000, &arena); + RowBlockMemory mem(1024); + RowBlock dst(&schema_, 1000, &mem); RowIteratorOptions opts; opts.projection = &schema_; unique_ptr<RowwiseIterator> iter; @@ -115,6 +115,7 @@ class TestMultiThreadedRowSetDeltaCompaction : public TestRowSet { uint32_t expected = NoBarrier_Load(&update_counter_); ASSERT_OK(iter->Init(nullptr)); while (iter->HasNext()) { + mem.Reset(); ASSERT_OK_FAST(iter->NextBlock(&dst)); size_t n = dst.nrows(); ASSERT_GT(n, 0); diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc index 8cf354b..f85d595 100644 --- a/src/kudu/tablet/mt-tablet-test.cc +++ b/src/kudu/tablet/mt-tablet-test.cc @@ -34,6 +34,7 @@ #include "kudu/common/iterator.h" #include "kudu/common/partial_row.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/rowid.h" #include "kudu/common/schema.h" #include "kudu/gutil/basictypes.h" @@ -46,7 +47,6 @@ #include "kudu/tablet/tablet.h" #include "kudu/util/countdown_latch.h" #include "kudu/util/faststring.h" -#include "kudu/util/memory/arena.h" #include "kudu/util/monotime.h" #include "kudu/util/status.h" #include "kudu/util/stopwatch.h" @@ -148,8 +148,8 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> { LocalTabletWriter writer(this->tablet().get(), &this->client_schema_); - Arena tmp_arena(1024); - RowBlock block(&schema_, 1, &tmp_arena); + RowBlockMemory mem(1024); + RowBlock block(&schema_, 1, &mem); faststring update_buf; uint64_t updates_since_last_report = 0; @@ -164,7 +164,7 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> { CHECK_OK(iter->Init(nullptr)); while (iter->HasNext() && running_insert_count_.count() > 0) { - tmp_arena.Reset(); + mem.Reset(); CHECK_OK(iter->NextBlock(&block)); CHECK_EQ(block.nrows(), 1); @@ -218,8 +218,8 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> { // This is meant to test that outstanding iterators don't end up // trying to reference already-freed memrowset memory. void SlowReaderThread(int /*tid*/) { - Arena arena(32*1024); - RowBlock block(&schema_, 1, &arena); + RowBlockMemory mem(32 * 1024); + RowBlock block(&schema_, 1, &mem); uint64_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * FLAGS_num_insert_threads) / FLAGS_num_insert_threads; @@ -232,6 +232,7 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> { CHECK_OK(iter->Init(nullptr)); for (int i = 0; i < max_iters && iter->HasNext(); i++) { + mem.Reset(); CHECK_OK(iter->NextBlock(&block)); if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) { @@ -251,10 +252,10 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> { } uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) { - Arena arena(1024); // unused, just scanning ints + RowBlockMemory mem(1024); // unused, just scanning ints static const int kBufInts = 1024*1024 / 8; - RowBlock block(&valcol_projection_, kBufInts, &arena); + RowBlock block(&valcol_projection_, kBufInts, &mem); ColumnBlock column = block.column_block(0); uint64_t count_since_report = 0; @@ -266,7 +267,7 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> { CHECK_OK(iter->Init(nullptr)); while (iter->HasNext()) { - arena.Reset(); + mem.Reset(); CHECK_OK(iter->NextBlock(&block)); for (size_t j = 0; j < block.nrows(); j++) { diff --git a/src/kudu/tablet/tablet-decoder-eval-test.cc b/src/kudu/tablet/tablet-decoder-eval-test.cc index bf5fd0f..c811912 100644 --- a/src/kudu/tablet/tablet-decoder-eval-test.cc +++ b/src/kudu/tablet/tablet-decoder-eval-test.cc @@ -34,6 +34,7 @@ #include "kudu/common/partial_row.h" #include "kudu/common/row.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/scan_spec.h" #include "kudu/common/schema.h" #include "kudu/gutil/stringprintf.h" @@ -255,12 +256,13 @@ public: ASSERT_OK(iter->Init(&spec)); ASSERT_TRUE(spec.predicates().empty()) << "Should have accepted all predicates"; - Arena ret_arena(1024); + RowBlockMemory mem(1024); size_t expected_count = ExpectedCount(nrows, cardinality, lower, upper); Schema schema = iter->schema(); - RowBlock block(&schema, 100, &ret_arena); + RowBlock block(&schema, 100, &mem); int fetched = 0; - string column_str_a, column_str_b; + string column_str_a; + string column_str_b; while (iter->HasNext()) { ASSERT_OK(iter->NextBlock(&block)); for (size_t i = 0; i < block.nrows(); i++) { diff --git a/src/kudu/tablet/tablet-test-base.h b/src/kudu/tablet/tablet-test-base.h index b85640a..9f9b406 100644 --- a/src/kudu/tablet/tablet-test-base.h +++ b/src/kudu/tablet/tablet-test-base.h @@ -311,8 +311,7 @@ class TabletTestBase : public KuduTabletTest { TabletHarness::Options::ClockType::LOGICAL_CLOCK) : KuduTabletTest(TESTSETUP::CreateSchema(), clock_type), setup_(), - max_rows_(setup_.GetMaxRows()), - arena_(1024) + max_rows_(setup_.GetMaxRows()) {} // Inserts "count" rows. @@ -452,8 +451,8 @@ class TabletTestBase : public KuduTabletTest { ASSERT_OK(iter->Init(nullptr)); int batch_size = std::max<size_t>(1, std::min<size_t>(expected_row_count / 10, 4L * 1024 * 1024 / schema_.byte_size())); - Arena arena(32*1024); - RowBlock block(&schema_, batch_size, &arena); + RowBlockMemory mem(32 * 1024); + RowBlock block(&schema_, batch_size, &mem); bool check_for_dups = true; if (expected_row_count > INT_MAX) { @@ -469,6 +468,7 @@ class TabletTestBase : public KuduTabletTest { uint64_t actual_row_count = 0; while (iter->HasNext()) { + mem.Reset(); ASSERT_OK_FAST(iter->NextBlock(&block)); RowBlockRow rb_row = block.row(0); @@ -537,8 +537,6 @@ class TabletTestBase : public KuduTabletTest { TESTSETUP setup_; const uint64_t max_rows_; - - Arena arena_; }; diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h index bf92857..1ca5a1a 100644 --- a/src/kudu/tablet/tablet-test-util.h +++ b/src/kudu/tablet/tablet-test-util.h @@ -38,6 +38,7 @@ #include "kudu/cfile/cfile_util.h" #include "kudu/common/columnblock.h" +#include "kudu/common/columnblock-test-util.h" #include "kudu/common/common.pb.h" #include "kudu/common/iterator.h" #include "kudu/common/row.h" @@ -199,8 +200,8 @@ class KuduRowSetTest : public KuduTabletTest { static inline Status SilentIterateToStringList(RowwiseIterator* iter, int* fetched) { const Schema& schema = iter->schema(); - Arena arena(1024); - RowBlock block(&schema, 100, &arena); + RowBlockMemory memory(1024); + RowBlock block(&schema, 100, &memory); *fetched = 0; while (iter->HasNext()) { RETURN_NOT_OK(iter->NextBlock(&block)); @@ -218,8 +219,8 @@ static inline Status IterateToStringList(RowwiseIterator* iter, int limit = INT_MAX) { out->clear(); Schema schema = iter->schema(); - Arena arena(1024); - RowBlock block(&schema, 100, &arena); + RowBlockMemory memory(1024); + RowBlock block(&schema, 100, &memory); int fetched = 0; while (iter->HasNext() && fetched < limit) { RETURN_NOT_OK(iter->NextBlock(&block)); diff --git a/src/kudu/tablet/tablet-test.cc b/src/kudu/tablet/tablet-test.cc index aaea8ef..f0e9c92 100644 --- a/src/kudu/tablet/tablet-test.cc +++ b/src/kudu/tablet/tablet-test.cc @@ -39,6 +39,7 @@ #include "kudu/common/key_range.h" #include "kudu/common/partial_row.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/schema.h" #include "kudu/common/timestamp.h" #include "kudu/common/wire_protocol.pb.h" @@ -507,7 +508,8 @@ TYPED_TEST(TestTablet, TestRowIteratorSimple) { ASSERT_TRUE(iter->HasNext()); - RowBlock block(&this->schema_, 100, &this->arena_); + RowBlockMemory mem; + RowBlock block(&this->schema_, 100, &mem); // First call to CopyNextRows should fetch the whole memrowset. ASSERT_OK_FAST(iter->NextBlock(&block)); @@ -579,8 +581,10 @@ TYPED_TEST(TestTablet, TestRowIteratorOrdered) { // Iterate the tablet collecting rows. vector<shared_ptr<faststring> > rows; + RowBlockMemory mem; for (int i = 0; i < numBlocks; i++) { - RowBlock block(&this->schema_, rowsPerBlock, &this->arena_); + mem.Reset(); + RowBlock block(&this->schema_, rowsPerBlock, &mem); ASSERT_TRUE(iter->HasNext()); ASSERT_OK(iter->NextBlock(&block)); ASSERT_EQ(rowsPerBlock, block.nrows()) << "unexpected number of rows returned"; @@ -683,9 +687,10 @@ TYPED_TEST(TestTablet, TestRowIteratorComplex) { vector<bool> seen(max_rows, false); int seen_count = 0; - RowBlock block(&schema, 100, &this->arena_); + RowBlockMemory mem; + RowBlock block(&schema, 100, &mem); while (iter->HasNext()) { - this->arena_.Reset(); + mem.Reset(); ASSERT_OK(iter->NextBlock(&block)); LOG(INFO) << "Fetched batch of " << block.nrows(); for (size_t i = 0; i < block.nrows(); i++) { diff --git a/src/kudu/tablet/tablet_random_access-test.cc b/src/kudu/tablet/tablet_random_access-test.cc index 443e103..34ddd39 100644 --- a/src/kudu/tablet/tablet_random_access-test.cc +++ b/src/kudu/tablet/tablet_random_access-test.cc @@ -34,6 +34,7 @@ #include "kudu/common/iterator.h" #include "kudu/common/partial_row.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/scan_spec.h" #include "kudu/common/schema.h" #include "kudu/common/wire_protocol.pb.h" @@ -44,7 +45,6 @@ #include "kudu/tablet/tablet-test-util.h" #include "kudu/tablet/tablet.h" #include "kudu/util/countdown_latch.h" -#include "kudu/util/memory/arena.h" #include "kudu/util/monotime.h" #include "kudu/util/scoped_cleanup.h" #include "kudu/util/status.h" @@ -294,10 +294,10 @@ class TestRandomAccess : public KuduTabletTest { optional<ExpectedKeyValueRow> ret; int n_results = 0; - Arena arena(1024); - RowBlock block(&schema, 100, &arena); + RowBlockMemory mem(1024); + RowBlock block(&schema, 100, &mem); while (iter->HasNext()) { - arena.Reset(); + mem.Reset(); CHECK_OK(iter->NextBlock(&block)); for (int i = 0; i < block.nrows(); i++) { if (!block.selection_vector()->IsRowSelected(i)) { diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc index 2c313ff..70282e3 100644 --- a/src/kudu/tools/tool_action_local_replica.cc +++ b/src/kudu/tools/tool_action_local_replica.cc @@ -37,6 +37,7 @@ #include "kudu/common/iterator.h" #include "kudu/common/partition.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/schema.h" #include "kudu/common/wire_protocol.h" #include "kudu/consensus/consensus.pb.h" @@ -81,7 +82,6 @@ #include "kudu/util/env.h" #include "kudu/util/env_util.h" #include "kudu/util/faststring.h" -#include "kudu/util/memory/arena.h" #include "kudu/util/metrics.h" #include "kudu/util/net/net_util.h" #include "kudu/util/pb_util.h" @@ -715,10 +715,11 @@ Status DumpRowSetInternal(const IOContext& ctx, RETURN_NOT_OK(rs->NewRowIterator(opts, &it)); RETURN_NOT_OK(it->Init(nullptr)); - Arena arena(1024); - RowBlock block(&key_proj, 100, &arena); + RowBlockMemory mem(1024); + RowBlock block(&key_proj, 100, &mem); faststring key; while (it->HasNext()) { + mem.Reset(); RETURN_NOT_OK(it->NextBlock(&block)); for (int i = 0; i < block.nrows(); i++) { key_proj.EncodeComparableKey(block.row(i), &key); diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc index 5815f69..9aa19af 100644 --- a/src/kudu/tools/tool_action_perf.cc +++ b/src/kudu/tools/tool_action_perf.cc @@ -191,6 +191,7 @@ #include "kudu/common/iterator.h" #include "kudu/common/partial_row.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/schema.h" #include "kudu/common/timestamp.h" #include "kudu/common/types.h" @@ -219,7 +220,6 @@ #include "kudu/util/flag_validators.h" #include "kudu/util/int128.h" #include "kudu/util/logging.h" -#include "kudu/util/memory/arena.h" #include "kudu/util/oid_generator.h" #include "kudu/util/random.h" #include "kudu/util/status.h" @@ -884,11 +884,11 @@ Status TabletScan(const RunnerContext& context) { unique_ptr<RowwiseIterator> iter; RETURN_NOT_OK(tablet->NewRowIterator(std::move(opts), &iter)); RETURN_NOT_OK(iter->Init(nullptr)); - Arena arena(1024); - RowBlock block(&projection, 100, &arena); + RowBlockMemory mem(1024); + RowBlock block(&projection, 100, &mem); int64_t rows_scanned = 0; while (iter->HasNext()) { - arena.Reset(); + mem.Reset(); RETURN_NOT_OK(iter->NextBlock(&block)); rows_scanned += block.nrows(); KLOG_EVERY_N_SECS(INFO, 10) << "scanned " << rows_scanned << " rows"; diff --git a/src/kudu/transactions/txn_status_tablet.cc b/src/kudu/transactions/txn_status_tablet.cc index cf3c76e..feb2698 100644 --- a/src/kudu/transactions/txn_status_tablet.cc +++ b/src/kudu/transactions/txn_status_tablet.cc @@ -32,6 +32,7 @@ #include "kudu/common/partial_row.h" #include "kudu/common/row_operations.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/scan_spec.h" #include "kudu/common/schema.h" #include "kudu/common/types.h" @@ -47,7 +48,6 @@ #include "kudu/tserver/tserver.pb.h" #include "kudu/util/countdown_latch.h" #include "kudu/util/faststring.h" -#include "kudu/util/memory/arena.h" #include "kudu/util/once.h" #include "kudu/util/pb_util.h" #include "kudu/util/slice.h" @@ -234,8 +234,8 @@ Status TxnStatusTablet::VisitTransactions(TransactionsVisitor* visitor) { boost::optional<int64_t> prev_txn_id = boost::none; TxnStatusEntryPB prev_status_entry_pb; vector<ParticipantIdAndPB> prev_participants; - Arena arena(32 * 1024); - RowBlock block(&iter->schema(), 512, &arena); + RowBlockMemory mem; + RowBlock block(&iter->schema(), 512, &mem); // Iterate over the transaction and participant entries, notifying the // visitor once a transaction and all its participants have been found. while (iter->HasNext()) { diff --git a/src/kudu/tserver/tablet_server-test-base.cc b/src/kudu/tserver/tablet_server-test-base.cc index 9cbbeeb..ac1c779 100644 --- a/src/kudu/tserver/tablet_server-test-base.cc +++ b/src/kudu/tserver/tablet_server-test-base.cc @@ -33,6 +33,7 @@ #include "kudu/common/iterator.h" #include "kudu/common/partial_row.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/wire_protocol-test-util.h" #include "kudu/common/wire_protocol.h" #include "kudu/common/wire_protocol.pb.h" @@ -51,7 +52,6 @@ #include "kudu/tserver/tablet_server_test_util.h" #include "kudu/tserver/ts_tablet_manager.h" #include "kudu/tserver/tserver_service.proxy.h" -#include "kudu/util/memory/arena.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" #include "kudu/util/net/net_util.h" @@ -413,11 +413,12 @@ void TabletServerTestBase::VerifyRows(const Schema& schema, std::min<int>(expected.size() / 10, 4*1024*1024 / schema.byte_size())); - Arena arena(32*1024); - RowBlock block(&schema, batch_size, &arena); + RowBlockMemory mem(32 * 1024); + RowBlock block(&schema, batch_size, &mem); int count = 0; while (iter->HasNext()) { + mem.Reset(); ASSERT_OK_FAST(iter->NextBlock(&block)); RowBlockRow rb_row = block.row(0); for (int i = 0; i < block.nrows(); i++) { diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index 9fe419d..f1a6c74 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -44,6 +44,7 @@ #include "kudu/common/key_range.h" #include "kudu/common/partition.h" #include "kudu/common/rowblock.h" +#include "kudu/common/rowblock_memory.h" #include "kudu/common/scan_spec.h" #include "kudu/common/schema.h" #include "kudu/common/timestamp.h" @@ -2892,9 +2893,8 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req, // TODO(todd): could size the RowBlock based on the user's requested batch size? // If people had really large indirect objects, we would currently overshoot // their requested batch size by a lot. - Arena arena(32 * 1024); - RowBlock block(&iter->schema(), - FLAGS_scanner_batch_size_rows, &arena); + RowBlockMemory mem(32 * 1024); + RowBlock block(&iter->schema(), FLAGS_scanner_batch_size_rows, &mem); // TODO(todd): in the future, use the client timeout to set a budget. For now, // just use a half second, which should be plenty to amortize call overhead.
