Repository: incubator-impala

Updated Branches:
  refs/heads/master 352ad55db -> 317c413a0
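The change in this patch is mechanical: RowDescriptor parameters and members move from const references to const pointers (RowBatch, BufferedTupleStream and BufferedTupleStreamV2, the data-stream classes, SortedRunMerger, Sorter), with the descriptor owned by the exec node that created it. Below is a minimal sketch of the resulting ownership pattern; the types are simplified stand-ins for illustration, not the real Impala classes.

// Sketch only: simplified stand-ins, not the real Impala classes.
#include <cassert>

struct RowDescriptor {
  int num_tuples_per_row = 1;
};

class RowBatch {
 public:
  // After this patch the constructor takes a non-owning pointer; the caller
  // (conceptually the exec node) must keep the descriptor alive for the
  // lifetime of the batch.
  RowBatch(const RowDescriptor* row_desc, int capacity)
    : row_desc_(row_desc), capacity_(capacity) {
    assert(row_desc_ != nullptr);
  }
  const RowDescriptor* row_desc() const { return row_desc_; }
  int capacity() const { return capacity_; }

 private:
  const RowDescriptor* row_desc_;  // not owned
  int capacity_;
};

int main() {
  RowDescriptor desc;           // owned by the "exec node" (here: main)
  RowBatch batch(&desc, 1024);  // call sites change from 'desc' to '&desc'
  assert(batch.row_desc() == &desc);
  return batch.capacity() == 1024 ? 0 : 1;
}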
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/buffered-tuple-stream-v2-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2-test.cc b/be/src/runtime/buffered-tuple-stream-v2-test.cc
index 37ddad7..277a564 100644
--- a/be/src/runtime/buffered-tuple-stream-v2-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-v2-test.cc
@@ -167,15 +167,15 @@ class SimpleTupleStreamTest : public testing::Test {
   /// needed to match the values generated in VerifyResults(). If 'gen_null' is true,
   /// some tuples will be set to NULL.
   virtual RowBatch* CreateBatch(
-      const RowDescriptor& row_desc, int offset, int num_rows, bool gen_null) {
+      const RowDescriptor* row_desc, int offset, int num_rows, bool gen_null) {
     RowBatch* batch = pool_.Add(new RowBatch(row_desc, num_rows, &tracker_));
-    int num_tuples = row_desc.tuple_descriptors().size();
+    int num_tuples = row_desc->tuple_descriptors().size();
-    int idx = offset * CountSlotsPerRow(row_desc);
+    int idx = offset * CountSlotsPerRow(*row_desc);
     for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
       TupleRow* row = batch->GetRow(row_idx);
       for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) {
-        TupleDescriptor* tuple_desc = row_desc.tuple_descriptors()[tuple_idx];
+        TupleDescriptor* tuple_desc = row_desc->tuple_descriptors()[tuple_idx];
         Tuple* tuple = Tuple::Create(tuple_desc->byte_size(), batch->tuple_data_pool());
         bool is_null = gen_null && !GenBoolValue(idx);
         for (int slot_idx = 0; slot_idx < tuple_desc->slots().size(); ++slot_idx, ++idx) {
@@ -205,11 +205,11 @@ class SimpleTupleStreamTest : public testing::Test {
   }
 
   virtual RowBatch* CreateIntBatch(int offset, int num_rows, bool gen_null) {
-    return CreateBatch(*int_desc_, offset, num_rows, gen_null);
+    return CreateBatch(int_desc_, offset, num_rows, gen_null);
   }
 
   virtual RowBatch* CreateStringBatch(int offset, int num_rows, bool gen_null) {
-    return CreateBatch(*string_desc_, offset, num_rows, gen_null);
+    return CreateBatch(string_desc_, offset, num_rows, gen_null);
   }
 
   void AppendValue(uint8_t* ptr, vector<int>* results) {
@@ -258,7 +258,7 @@ class SimpleTupleStreamTest : public testing::Test {
   void ReadValues(BufferedTupleStreamV2* stream, RowDescriptor* desc, vector<T>* results,
       int num_batches = -1) {
     bool eos = false;
-    RowBatch batch(*desc, BATCH_SIZE, &tracker_);
+    RowBatch batch(desc, BATCH_SIZE, &tracker_);
     int batches_read = 0;
     do {
       batch.Reset();
@@ -319,7 +319,7 @@ class SimpleTupleStreamTest : public testing::Test {
     if (max_page_len == -1) max_page_len = default_page_len;
 
     BufferedTupleStreamV2 stream(
-        runtime_state_, *desc, &client_, default_page_len, max_page_len);
+        runtime_state_, desc, &client_, default_page_len, max_page_len);
     ASSERT_OK(stream.Init(-1, true));
     bool got_write_reservation;
     ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
@@ -335,7 +335,7 @@ class SimpleTupleStreamTest : public testing::Test {
     Status status;
     ASSERT_TRUE(sizeof(T) == sizeof(int) || sizeof(T) == sizeof(StringValue));
 
-    batch = CreateBatch(*desc, offset, num_rows, gen_null);
+    batch = CreateBatch(desc, offset, num_rows, gen_null);
     for (int j = 0; j < batch->num_rows(); ++j) {
       // TODO: test that AddRow succeeds after freeing memory.
       bool b = stream.AddRow(batch->GetRow(j), &status);
@@ -363,8 +363,7 @@ class SimpleTupleStreamTest : public testing::Test {
 
   void TestIntValuesInterleaved(int num_batches, int num_batches_before_read,
       bool unpin_stream, int64_t page_len = PAGE_LEN) {
-    BufferedTupleStreamV2 stream(
-        runtime_state_, *int_desc_, &client_, page_len, page_len);
+    BufferedTupleStreamV2 stream(runtime_state_, int_desc_, &client_, page_len, page_len);
     ASSERT_OK(stream.Init(-1, true));
     bool got_reservation;
     ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
@@ -606,7 +605,7 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
   RowDescriptor* row_desc = varlen_data ? string_desc_ : int_desc_;
 
   BufferedTupleStreamV2 stream(
-      runtime_state_, *row_desc, &client_, buffer_size, buffer_size);
+      runtime_state_, row_desc, &client_, buffer_size, buffer_size);
   ASSERT_OK(stream.Init(-1, true));
   if (read_write) {
     bool got_reservation = false;
@@ -701,7 +700,7 @@ void SimpleTupleStreamTest::TestTransferMemory(bool pin_stream, bool read_write)
   Init(100 * buffer_size);
 
   BufferedTupleStreamV2 stream(
-      runtime_state_, *int_desc_, &client_, buffer_size, buffer_size);
+      runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
   ASSERT_OK(stream.Init(-1, pin_stream));
   if (read_write) {
     bool got_reservation;
@@ -784,7 +783,7 @@ TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
   }
 
   BufferedTupleStreamV2 stream(
-      runtime_state_, *string_desc_, &client_, buffer_size, buffer_size, external_slots);
+      runtime_state_, string_desc_, &client_, buffer_size, buffer_size, external_slots);
   ASSERT_OK(stream.Init(0, false));
   bool got_reservation;
   ASSERT_OK(stream.PrepareForWrite(&got_reservation));
@@ -859,18 +858,18 @@ TEST_F(SimpleTupleStreamTest, BigRow) {
   // indicators in the stream so adding the row will fail.
   ASSERT_TRUE(nullable_big_row_desc_->IsAnyTupleNullable());
   BufferedTupleStreamV2 nullable_stream(
-      runtime_state_, *nullable_big_row_desc_, &client_, BIG_ROW_BYTES, BIG_ROW_BYTES);
+      runtime_state_, nullable_big_row_desc_, &client_, BIG_ROW_BYTES, BIG_ROW_BYTES);
   ASSERT_OK(nullable_stream.Init(-1, true));
   bool got_reservation;
   ASSERT_OK(nullable_stream.PrepareForWrite(&got_reservation));
   // With null tuples, a row can fit in the stream.
-  RowBatch* batch = CreateBatch(*nullable_big_row_desc_, 0, 1, true);
+  RowBatch* batch = CreateBatch(nullable_big_row_desc_, 0, 1, true);
   Status status;
   EXPECT_TRUE(nullable_stream.AddRow(batch->GetRow(0), &status));
   // With the additional null indicator, we can't fit all the tuples of a row into
   // the stream.
-  batch = CreateBatch(*nullable_big_row_desc_, 0, 1, false);
+  batch = CreateBatch(nullable_big_row_desc_, 0, 1, false);
   EXPECT_FALSE(nullable_stream.AddRow(batch->GetRow(0), &status));
   EXPECT_EQ(TErrorCode::MAX_ROW_SIZE, status.code());
   nullable_stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
@@ -883,7 +882,7 @@ TEST_F(SimpleTupleStreamTest, BigRowMemoryUse) {
   Init(MAX_BUFFERS * BIG_ROW_BYTES);
   Status status;
   BufferedTupleStreamV2 stream(
-      runtime_state_, *big_row_desc_, &client_, DEFAULT_PAGE_LEN, BIG_ROW_BYTES * 2);
+      runtime_state_, big_row_desc_, &client_, DEFAULT_PAGE_LEN, BIG_ROW_BYTES * 2);
   ASSERT_OK(stream.Init(-1, true));
   RowBatch* batch;
   bool got_reservation;
@@ -891,7 +890,7 @@ TEST_F(SimpleTupleStreamTest, BigRowMemoryUse) {
   ASSERT_TRUE(got_reservation);
 
   // We should be able to append MAX_BUFFERS without problem.
   for (int i = 0; i < MAX_BUFFERS; ++i) {
-    batch = CreateBatch(*big_row_desc_, i, 1, false);
+    batch = CreateBatch(big_row_desc_, i, 1, false);
     bool success = stream.AddRow(batch->GetRow(0), &status);
     ASSERT_TRUE(success);
     // We should have one large page per row.
@@ -900,7 +899,7 @@ TEST_F(SimpleTupleStreamTest, BigRowMemoryUse) {
   }
 
   // We can't fit another row in memory - need to unpin to make progress.
-  batch = CreateBatch(*big_row_desc_, MAX_BUFFERS, 1, false);
+  batch = CreateBatch(big_row_desc_, MAX_BUFFERS, 1, false);
   bool success = stream.AddRow(batch->GetRow(0), &status);
   ASSERT_FALSE(success);
   ASSERT_OK(status);
@@ -919,7 +918,7 @@ TEST_F(SimpleTupleStreamTest, BigRowMemoryUse) {
 // Test for IMPALA-3923: overflow of 32-bit int in GetRows().
 TEST_F(SimpleTupleStreamTest, TestGetRowsOverflow) {
   Init(BUFFER_POOL_LIMIT);
-  BufferedTupleStreamV2 stream(runtime_state_, *int_desc_, &client_, PAGE_LEN, PAGE_LEN);
+  BufferedTupleStreamV2 stream(runtime_state_, int_desc_, &client_, PAGE_LEN, PAGE_LEN);
   ASSERT_OK(stream.Init(-1, true));
 
   Status status;
@@ -941,10 +940,10 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
   Init(MAX_BUFFERS * BIG_ROW_BYTES);
   Status status;
   BufferedTupleStreamV2 stream(
-      runtime_state_, *string_desc_, &client_, DEFAULT_PAGE_LEN, BIG_ROW_BYTES * 2);
+      runtime_state_, string_desc_, &client_, DEFAULT_PAGE_LEN, BIG_ROW_BYTES * 2);
   ASSERT_OK(stream.Init(-1, true));
-  RowBatch write_batch(*string_desc_, 1024, &tracker_);
-  RowBatch read_batch(*string_desc_, 1024, &tracker_);
+  RowBatch write_batch(string_desc_, 1024, &tracker_);
+  RowBatch read_batch(string_desc_, 1024, &tracker_);
   bool got_reservation;
   ASSERT_OK(stream.PrepareForReadWrite(false, &got_reservation));
   ASSERT_TRUE(got_reservation);
@@ -1109,7 +1108,7 @@ TEST_F(MultiTupleStreamTest, MultiTupleAddRowCustom) {
   int num_batches = 1;
   int rows_added = 0;
   BufferedTupleStreamV2 stream(
-      runtime_state_, *string_desc_, &client_, buffer_size, buffer_size);
+      runtime_state_, string_desc_, &client_, buffer_size, buffer_size);
   ASSERT_OK(stream.Init(-1, false));
   bool got_write_reservation;
   ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
@@ -1229,7 +1228,7 @@ TEST_F(MultiNullableTupleStreamTest, TestComputeRowSize) {
   external_slots.insert(external_string_slot->id());
 
   BufferedTupleStreamV2 stream(
-      runtime_state_, *string_desc_, &client_, PAGE_LEN, PAGE_LEN, external_slots);
+      runtime_state_, string_desc_, &client_, PAGE_LEN, PAGE_LEN, external_slots);
   gscoped_ptr<TupleRow, FreeDeleter> row(
       reinterpret_cast<TupleRow*>(malloc(tuple_descs.size() * sizeof(Tuple*))));
   gscoped_ptr<Tuple, FreeDeleter> tuple0(
@@ -1279,8 +1278,7 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
   Status status;
   Init(BUFFER_POOL_LIMIT);
   const int NUM_ROWS = 4000;
-  BufferedTupleStreamV2 stream(
-      runtime_state_, *array_desc_, &client_, PAGE_LEN, PAGE_LEN);
+  BufferedTupleStreamV2 stream(runtime_state_, array_desc_, &client_, PAGE_LEN, PAGE_LEN);
   const vector<TupleDescriptor*>& tuple_descs = array_desc_->tuple_descriptors();
 
   // Write out a predictable pattern of data by iterating over arrays of constants.
   int strings_index = 0; // we take the mod of this as index into STRINGS.
@@ -1350,7 +1348,7 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
   array_len_index = 0;
   bool eos = false;
   int rows_read = 0;
-  RowBatch batch(*array_desc_, BATCH_SIZE, &tracker_);
+  RowBatch batch(array_desc_, BATCH_SIZE, &tracker_);
   do {
     batch.Reset();
     ASSERT_OK(stream.GetNext(&batch, &eos));
@@ -1394,7 +1392,7 @@ TEST_F(ArrayTupleStreamTest, TestComputeRowSize) {
   external_slots.insert(external_array_slot->id());
 
   BufferedTupleStreamV2 stream(
-      runtime_state_, *array_desc_, &client_, PAGE_LEN, PAGE_LEN, external_slots);
+      runtime_state_, array_desc_, &client_, PAGE_LEN, PAGE_LEN, external_slots);
   gscoped_ptr<TupleRow, FreeDeleter> row(
       reinterpret_cast<TupleRow*>(malloc(tuple_descs.size() * sizeof(Tuple*))));
   gscoped_ptr<Tuple, FreeDeleter> tuple0(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/buffered-tuple-stream-v2.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.cc b/be/src/runtime/buffered-tuple-stream-v2.cc
index 424678b..82da2bc 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.cc
+++ b/be/src/runtime/buffered-tuple-stream-v2.cc
@@ -47,7 +47,7 @@ using namespace strings;
 using BufferHandle = BufferPool::BufferHandle;
 
 BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state,
-    const RowDescriptor& row_desc, BufferPool::ClientHandle* buffer_pool_client,
+    const RowDescriptor* row_desc, BufferPool::ClientHandle* buffer_pool_client,
     int64_t default_page_len, int64_t max_page_len, const set<SlotId>& ext_varlen_slots)
   : state_(state),
     desc_(row_desc),
@@ -70,7 +70,7 @@ BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state,
     num_rows_(0),
     default_page_len_(default_page_len),
     max_page_len_(max_page_len),
-    has_nullable_tuple_(row_desc.IsAnyTupleNullable()),
+    has_nullable_tuple_(row_desc->IsAnyTupleNullable()),
     delete_on_read_(false),
     closed_(false),
     pinned_(true) {
@@ -78,8 +78,8 @@ BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state,
   DCHECK(BitUtil::IsPowerOf2(default_page_len)) << default_page_len;
   DCHECK(BitUtil::IsPowerOf2(max_page_len)) << max_page_len;
   read_page_ = pages_.end();
-  for (int i = 0; i < desc_.tuple_descriptors().size(); ++i) {
-    const TupleDescriptor* tuple_desc = desc_.tuple_descriptors()[i];
+  for (int i = 0; i < desc_->tuple_descriptors().size(); ++i) {
+    const TupleDescriptor* tuple_desc = desc_->tuple_descriptors()[i];
     const int tuple_byte_size = tuple_desc->byte_size();
     fixed_tuple_sizes_.push_back(tuple_byte_size);
 
@@ -704,7 +704,7 @@ template <bool FILL_FLAT_ROWS, bool HAS_NULLABLE_TUPLE>
 Status BufferedTupleStreamV2::GetNextInternal(
     RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) {
   DCHECK(!closed_);
-  DCHECK(batch->row_desc().Equals(desc_));
+  DCHECK(batch->row_desc()->Equals(*desc_));
   DCHECK(is_pinned() || !FILL_FLAT_ROWS)
       << "FlatRowPtrs are only valid for pinned streams";
   *eos = (rows_returned_ == num_rows_);
@@ -739,7 +739,7 @@ Status BufferedTupleStreamV2::GetNextInternal(
     flat_rows->reserve(rows_to_fill);
   }
 
-  const uint64_t tuples_per_row = desc_.tuple_descriptors().size();
+  const uint64_t tuples_per_row = desc_->tuple_descriptors().size();
   // Start reading from the current position in 'read_page_'.
   for (int i = 0; i < rows_to_fill; ++i) {
     if (FILL_FLAT_ROWS) {
@@ -921,7 +921,7 @@ template <bool HAS_NULLABLE_TUPLE>
 bool BufferedTupleStreamV2::DeepCopyInternal(
     TupleRow* row, uint8_t** data, const uint8_t* data_end) noexcept {
   uint8_t* pos = *data;
-  const uint64_t tuples_per_row = desc_.tuple_descriptors().size();
+  const uint64_t tuples_per_row = desc_->tuple_descriptors().size();
   // Copy the not NULL fixed len tuples. For the NULL tuples just update the NULL tuple
   // indicator.
   if (HAS_NULLABLE_TUPLE) {
@@ -1038,7 +1038,7 @@ void BufferedTupleStreamV2::GetTupleRow(FlatRowPtr flat_row, TupleRow* row) cons
 
 template <bool HAS_NULLABLE_TUPLE>
 void BufferedTupleStreamV2::UnflattenTupleRow(uint8_t** data, TupleRow* row) const {
-  const int tuples_per_row = desc_.tuple_descriptors().size();
+  const int tuples_per_row = desc_->tuple_descriptors().size();
   uint8_t* ptr = *data;
   if (has_nullable_tuple_) {
     // Stitch together the tuples from the page and the NULL ones.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/buffered-tuple-stream-v2.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.h b/be/src/runtime/buffered-tuple-stream-v2.h
index 4376c11..c06dc6c 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.h
+++ b/be/src/runtime/buffered-tuple-stream-v2.h
@@ -208,7 +208,7 @@ class BufferedTupleStreamV2 {
   ///     that are added and the rows being returned.
   /// page_len: the size of pages to use in the stream
   /// ext_varlen_slots: set of varlen slots with data stored externally to the stream
-  BufferedTupleStreamV2(RuntimeState* state, const RowDescriptor& row_desc,
+  BufferedTupleStreamV2(RuntimeState* state, const RowDescriptor* row_desc,
       BufferPool::ClientHandle* buffer_pool_client, int64_t default_page_len,
       int64_t max_page_len,
       const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>());
@@ -406,7 +406,7 @@ class BufferedTupleStreamV2 {
   RuntimeState* const state_;
 
   /// Description of rows stored in the stream.
-  const RowDescriptor& desc_;
+  const RowDescriptor* desc_;
 
   /// Plan node ID, used for error reporting.
   int node_id_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index bfe933c..cce6390 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -47,7 +47,7 @@ string BufferedTupleStream::RowIdx::DebugString() const {
 }
 
 BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
-    const RowDescriptor& row_desc, BufferedBlockMgr* block_mgr,
+    const RowDescriptor* row_desc, BufferedBlockMgr* block_mgr,
     BufferedBlockMgr::Client* client, bool use_initial_small_buffers, bool read_write,
     const set<SlotId>& ext_varlen_slots)
   : state_(state),
@@ -71,7 +71,7 @@ BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
     unpin_timer_(NULL),
     get_new_block_timer_(NULL),
     read_write_(read_write),
-    has_nullable_tuple_(row_desc.IsAnyTupleNullable()),
+    has_nullable_tuple_(row_desc->IsAnyTupleNullable()),
     use_small_buffers_(use_initial_small_buffers),
     delete_on_read_(false),
     closed_(false),
@@ -81,8 +81,8 @@ BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
   max_null_indicators_size_ = -1;
   read_block_ = blocks_.end();
   fixed_tuple_row_size_ = 0;
-  for (int i = 0; i < desc_.tuple_descriptors().size(); ++i) {
-    const TupleDescriptor* tuple_desc = desc_.tuple_descriptors()[i];
+  for (int i = 0; i < desc_->tuple_descriptors().size(); ++i) {
+    const TupleDescriptor* tuple_desc = desc_->tuple_descriptors()[i];
     const int tuple_byte_size = tuple_desc->byte_size();
     fixed_tuple_sizes_.push_back(tuple_byte_size);
     fixed_tuple_row_size_ += tuple_byte_size;
@@ -157,7 +157,8 @@ Status BufferedTupleStream::Init(int node_id, RuntimeProfile* profile, bool pinn
   max_null_indicators_size_ = ComputeNumNullIndicatorBytes(block_mgr_->max_block_size());
   if (UNLIKELY(max_null_indicators_size_ < 0)) {
     // The block cannot even fit in a row of tuples so just assume there is one row.
-    int null_indicators_size = BitUtil::RoundUpNumi64(desc_.tuple_descriptors().size()) * 8;
+    int null_indicators_size =
+        BitUtil::RoundUpNumi64(desc_->tuple_descriptors().size()) * 8;
     return Status(TErrorCode::BTS_BLOCK_OVERFLOW,
         PrettyPrinter::Print(fixed_tuple_row_size_, TUnit::BYTES),
         PrettyPrinter::Print(null_indicators_size, TUnit::BYTES));
@@ -489,7 +490,7 @@ int BufferedTupleStream::ComputeNumNullIndicatorBytes(int block_size) const {
   if (has_nullable_tuple_) {
     // We assume that all rows will use their max size, so we may be underutilizing the
    // space, i.e. we may have some unused space in case of rows with NULL tuples.
-    const uint32_t tuples_per_row = desc_.tuple_descriptors().size();
+    const uint32_t tuples_per_row = desc_->tuple_descriptors().size();
     const uint32_t min_row_size_in_bits = 8 * fixed_tuple_row_size_ + tuples_per_row;
     const uint32_t block_size_in_bits = 8 * block_size;
     const uint32_t max_num_rows = block_size_in_bits / min_row_size_in_bits;
@@ -547,12 +548,12 @@ template <bool FILL_INDICES, bool HAS_NULLABLE_TUPLE>
 Status BufferedTupleStream::GetNextInternal(RowBatch* batch, bool* eos,
     vector<RowIdx>* indices) {
   DCHECK(!closed_);
-  DCHECK(batch->row_desc().LayoutEquals(desc_));
+  DCHECK(batch->row_desc()->LayoutEquals(*desc_));
   *eos = (rows_returned_ == num_rows_);
   if (*eos) return Status::OK();
   DCHECK_GE(read_block_null_indicators_size_, 0);
 
-  const uint64_t tuples_per_row = desc_.tuple_descriptors().size();
+  const uint64_t tuples_per_row = desc_->tuple_descriptors().size();
   DCHECK_LE(read_tuple_idx_ / tuples_per_row, (*read_block_)->num_rows());
   DCHECK_EQ(read_tuple_idx_ % tuples_per_row, 0);
   int rows_returned_curr_block = read_tuple_idx_ / tuples_per_row;
@@ -762,7 +763,7 @@ bool BufferedTupleStream::DeepCopyInternal(TupleRow* row) noexcept {
   DCHECK(write_block_->is_pinned()) << DebugString() << std::endl
                                     << write_block_->DebugString();
 
-  const uint64_t tuples_per_row = desc_.tuple_descriptors().size();
+  const uint64_t tuples_per_row = desc_->tuple_descriptors().size();
   uint32_t bytes_remaining = write_block_bytes_remaining();
   if (UNLIKELY((bytes_remaining < fixed_tuple_row_size_) ||
       (HasNullableTuple &&
@@ -882,7 +883,7 @@ void BufferedTupleStream::GetTupleRow(const RowIdx& idx, TupleRow* row) const {
   uint8_t* data = block_start_idx_[idx.block()] + idx.offset();
   if (has_nullable_tuple_) {
     // Stitch together the tuples from the block and the NULL ones.
-    const int tuples_per_row = desc_.tuple_descriptors().size();
+    const int tuples_per_row = desc_->tuple_descriptors().size();
     uint32_t tuple_idx = idx.idx() * tuples_per_row;
     for (int i = 0; i < tuples_per_row; ++i) {
       const uint8_t* null_word = block_start_idx_[idx.block()] + (tuple_idx >> 3);
@@ -890,13 +891,13 @@ void BufferedTupleStream::GetTupleRow(const RowIdx& idx, TupleRow* row) const {
       const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0);
       row->SetTuple(i, reinterpret_cast<Tuple*>(
           reinterpret_cast<uint64_t>(data) * is_not_null));
-      data += desc_.tuple_descriptors()[i]->byte_size() * is_not_null;
+      data += desc_->tuple_descriptors()[i]->byte_size() * is_not_null;
       ++tuple_idx;
     }
   } else {
-    for (int i = 0; i < desc_.tuple_descriptors().size(); ++i) {
+    for (int i = 0; i < desc_->tuple_descriptors().size(); ++i) {
       row->SetTuple(i, reinterpret_cast<Tuple*>(data));
-      data += desc_.tuple_descriptors()[i]->byte_size();
+      data += desc_->tuple_descriptors()[i]->byte_size();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/buffered-tuple-stream.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index ea5e8b8..41d63bf 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -214,7 +214,7 @@ class BufferedTupleStream {
   /// read_write: Stream allows interchanging read and write operations. Requires at
   /// least two blocks may be pinned.
   /// ext_varlen_slots: set of varlen slots with data stored externally to the stream
-  BufferedTupleStream(RuntimeState* state, const RowDescriptor& row_desc,
+  BufferedTupleStream(RuntimeState* state, const RowDescriptor* row_desc,
       BufferedBlockMgr* block_mgr, BufferedBlockMgr::Client* client,
       bool use_initial_small_buffers, bool read_write,
       const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>());
@@ -350,7 +350,7 @@ class BufferedTupleStream {
   RuntimeState* const state_;
 
   /// Description of rows stored in the stream.
-  const RowDescriptor& desc_;
+  const RowDescriptor* desc_;
 
   /// Sum of the fixed length portion of all the tuples in desc_.
   int fixed_tuple_row_size_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.cc b/be/src/runtime/data-stream-mgr.cc
index ef9669e..f36f4c9 100644
--- a/be/src/runtime/data-stream-mgr.cc
+++ b/be/src/runtime/data-stream-mgr.cc
@@ -73,7 +73,7 @@ inline uint32_t DataStreamMgr::GetHashValue(
 }
 
 shared_ptr<DataStreamRecvr> DataStreamMgr::CreateRecvr(RuntimeState* state,
-    const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id,
+    const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
     PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile* profile,
     bool is_merging) {
   DCHECK(profile != NULL);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.h b/be/src/runtime/data-stream-mgr.h
index 8aa3b3a..7819c24 100644
--- a/be/src/runtime/data-stream-mgr.h
+++ b/be/src/runtime/data-stream-mgr.h
@@ -73,10 +73,9 @@ class DataStreamMgr {
   /// single stream.
   /// Ownership of the receiver is shared between this DataStream mgr instance and the
   /// caller.
-  std::shared_ptr<DataStreamRecvr> CreateRecvr(
-      RuntimeState* state, const RowDescriptor& row_desc,
-      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
-      int num_senders, int buffer_size, RuntimeProfile* profile,
+  std::shared_ptr<DataStreamRecvr> CreateRecvr(RuntimeState* state,
+      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
+      PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile* profile,
       bool is_merging);
 
   /// Adds a row batch to the recvr identified by fragment_instance_id/dest_node_id

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
index 50103ba..2acab8b 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -208,7 +208,7 @@ void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) {
     // Note: if this function makes a row batch, the batch *must* be added
     // to batch_queue_. It is not valid to create the row batch and destroy
     // it in this thread.
-    batch = new RowBatch(recvr_->row_desc(), thrift_batch, recvr_->mem_tracker());
+    batch = new RowBatch(recvr_->row_desc_, thrift_batch, recvr_->mem_tracker());
   }
   VLOG_ROW << "added #rows=" << batch->num_rows()
            << " batch_size=" << batch_size << "\n";
@@ -265,7 +265,7 @@ Status DataStreamRecvr::CreateMerger(const TupleRowComparator& less_than) {
   input_batch_suppliers.reserve(sender_queues_.size());
 
   // Create the merger that will a single stream of sorted rows.
-  merger_.reset(new SortedRunMerger(less_than, &row_desc_, profile_, false));
+  merger_.reset(new SortedRunMerger(less_than, row_desc_, profile_, false));
 
   for (int i = 0; i < sender_queues_.size(); ++i) {
     input_batch_suppliers.push_back(
@@ -284,7 +284,7 @@ void DataStreamRecvr::TransferAllResources(RowBatch* transfer_batch) {
 }
 
 DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
-    const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id,
+    const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
     PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit,
     RuntimeProfile* profile)
   : mgr_(stream_mgr),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.h b/be/src/runtime/data-stream-recvr.h
index 07e9224..96adfc4 100644
--- a/be/src/runtime/data-stream-recvr.h
+++ b/be/src/runtime/data-stream-recvr.h
@@ -90,7 +90,7 @@ class DataStreamRecvr {
   const TUniqueId& fragment_instance_id() const { return fragment_instance_id_; }
   PlanNodeId dest_node_id() const { return dest_node_id_; }
-  const RowDescriptor& row_desc() const { return row_desc_; }
+  const RowDescriptor& row_desc() const { return *row_desc_; }
   MemTracker* mem_tracker() const { return mem_tracker_.get(); }
 
  private:
@@ -98,7 +98,7 @@ class DataStreamRecvr {
   class SenderQueue;
 
   DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
-      const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id,
+      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
       PlanNodeId dest_node_id, int num_senders, bool is_merging,
       int total_buffer_limit, RuntimeProfile* profile);
 
@@ -131,8 +131,8 @@ class DataStreamRecvr {
   /// exceeds this value
   int total_buffer_limit_;
 
-  /// Row schema, copied from the caller of CreateRecvr().
-  RowDescriptor row_desc_;
+  /// Row schema.
+  const RowDescriptor* row_desc_;
 
   /// True if this reciver merges incoming rows from different senders. Per-sender
   /// row batch queues are maintained in this case.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
index a45c2c4..aeadb56 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -66,9 +66,9 @@ class DataStreamSender::Channel : public CacheLineAligned {
   // combination. buffer_size is specified in bytes and a soft limit on
   // how much tuple data is getting accumulated before being sent; it only applies
   // when data is added via AddRow() and not sent directly via SendBatch().
-  Channel(DataStreamSender* parent, const RowDescriptor& row_desc,
-      const TNetworkAddress& destination, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int buffer_size)
+  Channel(DataStreamSender* parent, const RowDescriptor* row_desc,
+      const TNetworkAddress& destination, const TUniqueId& fragment_instance_id,
+      PlanNodeId dest_node_id, int buffer_size)
     : parent_(parent),
       buffer_size_(buffer_size),
       row_desc_(row_desc),
@@ -78,8 +78,7 @@ class DataStreamSender::Channel : public CacheLineAligned {
       num_data_bytes_sent_(0),
       rpc_thread_("DataStreamSender", "SenderThread", 1, 1,
           bind<void>(mem_fn(&Channel::TransmitData), this, _1, _2)),
-      rpc_in_flight_(false) {
-  }
+      rpc_in_flight_(false) {}
 
   // Initialize channel.
   // Returns OK if successful, error indication otherwise.
@@ -115,7 +114,7 @@ class DataStreamSender::Channel : public CacheLineAligned {
   DataStreamSender* parent_;
   int buffer_size_;
 
-  const RowDescriptor& row_desc_;
+  const RowDescriptor* row_desc_;
   TNetworkAddress address_;
   TUniqueId fragment_instance_id_;
   PlanNodeId dest_node_id_;
@@ -159,7 +158,7 @@ class DataStreamSender::Channel : public CacheLineAligned {
 Status DataStreamSender::Channel::Init(RuntimeState* state) {
   runtime_state_ = state;
   // TODO: figure out how to size batch_
-  int capacity = max(1, buffer_size_ / max(row_desc_.GetRowSize(), 1));
+  int capacity = max(1, buffer_size_ / max(row_desc_->GetRowSize(), 1));
   batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker()));
   return Status::OK();
 }
@@ -250,7 +249,7 @@ Status DataStreamSender::Channel::AddRow(TupleRow* row) {
     RETURN_IF_ERROR(SendCurrentBatch());
   }
   TupleRow* dest = batch_->GetRow(batch_->AddRow());
-  const vector<TupleDescriptor*>& descs = row_desc_.tuple_descriptors();
+  const vector<TupleDescriptor*>& descs = row_desc_->tuple_descriptors();
   for (int i = 0; i < descs.size(); ++i) {
     if (UNLIKELY(row->GetTuple(i) == NULL)) {
       dest->SetTuple(i, NULL);
@@ -325,9 +324,8 @@ void DataStreamSender::Channel::Teardown(RuntimeState* state) {
   batch_.reset();
 }
 
-DataStreamSender::DataStreamSender(int sender_id,
-    const RowDescriptor& row_desc, const TDataStreamSink& sink,
-    const vector<TPlanFragmentDestination>& destinations,
+DataStreamSender::DataStreamSender(int sender_id, const RowDescriptor* row_desc,
+    const TDataStreamSink& sink, const vector<TPlanFragmentDestination>& destinations,
     int per_channel_buffer_size)
   : DataSink(row_desc),
     sender_id_(sender_id),
@@ -381,7 +379,7 @@ Status DataStreamSender::Init(const vector<TExpr>& thrift_output_exprs,
   if (partition_type_ == TPartitionType::HASH_PARTITIONED
       || partition_type_ == TPartitionType::KUDU) {
     RETURN_IF_ERROR(ScalarExpr::Create(tsink.stream_sink.output_partition.partition_exprs,
-        row_desc_, state, &partition_exprs_));
+        *row_desc_, state, &partition_exprs_));
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.h b/be/src/runtime/data-stream-sender.h
index 0719daf..95a83e5 100644
--- a/be/src/runtime/data-stream-sender.h
+++ b/be/src/runtime/data-stream-sender.h
@@ -57,7 +57,7 @@ class DataStreamSender : public DataSink {
   /// The RowDescriptor must live until Close() is called.
   /// NOTE: supported partition types are UNPARTITIONED (broadcast), HASH_PARTITIONED,
   /// and RANDOM.
-  DataStreamSender(int sender_id, const RowDescriptor& row_desc,
+  DataStreamSender(int sender_id, const RowDescriptor* row_desc,
       const TDataStreamSink& tsink,
       const std::vector<TPlanFragmentDestination>& destinations,
       int per_channel_buffer_size);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 1048fb6..a358286 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -310,7 +310,7 @@ class DataStreamTest : public testing::Test {
 
   // Create batch_, but don't fill it with data yet. Assumes we created row_desc_.
   RowBatch* CreateRowBatch() {
-    RowBatch* batch = new RowBatch(*row_desc_, BATCH_CAPACITY, &tracker_);
+    RowBatch* batch = new RowBatch(row_desc_, BATCH_CAPACITY, &tracker_);
     int64_t* tuple_mem = reinterpret_cast<int64_t*>(
         batch->tuple_data_pool()->Allocate(BATCH_CAPACITY * 8));
     bzero(tuple_mem, BATCH_CAPACITY * 8);
@@ -341,7 +341,7 @@ class DataStreamTest : public testing::Test {
     GetNextInstanceId(&instance_id);
     receiver_info_.push_back(ReceiverInfo(stream_type, num_senders, receiver_num));
     ReceiverInfo& info = receiver_info_.back();
-    info.stream_recvr = stream_mgr_->CreateRecvr(runtime_state_.get(), *row_desc_,
+    info.stream_recvr = stream_mgr_->CreateRecvr(runtime_state_.get(), row_desc_,
         instance_id, DEST_NODE_ID, num_senders, buffer_size, profile, is_merging);
     if (!is_merging) {
       info.thread_handle = new thread(&DataStreamTest::ReadStream, this, &info);
@@ -380,7 +380,7 @@ class DataStreamTest : public testing::Test {
   void ReadStreamMerging(ReceiverInfo* info, RuntimeProfile* profile) {
     info->status = info->stream_recvr->CreateMerger(*less_than_);
     if (info->status.IsCancelled()) return;
-    RowBatch batch(*row_desc_, 1024, &tracker_);
+    RowBatch batch(row_desc_, 1024, &tracker_);
     VLOG_QUERY << "start reading merging";
     bool eos;
     while (!(info->status = info->stream_recvr->GetNext(&batch, &eos)).IsCancelled()) {
@@ -489,7 +489,7 @@ class DataStreamTest : public testing::Test {
     VLOG_QUERY << "create sender " << sender_num;
     const TDataSink& sink = GetSink(partition_type);
     DataStreamSender sender(
-        sender_num, *row_desc_, sink.stream_sink, dest_, channel_buffer_size);
+        sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size);
 
     TExprNode expr_node;
     expr_node.node_type = TExprNodeType::SLOT_REF;
@@ -607,7 +607,7 @@ TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
   TUniqueId instance_id;
   GetNextInstanceId(&instance_id);
   shared_ptr<DataStreamRecvr> stream_recvr = stream_mgr_->CreateRecvr(runtime_state.get(),
-      *row_desc_, instance_id, DEST_NODE_ID, 1, 1, profile.get(), false);
+      row_desc_, instance_id, DEST_NODE_ID, 1, 1, profile.get(), false);
 
   // Perform tear down, but keep a reference to the receiver so that it is deleted last
   // (to confirm that the destructor does not access invalid state after tear-down).
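For the data-stream classes above, the key behavioral change is in DataStreamRecvr: it previously copied the RowDescriptor ("Row schema, copied from the caller of CreateRecvr()") but now stores a const pointer, which is why the sender header documents that "The RowDescriptor must live until Close() is called." A hedged sketch of that lifetime contract, again using simplified stand-ins rather than the real Impala classes:

// Sketch only: simplified stand-ins, not the real Impala classes.
#include <cassert>

struct RowDescriptor {
  int num_tuples = 1;
};

class DataStreamRecvr {
 public:
  explicit DataStreamRecvr(const RowDescriptor* row_desc) : row_desc_(row_desc) {
    assert(row_desc_ != nullptr);
  }
  // The accessor still returns a reference for convenience by dereferencing
  // the stored pointer (mirroring DataStreamRecvr::row_desc() in this patch).
  const RowDescriptor& row_desc() const { return *row_desc_; }

 private:
  const RowDescriptor* row_desc_;  // not owned; caller guarantees lifetime
};

int main() {
  RowDescriptor desc;  // must outlive the receiver
  DataStreamRecvr recvr(&desc);
  assert(&recvr.row_desc() == &desc);  // no copy: the same object is shared
  return 0;
}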
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/descriptors.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index ad66c56..0f97f76 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -428,7 +428,7 @@ bool RowDescriptor::IsAnyTupleNullable() const {
   return false;
 }
 
-void RowDescriptor::ToThrift(vector<TTupleId>* row_tuple_ids) {
+void RowDescriptor::ToThrift(vector<TTupleId>* row_tuple_ids) const {
   row_tuple_ids->clear();
   for (int i = 0; i < tuple_desc_map_.size(); ++i) {
     row_tuple_ids->push_back(tuple_desc_map_[i]->id());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/descriptors.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 8a8dd7b..bdb8f8c 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -492,7 +492,9 @@ class DescriptorTbl {
       const HdfsTableDescriptor& hdfs_tbl, ObjectPool* pool) WARN_UNUSED_RESULT;
 };
 
-/// Records positions of tuples within row produced by ExecNode.
+/// Records positions of tuples within row produced by ExecNode. RowDescriptors are
+/// typically owned by their ExecNode, and shared by reference across the plan tree.
+///
 /// TODO: this needs to differentiate between tuples contained in row
 /// and tuples produced by ExecNode (parallel to PlanNode.rowTupleIds and
 /// PlanNode.tupleIds); right now, we conflate the two (and distinguish based on
@@ -544,7 +546,7 @@ class RowDescriptor {
   }
 
   /// Populate row_tuple_ids with our ids.
-  void ToThrift(std::vector<TTupleId>* row_tuple_ids);
+  void ToThrift(std::vector<TTupleId>* row_tuple_ids) const;
 
   /// Return true if the tuple ids of this descriptor are a prefix
   /// of the tuple ids of other_desc.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/row-batch-serialize-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-serialize-test.cc b/be/src/runtime/row-batch-serialize-test.cc
index f4c4f0b..7a084fc 100644
--- a/be/src/runtime/row-batch-serialize-test.cc
+++ b/be/src/runtime/row-batch-serialize-test.cc
@@ -68,7 +68,7 @@ class RowBatchSerializeTest : public testing::Test {
     TRowBatch trow_batch;
     EXPECT_OK(batch->Serialize(&trow_batch, full_dedup));
 
-    RowBatch deserialized_batch(row_desc, trow_batch, tracker_.get());
+    RowBatch deserialized_batch(&row_desc, trow_batch, tracker_.get());
     if (print_batches) cout << PrintBatch(&deserialized_batch) << endl;
 
     EXPECT_EQ(batch->num_rows(), deserialized_batch.num_rows());
@@ -198,7 +198,7 @@ class RowBatchSerializeTest : public testing::Test {
 
   // Creates a row batch with randomized values.
   RowBatch* CreateRowBatch(const RowDescriptor& row_desc) {
-    RowBatch* batch = pool_.Add(new RowBatch(row_desc, NUM_ROWS, tracker_.get()));
+    RowBatch* batch = pool_.Add(new RowBatch(&row_desc, NUM_ROWS, tracker_.get()));
     int len = row_desc.GetRowSize() * NUM_ROWS;
     uint8_t* tuple_mem = batch->tuple_data_pool()->Allocate(len);
     memset(tuple_mem, 0, len);
@@ -259,7 +259,7 @@ class RowBatchSerializeTest : public testing::Test {
   // order provided, starting at the beginning once all are used.
   void AddTuplesToRowBatch(int num_rows, const vector<vector<Tuple*>>& tuples,
       const vector<int>& repeats, RowBatch* batch) {
-    int tuples_per_row = batch->row_desc().tuple_descriptors().size();
+    int tuples_per_row = batch->row_desc()->tuple_descriptors().size();
     ASSERT_EQ(tuples_per_row, tuples.size());
     ASSERT_EQ(tuples_per_row, repeats.size());
     vector<int> next_tuple(tuples_per_row, 0);
@@ -447,7 +447,7 @@ void RowBatchSerializeTest::TestDupCorrectness(bool full_dedup) {
   repeats.push_back(1);
   // All string dups are consecutive
   repeats.push_back(num_rows / distinct_string_tuples + 1);
-  RowBatch* batch = pool_.Add(new RowBatch(row_desc, num_rows, tracker_.get()));
+  RowBatch* batch = pool_.Add(new RowBatch(&row_desc, num_rows, tracker_.get()));
   vector<vector<Tuple*>> distinct_tuples(2);
   CreateTuples(*row_desc.tuple_descriptors()[0], batch->tuple_data_pool(),
       distinct_int_tuples, 0, 10, &distinct_tuples[0]);
@@ -481,7 +481,7 @@ void RowBatchSerializeTest::TestDupRemoval(bool full_dedup) {
   int num_distinct_tuples = 100;
   // All dups are consecutive
   int repeats = num_rows / num_distinct_tuples;
-  RowBatch* batch = pool_.Add(new RowBatch(row_desc, num_rows, tracker_.get()));
+  RowBatch* batch = pool_.Add(new RowBatch(&row_desc, num_rows, tracker_.get()));
   vector<Tuple*> tuples;
   CreateTuples(tuple_desc, batch->tuple_data_pool(), num_distinct_tuples, 0, 10, &tuples);
   AddTuplesToRowBatch(num_rows, tuples, repeats, batch);
@@ -517,7 +517,7 @@ void RowBatchSerializeTest::TestConsecutiveNulls(bool full_dedup) {
   int num_rows = 100;
   int num_distinct_tuples = 20;
   int repeats = 5;
-  RowBatch* batch = pool_.Add(new RowBatch(row_desc, num_rows, tracker_.get()));
+  RowBatch* batch = pool_.Add(new RowBatch(&row_desc, num_rows, tracker_.get()));
   vector<Tuple*> tuples;
   CreateTuples(*row_desc.tuple_descriptors()[0], batch->tuple_data_pool(),
       num_distinct_tuples, 50, 10, &tuples);
@@ -587,7 +587,7 @@ TEST_F(RowBatchSerializeTest, DedupPathologicalFull) {
   vector<vector<Tuple*>> tuples(num_tuples);
   vector<int> repeats(num_tuples, 1); // Don't repeat tuples adjacently
   int64_t total_byte_size = 0;
-  RowBatch* batch = pool_.Add(new RowBatch(row_desc, num_rows, tracker_.get()));
+  RowBatch* batch = pool_.Add(new RowBatch(&row_desc, num_rows, tracker_.get()));
   // First two tuples are integers because it doesn't matter for this test.
   for (int tuple_idx = 0; tuple_idx < num_int_tuples; ++tuple_idx) {
     TupleDescriptor* tuple_desc = row_desc.tuple_descriptors()[tuple_idx];
@@ -630,7 +630,7 @@ TEST_F(RowBatchSerializeTest, DedupPathologicalFull) {
   // Serialized data should only have one copy of each tuple.
   EXPECT_EQ(total_byte_size, trow_batch.uncompressed_size);
   LOG(INFO) << "Deserializing row batch";
-  RowBatch deserialized_batch(row_desc, trow_batch, tracker_.get());
+  RowBatch deserialized_batch(&row_desc, trow_batch, tracker_.get());
   LOG(INFO) << "Verifying row batch";
   // Need to do special verification: comparing all duplicate strings is too slow.
   EXPECT_EQ(batch->num_rows(), deserialized_batch.num_rows());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/row-batch-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-test.cc b/be/src/runtime/row-batch-test.cc
index bbdcc2d..dc9c395 100644
--- a/be/src/runtime/row-batch-test.cc
+++ b/be/src/runtime/row-batch-test.cc
@@ -47,25 +47,25 @@ TEST(RowBatchTest, AcquireStateWithMarkFlushResources) {
   RowDescriptor row_desc(*desc_tbl, tuple_id, nullable_tuples);
 
   MemTracker tracker;
   {
-    RowBatch src(row_desc, 1024, &tracker);
+    RowBatch src(&row_desc, 1024, &tracker);
     src.AddRow();
     src.CommitLastRow();
 
     // Calls MarkFlushResources().
     src.MarkNeedsDeepCopy();
 
     // Note InitialCapacity(), not capacity(). Latter will DCHECK.
-    RowBatch dest(row_desc, src.InitialCapacity(), &tracker);
+    RowBatch dest(&row_desc, src.InitialCapacity(), &tracker);
     dest.AcquireState(&src);
   }
 
   // Confirm the bad pattern causes an error.
   {
-    RowBatch src(row_desc, 1024, &tracker);
+    RowBatch src(&row_desc, 1024, &tracker);
     src.AddRow();
     src.CommitLastRow();
 
     // Calls MarkFlushResources().
     src.MarkNeedsDeepCopy();
 
-    RowBatch bad_dest(row_desc, src.capacity(), &tracker);
+    RowBatch bad_dest(&row_desc, src.capacity(), &tracker);
     IMPALA_ASSERT_DEBUG_DEATH(bad_dest.AcquireState(&src), "");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 7668063..9bb96aa 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -42,12 +42,12 @@ namespace impala {
 const int RowBatch::AT_CAPACITY_MEM_USAGE;
 const int RowBatch::FIXED_LEN_BUFFER_LIMIT;
 
-RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_tracker)
+RowBatch::RowBatch(const RowDescriptor* row_desc, int capacity, MemTracker* mem_tracker)
   : num_rows_(0),
     capacity_(capacity),
     flush_(FlushMode::NO_FLUSH_RESOURCES),
     needs_deep_copy_(false),
-    num_tuples_per_row_(row_desc.tuple_descriptors().size()),
+    num_tuples_per_row_(row_desc->tuple_descriptors().size()),
     auxiliary_mem_usage_(0),
     tuple_data_pool_(mem_tracker),
     row_desc_(row_desc),
@@ -73,7 +73,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_
 // to allocated string data in special mempool
 // (change via python script that runs over Data_types.cc)
 RowBatch::RowBatch(
-    const RowDescriptor& row_desc, const TRowBatch& input_batch, MemTracker* mem_tracker)
+    const RowDescriptor* row_desc, const TRowBatch& input_batch, MemTracker* mem_tracker)
   : num_rows_(input_batch.num_rows),
     capacity_(input_batch.num_rows),
     flush_(FlushMode::NO_FLUSH_RESOURCES),
@@ -85,7 +85,7 @@ RowBatch::RowBatch(
     mem_tracker_(mem_tracker) {
   DCHECK(mem_tracker_ != NULL);
   tuple_ptrs_size_ = num_rows_ * input_batch.row_tuples.size() * sizeof(Tuple*);
-  DCHECK_EQ(input_batch.row_tuples.size(), row_desc.tuple_descriptors().size());
+  DCHECK_EQ(input_batch.row_tuples.size(), row_desc->tuple_descriptors().size());
   DCHECK_GT(tuple_ptrs_size_, 0);
   // TODO: switch to Init() pattern so we can check memory limit and return Status.
   if (FLAGS_enable_partitioned_aggregation && FLAGS_enable_partitioned_hash_join) {
@@ -131,7 +131,7 @@ RowBatch::RowBatch(
   }
 
   // Check whether we have slots that require offset-to-pointer conversion.
-  if (!row_desc_.HasVarlenSlots()) return;
+  if (!row_desc_->HasVarlenSlots()) return;
 
   // For every unique tuple, convert string offsets contained in tuple data into
   // pointers. Tuples were serialized in the order we are deserializing them in,
@@ -140,7 +140,7 @@ RowBatch::RowBatch(
   Tuple* last_converted = NULL;
   for (int i = 0; i < num_rows_; ++i) {
     for (int j = 0; j < num_tuples_per_row_; ++j) {
-      const TupleDescriptor* desc = row_desc_.tuple_descriptors()[j];
+      const TupleDescriptor* desc = row_desc_->tuple_descriptors()[j];
       if (!desc->HasVarlenSlots()) continue;
       Tuple* tuple = GetRow(i)->GetTuple(j);
       // Handle NULL or already converted tuples with one check.
@@ -182,7 +182,7 @@ Status RowBatch::Serialize(TRowBatch* output_batch, bool full_dedup) {
   output_batch->compression_type = THdfsCompression::NONE;
 
   output_batch->num_rows = num_rows_;
-  row_desc_.ToThrift(&output_batch->row_tuples);
+  row_desc_->ToThrift(&output_batch->row_tuples);
 
   // As part of the serialization process we deduplicate tuples to avoid serializing a
   // Tuple multiple times for the RowBatch. By default we only detect duplicate tuples
@@ -232,10 +232,10 @@ bool RowBatch::UseFullDedup() {
   // be common: when a row contains tuples with collections and where there are three or
   // more tuples per row so non-adjacent duplicate tuples may have been created when
   // joining tuples from multiple sources into a single row.
-  if (row_desc_.tuple_descriptors().size() < 3) return false;
+  if (row_desc_->tuple_descriptors().size() < 3) return false;
   vector<TupleDescriptor*>::const_iterator tuple_desc =
-      row_desc_.tuple_descriptors().begin();
-  for (; tuple_desc != row_desc_.tuple_descriptors().end(); ++tuple_desc) {
+      row_desc_->tuple_descriptors().begin();
+  for (; tuple_desc != row_desc_->tuple_descriptors().end(); ++tuple_desc) {
     if (!(*tuple_desc)->collection_slots().empty()) return true;
   }
   return false;
@@ -260,8 +260,9 @@ void RowBatch::SerializeInternal(int64_t size, DedupMap* distinct_tuples,
   char* tuple_data = const_cast<char*>(output_batch->tuple_data.c_str());
 
   for (int i = 0; i < num_rows_; ++i) {
-    vector<TupleDescriptor*>::const_iterator desc = row_desc_.tuple_descriptors().begin();
-    for (int j = 0; desc != row_desc_.tuple_descriptors().end(); ++desc, ++j) {
+    vector<TupleDescriptor*>::const_iterator desc =
+        row_desc_->tuple_descriptors().begin();
+    for (int j = 0; desc != row_desc_->tuple_descriptors().end(); ++desc, ++j) {
       Tuple* tuple = GetRow(i)->GetTuple(j);
       if (UNLIKELY(tuple == NULL)) {
         // NULLs are encoded as -1
@@ -382,7 +383,7 @@ int RowBatch::GetBatchSize(const TRowBatch& batch) {
 }
 
 void RowBatch::AcquireState(RowBatch* src) {
-  DCHECK(row_desc_.LayoutEquals(src->row_desc_));
+  DCHECK(row_desc_->LayoutEquals(*src->row_desc_));
   DCHECK_EQ(num_tuples_per_row_, src->num_tuples_per_row_);
   DCHECK_EQ(tuple_ptrs_size_, src->tuple_ptrs_size_);
 
@@ -405,7 +406,7 @@ void RowBatch::AcquireState(RowBatch* src) {
 }
 
 void RowBatch::DeepCopyTo(RowBatch* dst) {
-  DCHECK(dst->row_desc_.Equals(row_desc_));
+  DCHECK(dst->row_desc_->Equals(*row_desc_));
   DCHECK_EQ(dst->num_rows_, 0);
   DCHECK_GE(dst->capacity_, num_rows_);
   dst->AddRows(num_rows_);
@@ -413,8 +414,8 @@ void RowBatch::DeepCopyTo(RowBatch* dst) {
     TupleRow* src_row = GetRow(i);
     TupleRow* dst_row = reinterpret_cast<TupleRow*>(dst->tuple_ptrs_ +
         i * num_tuples_per_row_);
-    src_row->DeepCopy(dst_row, row_desc_.tuple_descriptors(), &dst->tuple_data_pool_,
-        false);
+    src_row->DeepCopy(
+        dst_row, row_desc_->tuple_descriptors(), &dst->tuple_data_pool_, false);
   }
   dst->CommitRows(num_rows_);
 }
@@ -423,7 +424,7 @@ void RowBatch::DeepCopyTo(RowBatch* dst) {
 int64_t RowBatch::TotalByteSize(DedupMap* distinct_tuples) {
   DCHECK(distinct_tuples == NULL || distinct_tuples->size() == 0);
   int64_t result = 0;
-  vector<int> tuple_count(row_desc_.tuple_descriptors().size(), 0);
+  vector<int> tuple_count(row_desc_->tuple_descriptors().size(), 0);
 
   // Sum total variable length byte sizes.
   for (int i = 0; i < num_rows_; ++i) {
@@ -435,17 +436,17 @@ int64_t RowBatch::TotalByteSize(DedupMap* distinct_tuples) {
         // Fast tuple deduplication for adjacent rows.
         continue;
       } else if (UNLIKELY(distinct_tuples != NULL)) {
-        if (row_desc_.tuple_descriptors()[j]->byte_size() == 0) continue;
+        if (row_desc_->tuple_descriptors()[j]->byte_size() == 0) continue;
         bool inserted = distinct_tuples->InsertIfNotPresent(tuple, -1);
         if (!inserted) continue;
       }
-      result += tuple->VarlenByteSize(*row_desc_.tuple_descriptors()[j]);
+      result += tuple->VarlenByteSize(*row_desc_->tuple_descriptors()[j]);
       ++tuple_count[j];
     }
   }
   // Compute sum of fixed component of tuple sizes.
   for (int j = 0; j < num_tuples_per_row_; ++j) {
-    result += row_desc_.tuple_descriptors()[j]->byte_size() * tuple_count[j];
+    result += row_desc_->tuple_descriptors()[j]->byte_size() * tuple_count[j];
   }
   return result;
 }
@@ -453,7 +454,7 @@ int64_t RowBatch::TotalByteSize(DedupMap* distinct_tuples) {
 Status RowBatch::ResizeAndAllocateTupleBuffer(
     RuntimeState* state, int64_t* buffer_size, uint8_t** buffer) {
   return ResizeAndAllocateTupleBuffer(
-      state, &tuple_data_pool_, row_desc_.GetRowSize(), &capacity_, buffer_size, buffer);
+      state, &tuple_data_pool_, row_desc_->GetRowSize(), &capacity_, buffer_size, buffer);
 }
 
 Status RowBatch::ResizeAndAllocateTupleBuffer(RuntimeState* state, MemPool* pool,
@@ -475,7 +476,7 @@ void RowBatch::VLogRows(const string& context) {
   if (!VLOG_ROW_IS_ON) return;
   VLOG_ROW << context << ": #rows=" << num_rows_;
   for (int i = 0; i < num_rows_; ++i) {
-    VLOG_ROW << PrintRow(GetRow(i), row_desc_);
+    VLOG_ROW << PrintRow(GetRow(i), *row_desc_);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 0068478..3cf1093 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -85,15 +85,15 @@ class RowBatch {
   /// Create RowBatch for a maximum of 'capacity' rows of tuples specified
   /// by 'row_desc'.
   /// tracker cannot be NULL.
-  RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* tracker);
+  RowBatch(const RowDescriptor* row_desc, int capacity, MemTracker* tracker);
 
   /// Populate a row batch from input_batch by copying input_batch's
   /// tuple_data into the row batch's mempool and converting all offsets
   /// in the data back into pointers.
   /// TODO: figure out how to transfer the data from input_batch to this RowBatch
   /// (so that we don't need to make yet another copy)
-  RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
-      MemTracker* tracker);
+  RowBatch(
+      const RowDescriptor* row_desc, const TRowBatch& input_batch, MemTracker* tracker);
 
   /// Releases all resources accumulated at this row batch. This includes
   ///  - tuple_ptrs
@@ -321,7 +321,7 @@ class RowBatch {
     return tuple_ptrs_size_ / (num_tuples_per_row_ * sizeof(Tuple*));
   }
 
-  const RowDescriptor& row_desc() const { return row_desc_; }
+  const RowDescriptor* row_desc() const { return row_desc_; }
 
   /// Max memory that this row batch can accumulate before it is considered at capacity.
   /// This is a soft capacity: row batches may exceed the capacity, preferably only by a
@@ -424,8 +424,9 @@ class RowBatch {
   // Less frequently used members that are not accessed on performance-critical paths
   // should go below here.
 
-  /// Full row descriptor for rows in this batch.
-  RowDescriptor row_desc_;
+  /// Full row descriptor for rows in this batch. Owned by the exec node that produced
+  /// this batch.
+  const RowDescriptor* row_desc_;
 
   MemTracker* mem_tracker_;  // not owned

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/sorted-run-merger.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.cc b/be/src/runtime/sorted-run-merger.cc
index 36d7a61..673d6c3 100644
--- a/be/src/runtime/sorted-run-merger.cc
+++ b/be/src/runtime/sorted-run-merger.cc
@@ -126,7 +126,7 @@ void SortedRunMerger::Heapify(int parent_index) {
 }
 
 SortedRunMerger::SortedRunMerger(const TupleRowComparator& comparator,
-    RowDescriptor* row_desc, RuntimeProfile* profile, bool deep_copy_input)
+    const RowDescriptor* row_desc, RuntimeProfile* profile, bool deep_copy_input)
   : comparator_(comparator),
     input_row_desc_(row_desc),
    deep_copy_input_(deep_copy_input) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/sorted-run-merger.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.h b/be/src/runtime/sorted-run-merger.h
index 563b6f7..99ffbe9 100644
--- a/be/src/runtime/sorted-run-merger.h
+++ b/be/src/runtime/sorted-run-merger.h
@@ -55,7 +55,7 @@ class SortedRunMerger {
   /// zero).
   typedef boost::function<Status (RowBatch**)> RunBatchSupplierFn;
 
-  SortedRunMerger(const TupleRowComparator& comparator, RowDescriptor* row_desc,
+  SortedRunMerger(const TupleRowComparator& comparator, const RowDescriptor* row_desc,
       RuntimeProfile* profile, bool deep_copy_input);
 
   /// Prepare this merger to merge and return rows from the sorted runs in 'input_runs'.
@@ -96,7 +96,7 @@ class SortedRunMerger {
 
   /// Descriptor for the rows provided by the input runs. Owned by the exec-node through
   /// which this merger was created.
-  RowDescriptor* input_row_desc_;
+  const RowDescriptor* input_row_desc_;
 
   /// True if rows must be deep copied into the output batch.
   bool deep_copy_input_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index 4c33d7f..b8182e2 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -538,8 +538,8 @@ Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* num_
   if (!INITIAL_RUN) {
     // For intermediate merges, the input row is the sort tuple.
-    DCHECK_EQ(batch->row_desc().tuple_descriptors().size(), 1);
-    DCHECK_EQ(batch->row_desc().tuple_descriptors()[0], sort_tuple_desc_);
+    DCHECK_EQ(batch->row_desc()->tuple_descriptors().size(), 1);
+    DCHECK_EQ(batch->row_desc()->tuple_descriptors()[0], sort_tuple_desc_);
   }
 
   /// Keep initial unsorted runs pinned in memory so we can sort them.
@@ -756,8 +756,8 @@ Status Sorter::Run::PrepareRead(bool* pinned_all_blocks) {
   end_of_fixed_len_block_ = end_of_var_len_block_ = fixed_len_blocks_.empty();
   num_tuples_returned_ = 0;
 
-  buffered_batch_.reset(new RowBatch(*sorter_->output_row_desc_,
-      sorter_->state_->batch_size(), sorter_->mem_tracker_));
+  buffered_batch_.reset(new RowBatch(
+      sorter_->output_row_desc_, sorter_->state_->batch_size(), sorter_->mem_tracker_));
 
   // If the run is pinned, all blocks are already pinned, so we're ready to read.
   if (is_pinned_) {
@@ -1587,8 +1587,7 @@ Status Sorter::CreateMerger(int max_num_runs) {
 }
 
 Status Sorter::ExecuteIntermediateMerge(Sorter::Run* merged_run) {
-  RowBatch intermediate_merge_batch(*output_row_desc_, state_->batch_size(),
-      mem_tracker_);
+  RowBatch intermediate_merge_batch(output_row_desc_, state_->batch_size(), mem_tracker_);
   bool eos = false;
   while (!eos) {
     // Copy rows into the new run until done.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/util/debug-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index d04550c..6a94496 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -224,7 +224,7 @@ string PrintRow(TupleRow* row, const RowDescriptor& d) {
 string PrintBatch(RowBatch* batch) {
   stringstream out;
   for (int i = 0; i < batch->num_rows(); ++i) {
-    out << PrintRow(batch->GetRow(i), batch->row_desc()) << "\n";
+    out << PrintRow(batch->GetRow(i), *batch->row_desc()) << "\n";
   }
   return out.str();
 }
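As the debug-util.cc hunk above shows, call sites that still need a reference simply dereference the stored pointer (PrintRow(row, *batch->row_desc()), LayoutEquals(*src->row_desc_), and so on). A short sketch of that call-site pattern, with hypothetical simplified types rather than the real PrintRow/RowBatch:

// Sketch only: hypothetical simplified types, not the real Impala API.
#include <iostream>
#include <string>

struct RowDescriptor {
  std::string name = "row_desc";
};

// An API that still takes a const reference, like PrintRow() does.
std::string PrintRowDesc(const RowDescriptor& d) { return d.name; }

int main() {
  RowDescriptor desc;
  const RowDescriptor* row_desc = &desc;          // member now stored as a pointer
  std::cout << PrintRowDesc(*row_desc) << "\n";   // dereference at the call site
  return 0;
}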
