http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/buffered-tuple-stream-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc deleted file mode 100644 index 0904833..0000000 --- a/be/src/runtime/buffered-tuple-stream-test.cc +++ /dev/null @@ -1,1264 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include <boost/scoped_ptr.hpp> -#include <boost/bind.hpp> -#include <boost/filesystem.hpp> - -#include <set> -#include <string> -#include <limits> // for std::numeric_limits<int>::max() - -#include "testutil/gtest-util.h" -#include "codegen/llvm-codegen.h" -#include "gutil/gscoped_ptr.h" -#include "runtime/buffered-tuple-stream.inline.h" -#include "runtime/collection-value.h" -#include "runtime/collection-value-builder.h" -#include "runtime/raw-value.h" -#include "runtime/row-batch.h" -#include "runtime/string-value.inline.h" -#include "runtime/test-env.h" -#include "runtime/tmp-file-mgr.h" -#include "service/fe-support.h" -#include "testutil/desc-tbl-builder.h" -#include "util/test-info.h" - -#include "gen-cpp/Types_types.h" -#include "gen-cpp/ImpalaInternalService_types.h" - -#include "common/names.h" - -using kudu::FreeDeleter; - -static const int BATCH_SIZE = 250; -static const int IO_BLOCK_SIZE = 8 * 1024 * 1024; -static const uint32_t PRIME = 479001599; - -namespace impala { - -static const StringValue STRINGS[] = { - StringValue("ABC"), - StringValue("HELLO"), - StringValue("123456789"), - StringValue("FOOBAR"), - StringValue("ONE"), - StringValue("THREE"), - StringValue("abcdefghijklmno"), - StringValue("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"), - StringValue("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"), -}; - -static const int NUM_STRINGS = sizeof(STRINGS) / sizeof(StringValue); - -class SimpleTupleStreamTest : public testing::Test { - protected: - virtual void SetUp() { - test_env_.reset(new TestEnv()); - ASSERT_OK(test_env_->Init()); - - CreateDescriptors(); - - mem_pool_.reset(new MemPool(&tracker_)); - } - - virtual void CreateDescriptors() { - vector<bool> nullable_tuples(1, false); - vector<TTupleId> tuple_ids(1, static_cast<TTupleId>(0)); - - DescriptorTblBuilder int_builder(test_env_->exec_env()->frontend(), &pool_); - int_builder.DeclareTuple() << TYPE_INT; - int_desc_ = pool_.Add(new RowDescriptor( - *int_builder.Build(), tuple_ids, nullable_tuples)); - - DescriptorTblBuilder string_builder(test_env_->exec_env()->frontend(), &pool_); - string_builder.DeclareTuple() << TYPE_STRING; - string_desc_ = pool_.Add(new RowDescriptor( - *string_builder.Build(), tuple_ids, nullable_tuples)); - } - - virtual void TearDown() { - runtime_state_ = NULL; - client_ = NULL; - 
pool_.Clear(); - mem_pool_->FreeAll(); - test_env_.reset(); - } - - /// Setup a block manager with the provided settings and client with no reservation, - /// tracked by tracker_. - void InitBlockMgr(int64_t limit, int block_size) { - ASSERT_OK(test_env_->CreateQueryStateWithBlockMgr( - 0, limit, block_size, nullptr, &runtime_state_)); - MemTracker* client_tracker = - pool_.Add(new MemTracker(-1, "client", runtime_state_->instance_mem_tracker())); - ASSERT_OK(runtime_state_->block_mgr()->RegisterClient( - "", 0, false, client_tracker, runtime_state_, &client_)); - } - - /// Generate the ith element of a sequence of int values. - int GenIntValue(int i) { - // Multiply by large prime to get varied bit patterns. - return i * PRIME; - } - - /// Generate the ith element of a sequence of bool values. - bool GenBoolValue(int i) { - // Use a middle bit of the int value. - return ((GenIntValue(i) >> 8) & 0x1) != 0; - } - - /// Count the total number of slots per row based on the given row descriptor. - int CountSlotsPerRow(const RowDescriptor& row_desc) { - int slots_per_row = 0; - for (int i = 0; i < row_desc.tuple_descriptors().size(); ++i) { - TupleDescriptor* tuple_desc = row_desc.tuple_descriptors()[i]; - slots_per_row += tuple_desc->slots().size(); - } - return slots_per_row; - } - - /// Allocate a row batch with 'num_rows' of rows with layout described by 'row_desc'. - /// 'offset' is used to account for rows occupied by any previous row batches. This is - /// needed to match the values generated in VerifyResults(). If 'gen_null' is true, - /// some tuples will be set to NULL. - virtual RowBatch* CreateBatch( - const RowDescriptor* row_desc, int offset, int num_rows, bool gen_null) { - RowBatch* batch = pool_.Add(new RowBatch(row_desc, num_rows, &tracker_)); - int num_tuples = row_desc->tuple_descriptors().size(); - - int idx = offset * CountSlotsPerRow(*row_desc); - for (int row_idx = 0; row_idx < num_rows; ++row_idx) { - TupleRow* row = batch->GetRow(row_idx); - for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) { - TupleDescriptor* tuple_desc = row_desc->tuple_descriptors()[tuple_idx]; - Tuple* tuple = Tuple::Create(tuple_desc->byte_size(), batch->tuple_data_pool()); - bool is_null = gen_null && !GenBoolValue(idx); - for (int slot_idx = 0; slot_idx < tuple_desc->slots().size(); ++slot_idx, ++idx) { - SlotDescriptor* slot_desc = tuple_desc->slots()[slot_idx]; - void* slot = tuple->GetSlot(slot_desc->tuple_offset()); - switch (slot_desc->type().type) { - case TYPE_INT: - *reinterpret_cast<int*>(slot) = GenIntValue(idx); - break; - case TYPE_STRING: - *reinterpret_cast<StringValue*>(slot) = STRINGS[idx % NUM_STRINGS]; - break; - default: - // The memory has been zero'ed out already by Tuple::Create(). 
- break; - } - } - if (is_null) { - row->SetTuple(tuple_idx, NULL); - } else { - row->SetTuple(tuple_idx, tuple); - } - } - batch->CommitLastRow(); - } - return batch; - } - - virtual RowBatch* CreateIntBatch(int offset, int num_rows, bool gen_null) { - return CreateBatch(int_desc_, offset, num_rows, gen_null); - } - - virtual RowBatch* CreateStringBatch(int offset, int num_rows, bool gen_null) { - return CreateBatch(string_desc_, offset, num_rows, gen_null); - } - - void AppendValue(uint8_t* ptr, vector<int>* results) { - if (ptr == NULL) { - // For the tests indicate null-ability using the max int value - results->push_back(std::numeric_limits<int>::max()); - } else { - results->push_back(*reinterpret_cast<int*>(ptr)); - } - } - - void AppendValue(uint8_t* ptr, vector<StringValue>* results) { - if (ptr == NULL) { - results->push_back(StringValue()); - } else { - StringValue sv = *reinterpret_cast<StringValue*>(ptr); - uint8_t* copy = mem_pool_->Allocate(sv.len); - memcpy(copy, sv.ptr, sv.len); - sv.ptr = reinterpret_cast<char*>(copy); - results->push_back(sv); - } - } - - template <typename T> - void AppendRowTuples(TupleRow* row, RowDescriptor* row_desc, vector<T>* results) { - DCHECK(row != NULL); - const int num_tuples = row_desc->tuple_descriptors().size(); - - for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) { - TupleDescriptor* tuple_desc = row_desc->tuple_descriptors()[tuple_idx]; - Tuple* tuple = row->GetTuple(tuple_idx); - const int num_slots = tuple_desc->slots().size(); - for (int slot_idx = 0; slot_idx < num_slots; ++slot_idx) { - SlotDescriptor* slot_desc = tuple_desc->slots()[slot_idx]; - if (tuple == NULL) { - AppendValue(NULL, results); - } else { - void* slot = tuple->GetSlot(slot_desc->tuple_offset()); - AppendValue(reinterpret_cast<uint8_t*>(slot), results); - } - } - } - } - - template <typename T> - void ReadValues(BufferedTupleStream* stream, RowDescriptor* desc, vector<T>* results, - int num_batches = -1) { - bool eos = false; - RowBatch batch(desc, BATCH_SIZE, &tracker_); - int batches_read = 0; - do { - batch.Reset(); - EXPECT_OK(stream->GetNext(&batch, &eos)); - ++batches_read; - for (int i = 0; i < batch.num_rows(); ++i) { - AppendRowTuples(batch.GetRow(i), desc, results); - } - } while (!eos && (num_batches < 0 || batches_read <= num_batches)); - } - - void GetExpectedValue(int idx, bool is_null, int* val) { - if (is_null) { - *val = std::numeric_limits<int>::max(); - } else { - *val = GenIntValue(idx); - } - } - - void GetExpectedValue(int idx, bool is_null, StringValue* val) { - if (is_null) { - *val = StringValue(); - } else { - *val = STRINGS[idx % NUM_STRINGS]; - } - } - - template <typename T> - void VerifyResults(const RowDescriptor& row_desc, const vector<T>& results, - int num_rows, bool gen_null) { - int idx = 0; - for (int row_idx = 0; row_idx < num_rows; ++row_idx) { - const int num_tuples = row_desc.tuple_descriptors().size(); - for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) { - const TupleDescriptor* tuple_desc = row_desc.tuple_descriptors()[tuple_idx]; - const int num_slots = tuple_desc->slots().size(); - bool is_null = gen_null && !GenBoolValue(idx); - for (int slot_idx = 0; slot_idx < num_slots; ++slot_idx, ++idx) { - T expected_val; - GetExpectedValue(idx, is_null, &expected_val); - ASSERT_EQ(results[idx], expected_val) - << "results[" << idx << "] " << results[idx] << " != " - << expected_val << " row_idx=" << row_idx - << " tuple_idx=" << tuple_idx << " slot_idx=" << slot_idx - << " gen_null=" << gen_null; - } - } 
- } - DCHECK_EQ(results.size(), idx); - } - - // Test adding num_batches of ints to the stream and reading them back. - // If unpin_stream is true, operate the stream in unpinned mode. - // Assumes that enough buffers are available to read and write the stream. - template <typename T> - void TestValues(int num_batches, RowDescriptor* desc, bool gen_null, - bool unpin_stream, int num_rows = BATCH_SIZE, bool use_small_buffers = true) { - BufferedTupleStream stream(runtime_state_, desc, runtime_state_->block_mgr(), client_, - use_small_buffers, false); - ASSERT_OK(stream.Init(-1, NULL, true)); - bool got_write_buffer; - ASSERT_OK(stream.PrepareForWrite(&got_write_buffer)); - ASSERT_TRUE(got_write_buffer); - - if (unpin_stream) { - ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT)); - } - // Add rows to the stream - int offset = 0; - for (int i = 0; i < num_batches; ++i) { - RowBatch* batch = NULL; - - Status status; - ASSERT_TRUE(sizeof(T) == sizeof(int) || sizeof(T) == sizeof(StringValue)); - batch = CreateBatch(desc, offset, num_rows, gen_null); - for (int j = 0; j < batch->num_rows(); ++j) { - bool b = stream.AddRow(batch->GetRow(j), &status); - ASSERT_OK(status); - if (!b) { - ASSERT_TRUE(stream.using_small_buffers()); - bool got_buffer; - ASSERT_OK(stream.SwitchToIoBuffers(&got_buffer)); - ASSERT_TRUE(got_buffer); - b = stream.AddRow(batch->GetRow(j), &status); - ASSERT_OK(status); - } - ASSERT_TRUE(b); - } - offset += batch->num_rows(); - // Reset the batch to make sure the stream handles the memory correctly. - batch->Reset(); - } - - bool got_read_buffer; - ASSERT_OK(stream.PrepareForRead(false, &got_read_buffer)); - ASSERT_TRUE(got_read_buffer); - - // Read all the rows back - vector<T> results; - ReadValues(&stream, desc, &results); - - // Verify result - VerifyResults<T>(*desc, results, num_rows * num_batches, gen_null); - - stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); - } - - void TestIntValuesInterleaved(int num_batches, int num_batches_before_read, - bool unpin_stream) { - for (int small_buffers = 0; small_buffers < 2; ++small_buffers) { - BufferedTupleStream stream(runtime_state_, int_desc_, runtime_state_->block_mgr(), - client_, small_buffers == 0, // initial small buffers - true); // read_write - ASSERT_OK(stream.Init(-1, NULL, true)); - bool got_write_buffer; - ASSERT_OK(stream.PrepareForWrite(&got_write_buffer)); - ASSERT_TRUE(got_write_buffer); - bool got_read_buffer; - ASSERT_OK(stream.PrepareForRead(true, &got_read_buffer)); - ASSERT_TRUE(got_read_buffer); - if (unpin_stream) { - ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT)); - } - - vector<int> results; - - for (int i = 0; i < num_batches; ++i) { - RowBatch* batch = CreateIntBatch(i * BATCH_SIZE, BATCH_SIZE, false); - for (int j = 0; j < batch->num_rows(); ++j) { - Status status; - bool b = stream.AddRow(batch->GetRow(j), &status); - ASSERT_TRUE(b); - ASSERT_OK(status); - } - // Reset the batch to make sure the stream handles the memory correctly. 
- batch->Reset(); - if (i % num_batches_before_read == 0) { - ReadValues(&stream, int_desc_, &results, - (rand() % num_batches_before_read) + 1); - } - } - ReadValues(&stream, int_desc_, &results); - - VerifyResults<int>(*int_desc_, results, BATCH_SIZE * num_batches, false); - - stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); - } - } - - void TestUnpinPin(bool varlen_data); - - void TestTransferMemory(bool pinned_stream, bool read_write); - - scoped_ptr<TestEnv> test_env_; - RuntimeState* runtime_state_; - BufferedBlockMgr::Client* client_; - - MemTracker tracker_; - ObjectPool pool_; - RowDescriptor* int_desc_; - RowDescriptor* string_desc_; - scoped_ptr<MemPool> mem_pool_; -}; - - -// Tests with a NULLable tuple per row. -class SimpleNullStreamTest : public SimpleTupleStreamTest { - protected: - virtual void CreateDescriptors() { - vector<bool> nullable_tuples(1, true); - vector<TTupleId> tuple_ids(1, static_cast<TTupleId>(0)); - - DescriptorTblBuilder int_builder(test_env_->exec_env()->frontend(), &pool_); - int_builder.DeclareTuple() << TYPE_INT; - int_desc_ = pool_.Add(new RowDescriptor( - *int_builder.Build(), tuple_ids, nullable_tuples)); - - DescriptorTblBuilder string_builder(test_env_->exec_env()->frontend(), &pool_); - string_builder.DeclareTuple() << TYPE_STRING; - string_desc_ = pool_.Add(new RowDescriptor( - *string_builder.Build(), tuple_ids, nullable_tuples)); - } -}; // SimpleNullStreamTest - -// Tests with multiple non-NULLable tuples per row. -class MultiTupleStreamTest : public SimpleTupleStreamTest { - protected: - virtual void CreateDescriptors() { - vector<bool> nullable_tuples; - nullable_tuples.push_back(false); - nullable_tuples.push_back(false); - nullable_tuples.push_back(false); - - vector<TTupleId> tuple_ids; - tuple_ids.push_back(static_cast<TTupleId>(0)); - tuple_ids.push_back(static_cast<TTupleId>(1)); - tuple_ids.push_back(static_cast<TTupleId>(2)); - - DescriptorTblBuilder int_builder(test_env_->exec_env()->frontend(), &pool_); - int_builder.DeclareTuple() << TYPE_INT; - int_builder.DeclareTuple() << TYPE_INT; - int_builder.DeclareTuple() << TYPE_INT; - int_desc_ = pool_.Add(new RowDescriptor( - *int_builder.Build(), tuple_ids, nullable_tuples)); - - DescriptorTblBuilder string_builder(test_env_->exec_env()->frontend(), &pool_); - string_builder.DeclareTuple() << TYPE_STRING; - string_builder.DeclareTuple() << TYPE_STRING; - string_builder.DeclareTuple() << TYPE_STRING; - string_desc_ = pool_.Add(new RowDescriptor( - *string_builder.Build(), tuple_ids, nullable_tuples)); - } -}; - -// Tests with multiple NULLable tuples per row. 
-class MultiNullableTupleStreamTest : public SimpleTupleStreamTest { - protected: - virtual void CreateDescriptors() { - vector<bool> nullable_tuples; - nullable_tuples.push_back(false); - nullable_tuples.push_back(true); - nullable_tuples.push_back(true); - - vector<TTupleId> tuple_ids; - tuple_ids.push_back(static_cast<TTupleId>(0)); - tuple_ids.push_back(static_cast<TTupleId>(1)); - tuple_ids.push_back(static_cast<TTupleId>(2)); - - DescriptorTblBuilder int_builder(test_env_->exec_env()->frontend(), &pool_); - int_builder.DeclareTuple() << TYPE_INT; - int_builder.DeclareTuple() << TYPE_INT; - int_builder.DeclareTuple() << TYPE_INT; - int_desc_ = pool_.Add(new RowDescriptor( - *int_builder.Build(), tuple_ids, nullable_tuples)); - - DescriptorTblBuilder string_builder(test_env_->exec_env()->frontend(), &pool_); - string_builder.DeclareTuple() << TYPE_STRING; - string_builder.DeclareTuple() << TYPE_STRING; - string_builder.DeclareTuple() << TYPE_STRING; - string_desc_ = pool_.Add(new RowDescriptor( - *string_builder.Build(), tuple_ids, nullable_tuples)); - } -}; - -/// Tests with collection types. -class ArrayTupleStreamTest : public SimpleTupleStreamTest { - protected: - RowDescriptor* array_desc_; - - virtual void CreateDescriptors() { - // tuples: (array<string>, array<array<int>>) (array<int>) - vector<bool> nullable_tuples(2, true); - vector<TTupleId> tuple_ids; - tuple_ids.push_back(static_cast<TTupleId>(0)); - tuple_ids.push_back(static_cast<TTupleId>(1)); - ColumnType string_array_type; - string_array_type.type = TYPE_ARRAY; - string_array_type.children.push_back(TYPE_STRING); - - ColumnType int_array_type; - int_array_type.type = TYPE_ARRAY; - int_array_type.children.push_back(TYPE_STRING); - - ColumnType nested_array_type; - nested_array_type.type = TYPE_ARRAY; - nested_array_type.children.push_back(int_array_type); - - DescriptorTblBuilder builder(test_env_->exec_env()->frontend(), &pool_); - builder.DeclareTuple() << string_array_type << nested_array_type; - builder.DeclareTuple() << int_array_type; - array_desc_ = pool_.Add(new RowDescriptor( - *builder.Build(), tuple_ids, nullable_tuples)); - } -}; - -// Basic API test. No data should be going to disk. -TEST_F(SimpleTupleStreamTest, Basic) { - InitBlockMgr(-1, IO_BLOCK_SIZE); - TestValues<int>(1, int_desc_, false, true); - TestValues<int>(10, int_desc_, false, true); - TestValues<int>(100, int_desc_, false, true); - TestValues<int>(1, int_desc_, false, false); - TestValues<int>(10, int_desc_, false, false); - TestValues<int>(100, int_desc_, false, false); - - TestValues<StringValue>(1, string_desc_, false, true); - TestValues<StringValue>(10, string_desc_, false, true); - TestValues<StringValue>(100, string_desc_, false, true); - TestValues<StringValue>(1, string_desc_, false, false); - TestValues<StringValue>(10, string_desc_, false, false); - TestValues<StringValue>(100, string_desc_, false, false); - - TestIntValuesInterleaved(1, 1, true); - TestIntValuesInterleaved(10, 5, true); - TestIntValuesInterleaved(100, 15, true); - TestIntValuesInterleaved(1, 1, false); - TestIntValuesInterleaved(10, 5, false); - TestIntValuesInterleaved(100, 15, false); -} - -// Test with only 1 buffer. -TEST_F(SimpleTupleStreamTest, OneBufferSpill) { - // Each buffer can only hold 100 ints, so this spills quite often. 
- int buffer_size = 100 * sizeof(int); - InitBlockMgr(buffer_size, buffer_size); - TestValues<int>(1, int_desc_, false, true); - TestValues<int>(10, int_desc_, false, true); - - TestValues<StringValue>(1, string_desc_, false, true); - TestValues<StringValue>(10, string_desc_, false, true); -} - -// Test with a few buffers. -TEST_F(SimpleTupleStreamTest, ManyBufferSpill) { - int buffer_size = 100 * sizeof(int); - InitBlockMgr(10 * buffer_size, buffer_size); - - TestValues<int>(1, int_desc_, false, true); - TestValues<int>(10, int_desc_, false, true); - TestValues<int>(100, int_desc_, false, true); - TestValues<StringValue>(1, string_desc_, false, true); - TestValues<StringValue>(10, string_desc_, false, true); - TestValues<StringValue>(100, string_desc_, false, true); - - TestIntValuesInterleaved(1, 1, true); - TestIntValuesInterleaved(10, 5, true); - TestIntValuesInterleaved(100, 15, true); -} - -void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data) { - int buffer_size = 100 * sizeof(int); - InitBlockMgr(3 * buffer_size, buffer_size); - RowDescriptor* row_desc = varlen_data ? string_desc_ : int_desc_; - - BufferedTupleStream stream( - runtime_state_, row_desc, runtime_state_->block_mgr(), client_, true, false); - ASSERT_OK(stream.Init(-1, NULL, true)); - bool got_write_buffer; - ASSERT_OK(stream.PrepareForWrite(&got_write_buffer)); - ASSERT_TRUE(got_write_buffer); - - int offset = 0; - bool full = false; - while (!full) { - RowBatch* batch = varlen_data ? CreateStringBatch(offset, BATCH_SIZE, false) - : CreateIntBatch(offset, BATCH_SIZE, false); - int j = 0; - for (; j < batch->num_rows(); ++j) { - Status status; - full = !stream.AddRow(batch->GetRow(j), &status); - ASSERT_OK(status); - if (full) break; - } - offset += j; - } - - ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT)); - - bool pinned = false; - ASSERT_OK(stream.PinStream(false, &pinned)); - ASSERT_TRUE(pinned); - - - // Read and verify result a few times. We should be able to reread the stream if - // we don't use delete on read mode. - int read_iters = 3; - for (int i = 0; i < read_iters; ++i) { - bool delete_on_read = i == read_iters - 1; - bool got_read_buffer; - ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_buffer)); - ASSERT_TRUE(got_read_buffer); - - if (varlen_data) { - vector<StringValue> results; - ReadValues(&stream, row_desc, &results); - VerifyResults<StringValue>(*string_desc_, results, offset, false); - } else { - vector<int> results; - ReadValues(&stream, row_desc, &results); - VerifyResults<int>(*int_desc_, results, offset, false); - } - } - - // After delete_on_read, all blocks aside from the last should be deleted. - // Note: this should really be 0, but the BufferedTupleStream returns eos before - // deleting the last block, rather than after, so the last block isn't deleted - // until the stream is closed. 
- ASSERT_EQ(stream.bytes_in_mem(false), buffer_size); - - stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); - - ASSERT_EQ(stream.bytes_in_mem(false), 0); -} - -TEST_F(SimpleTupleStreamTest, UnpinPin) { - TestUnpinPin(false); -} - -TEST_F(SimpleTupleStreamTest, UnpinPinVarlen) { - TestUnpinPin(true); -} - -TEST_F(SimpleTupleStreamTest, SmallBuffers) { - int buffer_size = IO_BLOCK_SIZE; - InitBlockMgr(2 * buffer_size, buffer_size); - - BufferedTupleStream stream( - runtime_state_, int_desc_, runtime_state_->block_mgr(), client_, true, false); - ASSERT_OK(stream.Init(-1, NULL, false)); - bool got_write_buffer; - ASSERT_OK(stream.PrepareForWrite(&got_write_buffer)); - ASSERT_TRUE(got_write_buffer); - - // Initial buffer should be small. - EXPECT_LT(stream.bytes_in_mem(false), buffer_size); - - RowBatch* batch = CreateIntBatch(0, 1024, false); - - Status status; - for (int i = 0; i < batch->num_rows(); ++i) { - bool ret = stream.AddRow(batch->GetRow(i), &status); - EXPECT_TRUE(ret); - ASSERT_OK(status); - } - EXPECT_LT(stream.bytes_in_mem(false), buffer_size); - EXPECT_LT(stream.byte_size(), buffer_size); - ASSERT_TRUE(stream.using_small_buffers()); - - // 40 MB of ints - batch = CreateIntBatch(0, 10 * 1024 * 1024, false); - for (int i = 0; i < batch->num_rows(); ++i) { - bool ret = stream.AddRow(batch->GetRow(i), &status); - ASSERT_OK(status); - if (!ret) { - ASSERT_TRUE(stream.using_small_buffers()); - bool got_buffer; - ASSERT_OK(stream.SwitchToIoBuffers(&got_buffer)); - ASSERT_TRUE(got_buffer); - ret = stream.AddRow(batch->GetRow(i), &status); - ASSERT_OK(status); - } - ASSERT_TRUE(ret); - } - EXPECT_EQ(stream.bytes_in_mem(false), buffer_size); - - // TODO: Test for IMPALA-2330. In case SwitchToIoBuffers() fails to get a buffer then - // using_small_buffers() should still return true. - stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); -} - -void SimpleTupleStreamTest::TestTransferMemory(bool pin_stream, bool read_write) { - // Use smaller buffers so that the explicit FLUSH_RESOURCES flag is required to - // mark the batch at capacity. - int buffer_size = 4 * 1024; - InitBlockMgr(100 * buffer_size, buffer_size); - - BufferedTupleStream stream( - runtime_state_, int_desc_, runtime_state_->block_mgr(), client_, false, read_write); - ASSERT_OK(stream.Init(-1, NULL, pin_stream)); - bool got_write_buffer; - ASSERT_OK(stream.PrepareForWrite(&got_write_buffer)); - ASSERT_TRUE(got_write_buffer); - RowBatch* batch = CreateIntBatch(0, 1024, false); - - // Construct a stream with 4 blocks. - const int total_num_blocks = 4; - while (stream.byte_size() < total_num_blocks * buffer_size) { - Status status; - for (int i = 0; i < batch->num_rows(); ++i) { - bool ret = stream.AddRow(batch->GetRow(i), &status); - EXPECT_TRUE(ret); - ASSERT_OK(status); - } - } - - bool got_read_buffer; - ASSERT_OK(stream.PrepareForRead(true, &got_read_buffer)); - ASSERT_TRUE(got_read_buffer); - - batch->Reset(); - stream.Close(batch, RowBatch::FlushMode::FLUSH_RESOURCES); - if (pin_stream) { - DCHECK_EQ(total_num_blocks, batch->num_blocks()); - } else if (read_write) { - // Read and write block should be attached. - DCHECK_EQ(2, batch->num_blocks()); - } else { - // Read block should be attached. - DCHECK_EQ(1, batch->num_blocks()); - } - DCHECK(batch->AtCapacity()); // Flush resources flag should have been set. - batch->Reset(); - DCHECK_EQ(0, batch->num_blocks()); -} - -/// Test attaching memory to a row batch from a pinned stream. 
-TEST_F(SimpleTupleStreamTest, TransferMemoryFromPinnedStreamReadWrite) { - TestTransferMemory(true, true); -} - -TEST_F(SimpleTupleStreamTest, TransferMemoryFromPinnedStreamNoReadWrite) { - TestTransferMemory(true, false); -} - -/// Test attaching memory to a row batch from an unpinned stream. -TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStreamReadWrite) { - TestTransferMemory(false, true); -} - -TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStreamNoReadWrite) { - TestTransferMemory(false, false); -} - -// Test that tuple stream functions if it references strings outside stream. The -// aggregation node relies on this since it updates tuples in-place. -TEST_F(SimpleTupleStreamTest, StringsOutsideStream) { - int buffer_size = 8 * 1024 * 1024; - InitBlockMgr(2 * buffer_size, buffer_size); - Status status = Status::OK(); - - int num_batches = 100; - int rows_added = 0; - DCHECK_EQ(string_desc_->tuple_descriptors().size(), 1); - TupleDescriptor& tuple_desc = *string_desc_->tuple_descriptors()[0]; - - set<SlotId> external_slots; - for (int i = 0; i < tuple_desc.string_slots().size(); ++i) { - external_slots.insert(tuple_desc.string_slots()[i]->id()); - } - - BufferedTupleStream stream(runtime_state_, string_desc_, runtime_state_->block_mgr(), - client_, true, false, external_slots); - for (int i = 0; i < num_batches; ++i) { - RowBatch* batch = CreateStringBatch(rows_added, BATCH_SIZE, false); - for (int j = 0; j < batch->num_rows(); ++j) { - uint8_t* varlen_data; - int fixed_size = tuple_desc.byte_size(); - uint8_t* tuple = stream.AllocateRow(fixed_size, 0, &varlen_data, &status); - ASSERT_TRUE(tuple != NULL); - ASSERT_TRUE(status.ok()); - // Copy fixed portion in, but leave it pointing to row batch's varlen data. - memcpy(tuple, batch->GetRow(j)->GetTuple(0), fixed_size); - } - rows_added += batch->num_rows(); - } - - DCHECK_EQ(rows_added, stream.num_rows()); - - for (int delete_on_read = 0; delete_on_read <= 1; ++delete_on_read) { - // Keep stream in memory and test we can read ok. - vector<StringValue> results; - bool got_read_buffer; - ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_buffer)); - ASSERT_TRUE(got_read_buffer); - ReadValues(&stream, string_desc_, &results); - VerifyResults<StringValue>(*string_desc_, results, rows_added, false); - } - - stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); -} - -// Construct a big row by stiching together many tuples so the total row size -// will be close to the IO block size. With null indicators, stream will fail to -// be initialized; Without null indicators, things should work fine. -TEST_F(SimpleTupleStreamTest, BigRow) { - InitBlockMgr(2 * IO_BLOCK_SIZE, IO_BLOCK_SIZE); - vector<TupleId> tuple_ids; - vector<bool> nullable_tuples; - vector<bool> non_nullable_tuples; - - DescriptorTblBuilder big_row_builder(test_env_->exec_env()->frontend(), &pool_); - // Each tuple contains 8 slots of TYPE_INT and a single byte for null indicator. - const int num_tuples = IO_BLOCK_SIZE / (8 * sizeof(int) + 1); - for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) { - big_row_builder.DeclareTuple() << TYPE_INT << TYPE_INT << TYPE_INT << TYPE_INT - << TYPE_INT << TYPE_INT << TYPE_INT << TYPE_INT; - tuple_ids.push_back(static_cast<TTupleId>(tuple_idx)); - nullable_tuples.push_back(true); - non_nullable_tuples.push_back(false); - } - DescriptorTbl *desc = big_row_builder.Build(); - - // Construct a big row with all non-nullable tuples. 
- RowDescriptor* row_desc = pool_.Add(new RowDescriptor( - *desc, tuple_ids, non_nullable_tuples)); - ASSERT_FALSE(row_desc->IsAnyTupleNullable()); - // Test writing this row into the stream and then reading it back. - TestValues<int>(1, row_desc, false, false, 1, false); - TestValues<int>(1, row_desc, false, true, 1, false); - - // Construct a big row with nullable tuples. This requires space for null indicators - // in the stream which, as a result, will fail to initialize. - RowDescriptor* nullable_row_desc = pool_.Add(new RowDescriptor( - *desc, tuple_ids, nullable_tuples)); - ASSERT_TRUE(nullable_row_desc->IsAnyTupleNullable()); - BufferedTupleStream nullable_stream(runtime_state_, nullable_row_desc, - runtime_state_->block_mgr(), client_, false, false); - Status status = nullable_stream.Init(-1, NULL, true); - ASSERT_FALSE(status.ok()); - nullable_stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); -} - -// Test for IMPALA-3923: overflow of 32-bit int in GetRows(). -TEST_F(SimpleTupleStreamTest, TestGetRowsOverflow) { - InitBlockMgr(-1, 8 * 1024 * 1024); - BufferedTupleStream stream( - runtime_state_, int_desc_, runtime_state_->block_mgr(), client_, false, false); - ASSERT_OK(stream.Init(-1, NULL, true)); - - Status status; - // Add more rows than can be fit in a RowBatch (limited by its 32-bit row count). - // Actually adding the rows would take a very long time, so just set num_rows_. - // This puts the stream in an inconsistent state, but exercises the right code path. - stream.num_rows_ = 1L << 33; - bool got_rows; - scoped_ptr<RowBatch> overflow_batch; - ASSERT_FALSE(stream.GetRows(&overflow_batch, &got_rows).ok()); - stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); -} - -// Basic API test. No data should be going to disk. -TEST_F(SimpleNullStreamTest, Basic) { - InitBlockMgr(-1, IO_BLOCK_SIZE); - TestValues<int>(1, int_desc_, false, true); - TestValues<int>(10, int_desc_, false, true); - TestValues<int>(100, int_desc_, false, true); - TestValues<int>(1, int_desc_, true, true); - TestValues<int>(10, int_desc_, true, true); - TestValues<int>(100, int_desc_, true, true); - TestValues<int>(1, int_desc_, false, false); - TestValues<int>(10, int_desc_, false, false); - TestValues<int>(100, int_desc_, false, false); - TestValues<int>(1, int_desc_, true, false); - TestValues<int>(10, int_desc_, true, false); - TestValues<int>(100, int_desc_, true, false); - - TestValues<StringValue>(1, string_desc_, false, true); - TestValues<StringValue>(10, string_desc_, false, true); - TestValues<StringValue>(100, string_desc_, false, true); - TestValues<StringValue>(1, string_desc_, true, true); - TestValues<StringValue>(10, string_desc_, true, true); - TestValues<StringValue>(100, string_desc_, true, true); - TestValues<StringValue>(1, string_desc_, false, false); - TestValues<StringValue>(10, string_desc_, false, false); - TestValues<StringValue>(100, string_desc_, false, false); - TestValues<StringValue>(1, string_desc_, true, false); - TestValues<StringValue>(10, string_desc_, true, false); - TestValues<StringValue>(100, string_desc_, true, false); - - TestIntValuesInterleaved(1, 1, true); - TestIntValuesInterleaved(10, 5, true); - TestIntValuesInterleaved(100, 15, true); - TestIntValuesInterleaved(1, 1, false); - TestIntValuesInterleaved(10, 5, false); - TestIntValuesInterleaved(100, 15, false); -} - -// Test tuple stream with only 1 buffer and rows with multiple tuples. 
-TEST_F(MultiTupleStreamTest, MultiTupleOneBufferSpill) { - // Each buffer can only hold 100 ints, so this spills quite often. - int buffer_size = 100 * sizeof(int); - InitBlockMgr(buffer_size, buffer_size); - TestValues<int>(1, int_desc_, false, true); - TestValues<int>(10, int_desc_, false, true); - - TestValues<StringValue>(1, string_desc_, false, true); - TestValues<StringValue>(10, string_desc_, false, true); -} - -// Test with a few buffers and rows with multiple tuples. -TEST_F(MultiTupleStreamTest, MultiTupleManyBufferSpill) { - int buffer_size = 100 * sizeof(int); - InitBlockMgr(10 * buffer_size, buffer_size); - - TestValues<int>(1, int_desc_, false, true); - TestValues<int>(10, int_desc_, false, true); - TestValues<int>(100, int_desc_, false, true); - - TestValues<StringValue>(1, string_desc_, false, true); - TestValues<StringValue>(10, string_desc_, false, true); - TestValues<StringValue>(100, string_desc_, false, true); - - TestIntValuesInterleaved(1, 1, true); - TestIntValuesInterleaved(10, 5, true); - TestIntValuesInterleaved(100, 15, true); -} - -// Test that we can allocate a row in the stream and copy in multiple tuples then -// read it back from the stream. -TEST_F(MultiTupleStreamTest, MultiTupleAllocateRow) { - // Use small buffers so it will be flushed to disk. - int buffer_size = 4 * 1024; - InitBlockMgr(2 * buffer_size, buffer_size); - Status status = Status::OK(); - - int num_batches = 1; - int rows_added = 0; - BufferedTupleStream stream( - runtime_state_, string_desc_, runtime_state_->block_mgr(), client_, false, false); - ASSERT_OK(stream.Init(-1, NULL, false)); - bool got_write_buffer; - ASSERT_OK(stream.PrepareForWrite(&got_write_buffer)); - ASSERT_TRUE(got_write_buffer); - - for (int i = 0; i < num_batches; ++i) { - RowBatch* batch = CreateStringBatch(rows_added, 1, false); - for (int j = 0; j < batch->num_rows(); ++j) { - TupleRow* row = batch->GetRow(j); - int64_t fixed_size = 0; - int64_t varlen_size = 0; - for (int k = 0; k < string_desc_->tuple_descriptors().size(); k++) { - TupleDescriptor* tuple_desc = string_desc_->tuple_descriptors()[k]; - fixed_size += tuple_desc->byte_size(); - varlen_size += row->GetTuple(k)->VarlenByteSize(*tuple_desc); - } - uint8_t* varlen_data; - uint8_t* fixed_data = stream.AllocateRow(fixed_size, varlen_size, &varlen_data, - &status); - ASSERT_TRUE(fixed_data != NULL); - ASSERT_TRUE(status.ok()); - uint8_t* varlen_write_ptr = varlen_data; - for (int k = 0; k < string_desc_->tuple_descriptors().size(); k++) { - TupleDescriptor* tuple_desc = string_desc_->tuple_descriptors()[k]; - Tuple* src = row->GetTuple(k); - Tuple* dst = reinterpret_cast<Tuple*>(fixed_data); - fixed_data += tuple_desc->byte_size(); - memcpy(dst, src, tuple_desc->byte_size()); - for (int l = 0; l < tuple_desc->slots().size(); l++) { - SlotDescriptor* slot = tuple_desc->slots()[l]; - StringValue* src_string = src->GetStringSlot(slot->tuple_offset()); - StringValue* dst_string = dst->GetStringSlot(slot->tuple_offset()); - dst_string->ptr = reinterpret_cast<char*>(varlen_write_ptr); - memcpy(dst_string->ptr, src_string->ptr, src_string->len); - varlen_write_ptr += src_string->len; - } - } - ASSERT_EQ(varlen_data + varlen_size, varlen_write_ptr); - } - rows_added += batch->num_rows(); - } - - for (int i = 0; i < 3; ++i) { - bool delete_on_read = i == 2; - vector<StringValue> results; - bool got_read_buffer; - stream.PrepareForRead(delete_on_read, &got_read_buffer); - ASSERT_TRUE(got_read_buffer); - ReadValues(&stream, string_desc_, &results); - 
VerifyResults<StringValue>(*string_desc_, results, rows_added, false); - } - - stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); -} - -// Test with rows with multiple nullable tuples. -TEST_F(MultiNullableTupleStreamTest, MultiNullableTupleOneBufferSpill) { - // Each buffer can only hold 100 ints, so this spills quite often. - int buffer_size = 100 * sizeof(int); - InitBlockMgr(buffer_size, buffer_size); - TestValues<int>(1, int_desc_, false, true); - TestValues<int>(10, int_desc_, false, true); - TestValues<int>(1, int_desc_, true, true); - TestValues<int>(10, int_desc_, true, true); - - TestValues<StringValue>(1, string_desc_, false, true); - TestValues<StringValue>(10, string_desc_, false, true); - TestValues<StringValue>(1, string_desc_, true, true); - TestValues<StringValue>(10, string_desc_, true, true); -} - -// Test with a few buffers. -TEST_F(MultiNullableTupleStreamTest, MultiNullableTupleManyBufferSpill) { - int buffer_size = 100 * sizeof(int); - InitBlockMgr(10 * buffer_size, buffer_size); - - TestValues<int>(1, int_desc_, false, true); - TestValues<int>(10, int_desc_, false, true); - TestValues<int>(100, int_desc_, false, true); - TestValues<int>(1, int_desc_, true, true); - TestValues<int>(10, int_desc_, true, true); - TestValues<int>(100, int_desc_, true, true); - - TestValues<StringValue>(1, string_desc_, false, true); - TestValues<StringValue>(10, string_desc_, false, true); - TestValues<StringValue>(100, string_desc_, false, true); - TestValues<StringValue>(1, string_desc_, true, true); - TestValues<StringValue>(10, string_desc_, true, true); - TestValues<StringValue>(100, string_desc_, true, true); - - TestIntValuesInterleaved(1, 1, true); - TestIntValuesInterleaved(10, 5, true); - TestIntValuesInterleaved(100, 15, true); -} - -/// Test that ComputeRowSize handles nulls -TEST_F(MultiNullableTupleStreamTest, TestComputeRowSize) { - InitBlockMgr(-1, 8 * 1024 * 1024); - const vector<TupleDescriptor*>& tuple_descs = string_desc_->tuple_descriptors(); - // String in second tuple is stored externally. - set<SlotId> external_slots; - const SlotDescriptor* external_string_slot = tuple_descs[1]->slots()[0]; - external_slots.insert(external_string_slot->id()); - - BufferedTupleStream stream(runtime_state_, string_desc_, runtime_state_->block_mgr(), - client_, false, false, external_slots); - gscoped_ptr<TupleRow, FreeDeleter> row(reinterpret_cast<TupleRow*>( - malloc(tuple_descs.size() * sizeof(Tuple*)))); - gscoped_ptr<Tuple, FreeDeleter> tuple0(reinterpret_cast<Tuple*>( - malloc(tuple_descs[0]->byte_size()))); - gscoped_ptr<Tuple, FreeDeleter> tuple1(reinterpret_cast<Tuple*>( - malloc(tuple_descs[1]->byte_size()))); - gscoped_ptr<Tuple, FreeDeleter> tuple2(reinterpret_cast<Tuple*>( - malloc(tuple_descs[2]->byte_size()))); - memset(tuple0.get(), 0, tuple_descs[0]->byte_size()); - memset(tuple1.get(), 0, tuple_descs[1]->byte_size()); - memset(tuple2.get(), 0, tuple_descs[2]->byte_size()); - - // All nullable tuples are NULL. - row->SetTuple(0, tuple0.get()); - row->SetTuple(1, NULL); - row->SetTuple(2, NULL); - EXPECT_EQ(tuple_descs[0]->byte_size(), stream.ComputeRowSize(row.get())); - - // Tuples are initialized to empty and have no var-len data. - row->SetTuple(1, tuple1.get()); - row->SetTuple(2, tuple2.get()); - EXPECT_EQ(string_desc_->GetRowSize(), stream.ComputeRowSize(row.get())); - - // Tuple 0 has some data. 
- const SlotDescriptor* string_slot = tuple_descs[0]->slots()[0]; - StringValue* sv = tuple0->GetStringSlot(string_slot->tuple_offset()); - *sv = STRINGS[0]; - int64_t expected_len = string_desc_->GetRowSize() + sv->len; - EXPECT_EQ(expected_len, stream.ComputeRowSize(row.get())); - - // Check that external slots aren't included in count. - sv = tuple1->GetStringSlot(external_string_slot->tuple_offset()); - sv->ptr = reinterpret_cast<char*>(1234); - sv->len = 1234; - EXPECT_EQ(expected_len, stream.ComputeRowSize(row.get())); - - stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); -} - -/// Test that deep copy works with arrays by copying into a BufferedTupleStream, freeing -/// the original rows, then reading back the rows and verifying the contents. -TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) { - Status status; - InitBlockMgr(-1, IO_BLOCK_SIZE); - const int NUM_ROWS = 4000; - BufferedTupleStream stream( - runtime_state_, array_desc_, runtime_state_->block_mgr(), client_, false, false); - const vector<TupleDescriptor*>& tuple_descs = array_desc_->tuple_descriptors(); - // Write out a predictable pattern of data by iterating over arrays of constants. - int strings_index = 0; // we take the mod of this as index into STRINGS. - int array_lens[] = { 0, 1, 5, 10, 1000, 2, 49, 20 }; - int num_array_lens = sizeof(array_lens) / sizeof(array_lens[0]); - int array_len_index = 0; - ASSERT_OK(stream.Init(-1, NULL, false)); - bool got_write_buffer; - ASSERT_OK(stream.PrepareForWrite(&got_write_buffer)); - ASSERT_TRUE(got_write_buffer); - - for (int i = 0; i < NUM_ROWS; ++i) { - int expected_row_size = tuple_descs[0]->byte_size() + tuple_descs[1]->byte_size(); - gscoped_ptr<TupleRow, FreeDeleter> row(reinterpret_cast<TupleRow*>( - malloc(tuple_descs.size() * sizeof(Tuple*)))); - gscoped_ptr<Tuple, FreeDeleter> tuple0(reinterpret_cast<Tuple*>( - malloc(tuple_descs[0]->byte_size()))); - gscoped_ptr<Tuple, FreeDeleter> tuple1(reinterpret_cast<Tuple*>( - malloc(tuple_descs[1]->byte_size()))); - memset(tuple0.get(), 0, tuple_descs[0]->byte_size()); - memset(tuple1.get(), 0, tuple_descs[1]->byte_size()); - row->SetTuple(0, tuple0.get()); - row->SetTuple(1, tuple1.get()); - - // Only array<string> is non-null. - tuple0->SetNull(tuple_descs[0]->slots()[1]->null_indicator_offset()); - tuple1->SetNull(tuple_descs[1]->slots()[0]->null_indicator_offset()); - const SlotDescriptor* array_slot_desc = tuple_descs[0]->slots()[0]; - const TupleDescriptor* item_desc = array_slot_desc->collection_item_descriptor(); - - int array_len = array_lens[array_len_index++ % num_array_lens]; - CollectionValue* cv = tuple0->GetCollectionSlot(array_slot_desc->tuple_offset()); - cv->ptr = NULL; - cv->num_tuples = 0; - CollectionValueBuilder builder(cv, *item_desc, mem_pool_.get(), runtime_state_, - array_len); - Tuple* array_data; - int num_rows; - builder.GetFreeMemory(&array_data, &num_rows); - expected_row_size += item_desc->byte_size() * array_len; - - // Fill the array with pointers to our constant strings. - for (int j = 0; j < array_len; ++j) { - const StringValue* string = &STRINGS[strings_index++ % NUM_STRINGS]; - array_data->SetNotNull(item_desc->slots()[0]->null_indicator_offset()); - RawValue::Write(string, array_data, item_desc->slots()[0], mem_pool_.get()); - array_data += item_desc->byte_size(); - expected_row_size += string->len; - } - builder.CommitTuples(array_len); - - // Check that internal row size computation gives correct result. 
- EXPECT_EQ(expected_row_size, stream.ComputeRowSize(row.get())); - bool b = stream.AddRow(row.get(), &status); - ASSERT_TRUE(b); - ASSERT_OK(status); - mem_pool_->FreeAll(); // Free data as soon as possible to smoke out issues. - } - - // Read back and verify data. - bool got_read_buffer; - stream.PrepareForRead(false, &got_read_buffer); - ASSERT_TRUE(got_read_buffer); - strings_index = 0; - array_len_index = 0; - bool eos = false; - int rows_read = 0; - RowBatch batch(array_desc_, BATCH_SIZE, &tracker_); - do { - batch.Reset(); - ASSERT_OK(stream.GetNext(&batch, &eos)); - for (int i = 0; i < batch.num_rows(); ++i) { - TupleRow* row = batch.GetRow(i); - Tuple* tuple0 = row->GetTuple(0); - Tuple* tuple1 = row->GetTuple(1); - ASSERT_TRUE(tuple0 != NULL); - ASSERT_TRUE(tuple1 != NULL); - const SlotDescriptor* array_slot_desc = tuple_descs[0]->slots()[0]; - ASSERT_FALSE(tuple0->IsNull(array_slot_desc->null_indicator_offset())); - ASSERT_TRUE(tuple0->IsNull(tuple_descs[0]->slots()[1]->null_indicator_offset())); - ASSERT_TRUE(tuple1->IsNull(tuple_descs[1]->slots()[0]->null_indicator_offset())); - - const TupleDescriptor* item_desc = array_slot_desc->collection_item_descriptor(); - int expected_array_len = array_lens[array_len_index++ % num_array_lens]; - CollectionValue* cv = tuple0->GetCollectionSlot(array_slot_desc->tuple_offset()); - ASSERT_EQ(expected_array_len, cv->num_tuples); - for (int j = 0; j < cv->num_tuples; ++j) { - Tuple* item = reinterpret_cast<Tuple*>(cv->ptr + j * item_desc->byte_size()); - const SlotDescriptor* string_desc = item_desc->slots()[0]; - ASSERT_FALSE(item->IsNull(string_desc->null_indicator_offset())); - const StringValue* expected = &STRINGS[strings_index++ % NUM_STRINGS]; - const StringValue* actual = item->GetStringSlot(string_desc->tuple_offset()); - ASSERT_EQ(*expected, *actual); - } - } - rows_read += batch.num_rows(); - } while (!eos); - ASSERT_EQ(NUM_ROWS, rows_read); - stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); -} - -/// Test that ComputeRowSize handles nulls -TEST_F(ArrayTupleStreamTest, TestComputeRowSize) { - InitBlockMgr(-1, 8 * 1024 * 1024); - const vector<TupleDescriptor*>& tuple_descs = array_desc_->tuple_descriptors(); - set<SlotId> external_slots; - // Second array slot in first tuple is stored externally. - const SlotDescriptor* external_array_slot = tuple_descs[0]->slots()[1]; - external_slots.insert(external_array_slot->id()); - - BufferedTupleStream stream(runtime_state_, array_desc_, runtime_state_->block_mgr(), - client_, false, false, external_slots); - gscoped_ptr<TupleRow, FreeDeleter> row(reinterpret_cast<TupleRow*>( - malloc(tuple_descs.size() * sizeof(Tuple*)))); - gscoped_ptr<Tuple, FreeDeleter> tuple0(reinterpret_cast<Tuple*>( - malloc(tuple_descs[0]->byte_size()))); - gscoped_ptr<Tuple, FreeDeleter> tuple1(reinterpret_cast<Tuple*>( - malloc(tuple_descs[1]->byte_size()))); - memset(tuple0.get(), 0, tuple_descs[0]->byte_size()); - memset(tuple1.get(), 0, tuple_descs[1]->byte_size()); - - // All tuples are NULL. - row->SetTuple(0, NULL); - row->SetTuple(1, NULL); - EXPECT_EQ(0, stream.ComputeRowSize(row.get())); - - // Tuples are initialized to empty and have no var-len data. - row->SetTuple(0, tuple0.get()); - row->SetTuple(1, tuple1.get()); - EXPECT_EQ(array_desc_->GetRowSize(), stream.ComputeRowSize(row.get())); - - // Tuple 0 has an array. 
- int expected_row_size = array_desc_->GetRowSize(); - const SlotDescriptor* array_slot = tuple_descs[0]->slots()[0]; - const TupleDescriptor* item_desc = array_slot->collection_item_descriptor(); - int array_len = 128; - CollectionValue* cv = tuple0->GetCollectionSlot(array_slot->tuple_offset()); - CollectionValueBuilder builder(cv, *item_desc, mem_pool_.get(), runtime_state_, - array_len); - Tuple* array_data; - int num_rows; - builder.GetFreeMemory(&array_data, &num_rows); - expected_row_size += item_desc->byte_size() * array_len; - - // Fill the array with pointers to our constant strings. - for (int i = 0; i < array_len; ++i) { - const StringValue* str = &STRINGS[i % NUM_STRINGS]; - array_data->SetNotNull(item_desc->slots()[0]->null_indicator_offset()); - RawValue::Write(str, array_data, item_desc->slots()[0], mem_pool_.get()); - array_data += item_desc->byte_size(); - expected_row_size += str->len; - } - builder.CommitTuples(array_len); - EXPECT_EQ(expected_row_size, stream.ComputeRowSize(row.get())); - - // Check that the external slot isn't included in size. - cv = tuple0->GetCollectionSlot(external_array_slot->tuple_offset()); - // ptr of external slot shouldn't be dereferenced when computing size. - cv->ptr = reinterpret_cast<uint8_t*>(1234); - cv->num_tuples = 1234; - EXPECT_EQ(expected_row_size, stream.ComputeRowSize(row.get())); - - // Check that the array is excluded if tuple 0's array has its null indicator set. - tuple0->SetNull(array_slot->null_indicator_offset()); - EXPECT_EQ(array_desc_->GetRowSize(), stream.ComputeRowSize(row.get())); - - stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); -} - -// TODO: more tests. -// - The stream can operate in many modes - -} - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST); - impala::InitFeSupport(); - impala::LlvmCodeGen::InitializeLlvm(); - return RUN_ALL_TESTS(); -}
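For orientation, every test in the deleted file above drives BufferedTupleStream through the same write/unpin/read lifecycle. The following is a minimal sketch of that sequence, not code from the deleted file: it is assembled only from calls that appear in TestValues() and ReadValues() above, assumes a Status-returning context, and assumes 'state' (RuntimeState*), 'client' (BufferedBlockMgr::Client*), 'desc' (RowDescriptor*), 'tracker' (MemTracker*) and an input 'batch' are already set up as in InitBlockMgr()/CreateBatch().

    // Sketch only: the write/unpin/read pattern exercised by the tests above.
    BufferedTupleStream stream(state, desc, state->block_mgr(), client,
        true /* use_initial_small_buffers */, false /* read_write */);
    RETURN_IF_ERROR(stream.Init(-1 /* node_id */, NULL /* profile */, true /* pinned */));
    bool got_write_buffer;
    RETURN_IF_ERROR(stream.PrepareForWrite(&got_write_buffer));
    Status status;
    for (int i = 0; i < batch->num_rows(); ++i) {
      bool added = stream.AddRow(batch->GetRow(i), &status);
      RETURN_IF_ERROR(status);
      if (!added) {
        // Ran out of space in a small buffer: switch to IO-sized buffers and retry,
        // as TestValues() does.
        bool got_buffer;
        RETURN_IF_ERROR(stream.SwitchToIoBuffers(&got_buffer));
        added = stream.AddRow(batch->GetRow(i), &status);
        RETURN_IF_ERROR(status);
      }
      DCHECK(added);
    }
    // Optionally spill everything except the current write block.
    RETURN_IF_ERROR(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
    bool got_read_buffer;
    RETURN_IF_ERROR(stream.PrepareForRead(false /* delete_on_read */, &got_read_buffer));
    RowBatch out_batch(desc, BATCH_SIZE, tracker);
    bool eos = false;
    do {
      out_batch.Reset();
      RETURN_IF_ERROR(stream.GetNext(&out_batch, &eos));
      // ... consume out_batch ...
    } while (!eos);
    stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);

The interleaved read/write tests follow the same shape but construct the stream with read_write=true and call PrepareForRead() before writing.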
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/buffered-tuple-stream.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc deleted file mode 100644 index cce6390..0000000 --- a/be/src/runtime/buffered-tuple-stream.cc +++ /dev/null @@ -1,903 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/buffered-tuple-stream.inline.h" - -#include <boost/bind.hpp> -#include <gutil/strings/substitute.h> - -#include "runtime/collection-value.h" -#include "runtime/descriptors.h" -#include "runtime/string-value.h" -#include "runtime/tuple-row.h" -#include "util/bit-util.h" -#include "util/debug-util.h" -#include "util/runtime-profile-counters.h" - -#include "common/names.h" - -using namespace impala; -using namespace strings; - -// The first NUM_SMALL_BLOCKS of the tuple stream are made of blocks less than the -// IO size. These blocks never spill. -// TODO: Consider adding a 4MB in-memory buffer that would split the gap between the -// 512KB in-memory buffer and the 8MB (IO-sized) spillable buffer. 
-static const int64_t INITIAL_BLOCK_SIZES[] = { 64 * 1024, 512 * 1024 }; -static const int NUM_SMALL_BLOCKS = sizeof(INITIAL_BLOCK_SIZES) / sizeof(int64_t); - -string BufferedTupleStream::RowIdx::DebugString() const { - stringstream ss; - ss << "RowIdx block=" << block() << " offset=" << offset() << " idx=" << idx(); - return ss.str(); -} - -BufferedTupleStream::BufferedTupleStream(RuntimeState* state, - const RowDescriptor* row_desc, BufferedBlockMgr* block_mgr, - BufferedBlockMgr::Client* client, bool use_initial_small_buffers, bool read_write, - const set<SlotId>& ext_varlen_slots) - : state_(state), - desc_(row_desc), - block_mgr_(block_mgr), - block_mgr_client_(client), - total_byte_size_(0), - read_tuple_idx_(-1), - read_ptr_(NULL), - read_end_ptr_(NULL), - write_tuple_idx_(-1), - write_ptr_(NULL), - write_end_ptr_(NULL), - rows_returned_(0), - read_block_idx_(-1), - write_block_(NULL), - num_pinned_(0), - num_small_blocks_(0), - num_rows_(0), - pin_timer_(NULL), - unpin_timer_(NULL), - get_new_block_timer_(NULL), - read_write_(read_write), - has_nullable_tuple_(row_desc->IsAnyTupleNullable()), - use_small_buffers_(use_initial_small_buffers), - delete_on_read_(false), - closed_(false), - pinned_(true) { - read_block_null_indicators_size_ = -1; - write_block_null_indicators_size_ = -1; - max_null_indicators_size_ = -1; - read_block_ = blocks_.end(); - fixed_tuple_row_size_ = 0; - for (int i = 0; i < desc_->tuple_descriptors().size(); ++i) { - const TupleDescriptor* tuple_desc = desc_->tuple_descriptors()[i]; - const int tuple_byte_size = tuple_desc->byte_size(); - fixed_tuple_sizes_.push_back(tuple_byte_size); - fixed_tuple_row_size_ += tuple_byte_size; - - vector<SlotDescriptor*> tuple_string_slots; - vector<SlotDescriptor*> tuple_coll_slots; - for (int j = 0; j < tuple_desc->slots().size(); ++j) { - SlotDescriptor* slot = tuple_desc->slots()[j]; - if (!slot->type().IsVarLenType()) continue; - if (ext_varlen_slots.find(slot->id()) == ext_varlen_slots.end()) { - if (slot->type().IsVarLenStringType()) { - tuple_string_slots.push_back(slot); - } else { - DCHECK(slot->type().IsCollectionType()); - tuple_coll_slots.push_back(slot); - } - } - } - if (!tuple_string_slots.empty()) { - inlined_string_slots_.push_back(make_pair(i, tuple_string_slots)); - } - - if (!tuple_coll_slots.empty()) { - inlined_coll_slots_.push_back(make_pair(i, tuple_coll_slots)); - } - } -} - -BufferedTupleStream::~BufferedTupleStream() { - DCHECK(closed_); -} - -// Returns the number of pinned blocks in the list. Only called in DCHECKs to validate -// num_pinned_. -int NumPinned(const list<BufferedBlockMgr::Block*>& blocks) { - int num_pinned = 0; - for (BufferedBlockMgr::Block* block : blocks) { - if (block->is_pinned() && block->is_max_size()) ++num_pinned; - } - return num_pinned; -} - -string BufferedTupleStream::DebugString() const { - stringstream ss; - ss << "BufferedTupleStream num_rows=" << num_rows_ << " rows_returned=" - << rows_returned_ << " pinned=" << (pinned_ ? "true" : "false") - << " delete_on_read=" << (delete_on_read_ ? "true" : "false") - << " closed=" << (closed_ ? 
"true" : "false") - << " num_pinned=" << num_pinned_ - << " write_block=" << write_block_ << " read_block_="; - if (read_block_ == blocks_.end()) { - ss << "<end>"; - } else { - ss << *read_block_; - } - ss << " blocks=[\n"; - for (BufferedBlockMgr::Block* block : blocks_) { - ss << "{" << block->DebugString() << "}"; - if (block != blocks_.back()) ss << ",\n"; - } - ss << "]"; - return ss.str(); -} - -Status BufferedTupleStream::Init(int node_id, RuntimeProfile* profile, bool pinned) { - if (profile != NULL) { - pin_timer_ = ADD_TIMER(profile, "PinTime"); - unpin_timer_ = ADD_TIMER(profile, "UnpinTime"); - get_new_block_timer_ = ADD_TIMER(profile, "GetNewBlockTime"); - } - - max_null_indicators_size_ = ComputeNumNullIndicatorBytes(block_mgr_->max_block_size()); - if (UNLIKELY(max_null_indicators_size_ < 0)) { - // The block cannot even fit in a row of tuples so just assume there is one row. - int null_indicators_size = - BitUtil::RoundUpNumi64(desc_->tuple_descriptors().size()) * 8; - return Status(TErrorCode::BTS_BLOCK_OVERFLOW, - PrettyPrinter::Print(fixed_tuple_row_size_, TUnit::BYTES), - PrettyPrinter::Print(null_indicators_size, TUnit::BYTES)); - } - - if (block_mgr_->max_block_size() < INITIAL_BLOCK_SIZES[0]) { - use_small_buffers_ = false; - } - if (!pinned) RETURN_IF_ERROR(UnpinStream(UNPIN_ALL_EXCEPT_CURRENT)); - return Status::OK(); -} - -Status BufferedTupleStream::PrepareForWrite(bool* got_buffer) { - DCHECK(write_block_ == NULL); - return NewWriteBlockForRow(fixed_tuple_row_size_, got_buffer); -} - -Status BufferedTupleStream::SwitchToIoBuffers(bool* got_buffer) { - if (!use_small_buffers_) { - *got_buffer = (write_block_ != NULL); - return Status::OK(); - } - use_small_buffers_ = false; - Status status = - NewWriteBlock(block_mgr_->max_block_size(), max_null_indicators_size_, got_buffer); - // IMPALA-2330: Set the flag using small buffers back to false in case it failed to - // got a buffer. - DCHECK(status.ok() || !*got_buffer) << status.ok() << " " << *got_buffer; - use_small_buffers_ = !*got_buffer; - return status; -} - -void BufferedTupleStream::Close(RowBatch* batch, RowBatch::FlushMode flush) { - for (BufferedBlockMgr::Block* block : blocks_) { - if (batch != NULL && block->is_pinned()) { - batch->AddBlock(block, flush); - } else { - block->Delete(); - } - } - blocks_.clear(); - num_pinned_ = 0; - DCHECK_EQ(num_pinned_, NumPinned(blocks_)); - closed_ = true; -} - -int64_t BufferedTupleStream::bytes_in_mem(bool ignore_current) const { - int64_t result = 0; - for (BufferedBlockMgr::Block* block : blocks_) { - if (!block->is_pinned()) continue; - if (!block->is_max_size()) continue; - if (block == write_block_ && ignore_current) continue; - result += block->buffer_len(); - } - return result; -} - -Status BufferedTupleStream::UnpinBlock(BufferedBlockMgr::Block* block) { - SCOPED_TIMER(unpin_timer_); - DCHECK(block->is_pinned()); - if (!block->is_max_size()) return Status::OK(); - RETURN_IF_ERROR(block->Unpin()); - --num_pinned_; - DCHECK_EQ(num_pinned_, NumPinned(blocks_)); - return Status::OK(); -} - -Status BufferedTupleStream::NewWriteBlock( - int64_t block_len, int64_t null_indicators_size, bool* got_block) noexcept { - DCHECK(!closed_); - DCHECK_GE(null_indicators_size, 0); - *got_block = false; - - BufferedBlockMgr::Block* unpin_block = write_block_; - if (write_block_ != NULL) { - DCHECK(write_block_->is_pinned()); - if (pinned_ || write_block_ == *read_block_ || !write_block_->is_max_size()) { - // In these cases, don't unpin the current write block. 
- unpin_block = NULL; - } - } - - BufferedBlockMgr::Block* new_block = NULL; - { - SCOPED_TIMER(get_new_block_timer_); - RETURN_IF_ERROR(block_mgr_->GetNewBlock( - block_mgr_client_, unpin_block, &new_block, block_len)); - } - *got_block = new_block != NULL; - - if (!*got_block) { - DCHECK(unpin_block == NULL); - return Status::OK(); - } - - if (unpin_block != NULL) { - DCHECK(unpin_block == write_block_); - DCHECK(!write_block_->is_pinned()); - --num_pinned_; - DCHECK_EQ(num_pinned_, NumPinned(blocks_)); - } - - // Mark the entire block as containing valid data to avoid updating it as we go. - new_block->Allocate<uint8_t>(block_len); - - // Compute and allocate the block header with the null indicators. - DCHECK_EQ(null_indicators_size, ComputeNumNullIndicatorBytes(block_len)); - write_block_null_indicators_size_ = null_indicators_size; - write_tuple_idx_ = 0; - write_ptr_ = new_block->buffer() + write_block_null_indicators_size_; - write_end_ptr_ = new_block->buffer() + block_len; - - blocks_.push_back(new_block); - block_start_idx_.push_back(new_block->buffer()); - write_block_ = new_block; - DCHECK(write_block_->is_pinned()); - DCHECK_EQ(write_block_->num_rows(), 0); - if (write_block_->is_max_size()) { - ++num_pinned_; - DCHECK_EQ(num_pinned_, NumPinned(blocks_)); - } else { - ++num_small_blocks_; - } - total_byte_size_ += block_len; - return Status::OK(); -} - -Status BufferedTupleStream::NewWriteBlockForRow( - int64_t row_size, bool* got_block) noexcept { - int64_t block_len = 0; - int64_t null_indicators_size = 0; - if (use_small_buffers_) { - *got_block = false; - if (blocks_.size() < NUM_SMALL_BLOCKS) { - block_len = INITIAL_BLOCK_SIZES[blocks_.size()]; - null_indicators_size = ComputeNumNullIndicatorBytes(block_len); - // Use small buffer only if: - // 1. the small buffer's size is smaller than the configured max block size. - // 2. a single row of tuples and null indicators (if any) fit in the small buffer. - // - // If condition 2 above is not met, we will bail. An alternative would be - // to try the next larger small buffer. - *got_block = block_len < block_mgr_->max_block_size() && - null_indicators_size >= 0 && row_size + null_indicators_size <= block_len; - } - // Do not switch to IO-buffers automatically. Do not get a buffer. - if (!*got_block) return Status::OK(); - } else { - DCHECK_GE(max_null_indicators_size_, 0); - block_len = block_mgr_->max_block_size(); - null_indicators_size = max_null_indicators_size_; - // Check if the size of row and null indicators exceeds the IO block size. - if (UNLIKELY(row_size + null_indicators_size > block_len)) { - return Status(TErrorCode::BTS_BLOCK_OVERFLOW, - PrettyPrinter::Print(row_size, TUnit::BYTES), - PrettyPrinter::Print(null_indicators_size, TUnit::BYTES)); - } - } - return NewWriteBlock(block_len, null_indicators_size, got_block); -} - -Status BufferedTupleStream::NextReadBlock() { - DCHECK(!closed_); - DCHECK(read_block_ != blocks_.end()); - DCHECK_EQ(num_pinned_, NumPinned(blocks_)) << pinned_; - - // If non-NULL, this will be the current block if we are going to free it while - // grabbing the next block. This will stay NULL if we don't want to free the - // current block. - BufferedBlockMgr::Block* block_to_free = - (!pinned_ || delete_on_read_) ? 
*read_block_ : NULL;
- if (delete_on_read_) {
- DCHECK(read_block_ == blocks_.begin());
- DCHECK(*read_block_ != write_block_);
- blocks_.pop_front();
- read_block_ = blocks_.begin();
- read_block_idx_ = 0;
- if (block_to_free != NULL && !block_to_free->is_max_size()) {
- block_to_free->Delete();
- block_to_free = NULL;
- DCHECK_EQ(num_pinned_, NumPinned(blocks_)) << DebugString();
- }
- } else {
- ++read_block_;
- ++read_block_idx_;
- if (block_to_free != NULL && !block_to_free->is_max_size()) block_to_free = NULL;
- }
-
- bool pinned = false;
- if (read_block_ == blocks_.end() || (*read_block_)->is_pinned()) {
- // End of the blocks or already pinned, just handle block_to_free
- if (block_to_free != NULL) {
- SCOPED_TIMER(unpin_timer_);
- if (delete_on_read_) {
- block_to_free->Delete();
- --num_pinned_;
- } else {
- RETURN_IF_ERROR(UnpinBlock(block_to_free));
- }
- }
- } else {
- // Call into the block mgr to atomically unpin/delete the old block and pin the
- // new block.
- SCOPED_TIMER(pin_timer_);
- RETURN_IF_ERROR((*read_block_)->Pin(&pinned, block_to_free, !delete_on_read_));
- if (!pinned) {
- DCHECK(block_to_free == NULL) << "Should have been able to pin."
- << endl << block_mgr_->DebugString(block_mgr_client_);
- }
- if (block_to_free == NULL && pinned) ++num_pinned_;
- }
-
- if (read_block_ != blocks_.end() && (*read_block_)->is_pinned()) {
- read_block_null_indicators_size_ =
- ComputeNumNullIndicatorBytes((*read_block_)->buffer_len());
- DCHECK_GE(read_block_null_indicators_size_, 0);
- read_tuple_idx_ = 0;
- read_ptr_ = (*read_block_)->buffer() + read_block_null_indicators_size_;
- read_end_ptr_ = (*read_block_)->buffer() + (*read_block_)->buffer_len();
- }
- DCHECK_EQ(num_pinned_, NumPinned(blocks_)) << DebugString();
- return Status::OK();
-}
-
-Status BufferedTupleStream::PrepareForRead(bool delete_on_read, bool* got_buffer) {
- DCHECK(!closed_);
- if (blocks_.empty()) return Status::OK();
-
- if (!read_write_ && write_block_ != NULL) {
- DCHECK(write_block_->is_pinned());
- if (!pinned_ && write_block_ != blocks_.front()) {
- RETURN_IF_ERROR(UnpinBlock(write_block_));
- }
- write_block_ = NULL;
- }
-
- // Walk the blocks and pin the first IO-sized block.
- for (BufferedBlockMgr::Block* block : blocks_) {
- if (!block->is_pinned()) {
- SCOPED_TIMER(pin_timer_);
- bool current_pinned;
- RETURN_IF_ERROR(block->Pin(&current_pinned));
- if (!current_pinned) {
- *got_buffer = false;
- return Status::OK();
- }
- ++num_pinned_;
- DCHECK_EQ(num_pinned_, NumPinned(blocks_));
- }
- if (block->is_max_size()) break;
- }
-
- read_block_ = blocks_.begin();
- DCHECK(read_block_ != blocks_.end());
- read_block_null_indicators_size_ =
- ComputeNumNullIndicatorBytes((*read_block_)->buffer_len());
- DCHECK_GE(read_block_null_indicators_size_, 0);
- read_tuple_idx_ = 0;
- read_ptr_ = (*read_block_)->buffer() + read_block_null_indicators_size_;
- read_end_ptr_ = (*read_block_)->buffer() + (*read_block_)->buffer_len();
- rows_returned_ = 0;
- read_block_idx_ = 0;
- delete_on_read_ = delete_on_read;
- *got_buffer = true;
- return Status::OK();
-}
-
-Status BufferedTupleStream::PinStream(bool already_reserved, bool* pinned) {
- DCHECK(!closed_);
- DCHECK(pinned != NULL);
- if (!already_reserved) {
- // If we can't get all the blocks, don't try at all.
- if (!block_mgr_->TryAcquireTmpReservation(block_mgr_client_, blocks_unpinned())) { - *pinned = false; - return Status::OK(); - } - } - - for (BufferedBlockMgr::Block* block : blocks_) { - if (block->is_pinned()) continue; - { - SCOPED_TIMER(pin_timer_); - RETURN_IF_ERROR(block->Pin(pinned)); - } - if (!*pinned) { - VLOG_QUERY << "Should have been reserved." << endl - << block_mgr_->DebugString(block_mgr_client_); - return Status::OK(); - } - ++num_pinned_; - DCHECK_EQ(num_pinned_, NumPinned(blocks_)); - } - - if (!delete_on_read_) { - // Populate block_start_idx_ on pin. - DCHECK_EQ(block_start_idx_.size(), blocks_.size()); - block_start_idx_.clear(); - for (BufferedBlockMgr::Block* block : blocks_) { - block_start_idx_.push_back(block->buffer()); - } - } - *pinned = true; - pinned_ = true; - return Status::OK(); -} - -Status BufferedTupleStream::UnpinStream(UnpinMode mode) { - DCHECK(!closed_); - DCHECK(mode == UNPIN_ALL || mode == UNPIN_ALL_EXCEPT_CURRENT); - SCOPED_TIMER(unpin_timer_); - - for (BufferedBlockMgr::Block* block: blocks_) { - if (!block->is_pinned()) continue; - if (mode == UNPIN_ALL_EXCEPT_CURRENT - && (block == write_block_ || (read_write_ && block == *read_block_))) { - continue; - } - RETURN_IF_ERROR(UnpinBlock(block)); - } - if (mode == UNPIN_ALL) { - read_block_ = blocks_.end(); - write_block_ = NULL; - } - pinned_ = false; - return Status::OK(); -} - -int BufferedTupleStream::ComputeNumNullIndicatorBytes(int block_size) const { - if (has_nullable_tuple_) { - // We assume that all rows will use their max size, so we may be underutilizing the - // space, i.e. we may have some unused space in case of rows with NULL tuples. - const uint32_t tuples_per_row = desc_->tuple_descriptors().size(); - const uint32_t min_row_size_in_bits = 8 * fixed_tuple_row_size_ + tuples_per_row; - const uint32_t block_size_in_bits = 8 * block_size; - const uint32_t max_num_rows = block_size_in_bits / min_row_size_in_bits; - if (UNLIKELY(max_num_rows == 0)) return -1; - return BitUtil::RoundUpNumi64(max_num_rows * tuples_per_row) * 8; - } else { - // If there are no nullable tuples then no need to waste space for null indicators. - return 0; - } -} - -Status BufferedTupleStream::GetRows(scoped_ptr<RowBatch>* batch, bool* got_rows) { - if (num_rows() > numeric_limits<int>::max()) { - // RowBatch::num_rows_ is a 32-bit int, avoid an overflow. - return Status(Substitute("Trying to read $0 rows into in-memory batch failed. Limit " - "is $1", num_rows(), numeric_limits<int>::max())); - } - RETURN_IF_ERROR(PinStream(false, got_rows)); - if (!*got_rows) return Status::OK(); - bool got_read_buffer; - RETURN_IF_ERROR(PrepareForRead(false, &got_read_buffer)); - DCHECK(got_read_buffer) << "Stream was pinned"; - batch->reset( - new RowBatch(desc_, num_rows(), block_mgr_->get_tracker(block_mgr_client_))); - bool eos = false; - // Loop until GetNext fills the entire batch. Each call can stop at block - // boundaries. We generally want it to stop, so that blocks can be freed - // as we read. It is safe in this case because we pin the entire stream. 
- while (!eos) { - RETURN_IF_ERROR(GetNext(batch->get(), &eos)); - } - return Status::OK(); -} - -Status BufferedTupleStream::GetNext(RowBatch* batch, bool* eos) { - return GetNextInternal<false>(batch, eos, NULL); -} - -Status BufferedTupleStream::GetNext(RowBatch* batch, bool* eos, - vector<RowIdx>* indices) { - return GetNextInternal<true>(batch, eos, indices); -} - -template <bool FILL_INDICES> -Status BufferedTupleStream::GetNextInternal(RowBatch* batch, bool* eos, - vector<RowIdx>* indices) { - if (has_nullable_tuple_) { - return GetNextInternal<FILL_INDICES, true>(batch, eos, indices); - } else { - return GetNextInternal<FILL_INDICES, false>(batch, eos, indices); - } -} - -template <bool FILL_INDICES, bool HAS_NULLABLE_TUPLE> -Status BufferedTupleStream::GetNextInternal(RowBatch* batch, bool* eos, - vector<RowIdx>* indices) { - DCHECK(!closed_); - DCHECK(batch->row_desc()->LayoutEquals(*desc_)); - *eos = (rows_returned_ == num_rows_); - if (*eos) return Status::OK(); - DCHECK_GE(read_block_null_indicators_size_, 0); - - const uint64_t tuples_per_row = desc_->tuple_descriptors().size(); - DCHECK_LE(read_tuple_idx_ / tuples_per_row, (*read_block_)->num_rows()); - DCHECK_EQ(read_tuple_idx_ % tuples_per_row, 0); - int rows_returned_curr_block = read_tuple_idx_ / tuples_per_row; - - if (UNLIKELY(rows_returned_curr_block == (*read_block_)->num_rows())) { - // Get the next block in the stream. We need to do this at the beginning of - // the GetNext() call to ensure the buffer management semantics. NextReadBlock() - // will recycle the memory for the rows returned from the *previous* call to - // GetNext(). - RETURN_IF_ERROR(NextReadBlock()); - DCHECK(read_block_ != blocks_.end()) << DebugString(); - DCHECK_GE(read_block_null_indicators_size_, 0); - rows_returned_curr_block = 0; - } - - DCHECK(read_block_ != blocks_.end()); - DCHECK((*read_block_)->is_pinned()) << DebugString(); - DCHECK_GE(read_tuple_idx_, 0); - - int rows_left_in_block = (*read_block_)->num_rows() - rows_returned_curr_block; - int rows_to_fill = std::min(batch->capacity() - batch->num_rows(), rows_left_in_block); - DCHECK_GE(rows_to_fill, 1); - batch->AddRows(rows_to_fill); - uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(batch->GetRow(batch->num_rows())); - - // Produce tuple rows from the current block and the corresponding position on the - // null tuple indicator. - if (FILL_INDICES) { - DCHECK(indices != NULL); - DCHECK(!delete_on_read_); - DCHECK_EQ(batch->num_rows(), 0); - indices->clear(); - indices->reserve(rows_to_fill); - } - - uint8_t* null_word = NULL; - uint32_t null_pos = 0; - // Start reading from position read_tuple_idx_ in the block. - // IMPALA-2256: Special case if there are no materialized slots. - bool increment_row = RowConsumesMemory(); - uint64_t last_read_row = increment_row * (read_tuple_idx_ / tuples_per_row); - for (int i = 0; i < rows_to_fill; ++i) { - if (FILL_INDICES) { - indices->push_back(RowIdx()); - DCHECK_EQ(indices->size(), i + 1); - (*indices)[i].set(read_block_idx_, read_ptr_ - (*read_block_)->buffer(), - last_read_row); - } - // Copy the row into the output batch. - TupleRow* output_row = reinterpret_cast<TupleRow*>(tuple_row_mem); - if (HAS_NULLABLE_TUPLE) { - for (int j = 0; j < tuples_per_row; ++j) { - // Stitch together the tuples from the block and the NULL ones. 
- null_word = (*read_block_)->buffer() + (read_tuple_idx_ >> 3); - null_pos = read_tuple_idx_ & 7; - ++read_tuple_idx_; - const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0); - // Copy tuple and advance read_ptr_. If it is a NULL tuple, it calls SetTuple - // with Tuple* being 0x0. To do that we multiply the current read_ptr_ with - // false (0x0). - output_row->SetTuple(j, reinterpret_cast<Tuple*>( - reinterpret_cast<uint64_t>(read_ptr_) * is_not_null)); - read_ptr_ += fixed_tuple_sizes_[j] * is_not_null; - } - } else { - // When we know that there are no nullable tuples we can skip null checks. - for (int j = 0; j < tuples_per_row; ++j) { - output_row->SetTuple(j, reinterpret_cast<Tuple*>(read_ptr_)); - read_ptr_ += fixed_tuple_sizes_[j]; - } - read_tuple_idx_ += tuples_per_row; - } - tuple_row_mem += sizeof(Tuple*) * tuples_per_row; - - // Update string slot ptrs, skipping external strings. - for (int j = 0; j < inlined_string_slots_.size(); ++j) { - Tuple* tuple = output_row->GetTuple(inlined_string_slots_[j].first); - if (HAS_NULLABLE_TUPLE && tuple == NULL) continue; - FixUpStringsForRead(inlined_string_slots_[j].second, tuple); - } - - // Update collection slot ptrs, skipping external collections. We traverse the - // collection structure in the same order as it was written to the stream, allowing - // us to infer the data layout based on the length of collections and strings. - for (int j = 0; j < inlined_coll_slots_.size(); ++j) { - Tuple* tuple = output_row->GetTuple(inlined_coll_slots_[j].first); - if (HAS_NULLABLE_TUPLE && tuple == NULL) continue; - FixUpCollectionsForRead(inlined_coll_slots_[j].second, tuple); - } - last_read_row += increment_row; - } - - batch->CommitRows(rows_to_fill); - rows_returned_ += rows_to_fill; - *eos = (rows_returned_ == num_rows_); - if ((!pinned_ || delete_on_read_) - && rows_returned_curr_block + rows_to_fill == (*read_block_)->num_rows()) { - // No more data in this block. The batch must be immediately returned up the operator - // tree and deep copied so that NextReadBlock() can reuse the read block's buffer. 
- batch->MarkNeedsDeepCopy(); - } - if (FILL_INDICES) DCHECK_EQ(indices->size(), rows_to_fill); - DCHECK_LE(read_ptr_, read_end_ptr_); - return Status::OK(); -} - -void BufferedTupleStream::FixUpStringsForRead(const vector<SlotDescriptor*>& string_slots, - Tuple* tuple) { - DCHECK(tuple != NULL); - for (int i = 0; i < string_slots.size(); ++i) { - const SlotDescriptor* slot_desc = string_slots[i]; - if (tuple->IsNull(slot_desc->null_indicator_offset())) continue; - - StringValue* sv = tuple->GetStringSlot(slot_desc->tuple_offset()); - DCHECK_LE(sv->len, read_block_bytes_remaining()); - sv->ptr = reinterpret_cast<char*>(read_ptr_); - read_ptr_ += sv->len; - } -} - -void BufferedTupleStream::FixUpCollectionsForRead(const vector<SlotDescriptor*>& collection_slots, - Tuple* tuple) { - DCHECK(tuple != NULL); - for (int i = 0; i < collection_slots.size(); ++i) { - const SlotDescriptor* slot_desc = collection_slots[i]; - if (tuple->IsNull(slot_desc->null_indicator_offset())) continue; - - CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset()); - const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor(); - int coll_byte_size = cv->num_tuples * item_desc.byte_size(); - DCHECK_LE(coll_byte_size, read_block_bytes_remaining()); - cv->ptr = reinterpret_cast<uint8_t*>(read_ptr_); - read_ptr_ += coll_byte_size; - - if (!item_desc.HasVarlenSlots()) continue; - uint8_t* coll_data = cv->ptr; - for (int j = 0; j < cv->num_tuples; ++j) { - Tuple* item = reinterpret_cast<Tuple*>(coll_data); - FixUpStringsForRead(item_desc.string_slots(), item); - FixUpCollectionsForRead(item_desc.collection_slots(), item); - coll_data += item_desc.byte_size(); - } - } -} - -int64_t BufferedTupleStream::ComputeRowSize(TupleRow* row) const noexcept { - int64_t size = 0; - if (has_nullable_tuple_) { - for (int i = 0; i < fixed_tuple_sizes_.size(); ++i) { - if (row->GetTuple(i) != NULL) size += fixed_tuple_sizes_[i]; - } - } else { - size = fixed_tuple_row_size_; - } - for (int i = 0; i < inlined_string_slots_.size(); ++i) { - Tuple* tuple = row->GetTuple(inlined_string_slots_[i].first); - if (tuple == NULL) continue; - const vector<SlotDescriptor*>& slots = inlined_string_slots_[i].second; - for (auto it = slots.begin(); it != slots.end(); ++it) { - if (tuple->IsNull((*it)->null_indicator_offset())) continue; - size += tuple->GetStringSlot((*it)->tuple_offset())->len; - } - } - - for (int i = 0; i < inlined_coll_slots_.size(); ++i) { - Tuple* tuple = row->GetTuple(inlined_coll_slots_[i].first); - if (tuple == NULL) continue; - const vector<SlotDescriptor*>& slots = inlined_coll_slots_[i].second; - for (auto it = slots.begin(); it != slots.end(); ++it) { - if (tuple->IsNull((*it)->null_indicator_offset())) continue; - CollectionValue* cv = tuple->GetCollectionSlot((*it)->tuple_offset()); - const TupleDescriptor& item_desc = *(*it)->collection_item_descriptor(); - size += cv->num_tuples * item_desc.byte_size(); - - if (!item_desc.HasVarlenSlots()) continue; - for (int j = 0; j < cv->num_tuples; ++j) { - Tuple* item = reinterpret_cast<Tuple*>(&cv->ptr[j * item_desc.byte_size()]); - size += item->VarlenByteSize(item_desc); - } - } - } - return size; -} - -bool BufferedTupleStream::AddRowSlow(TupleRow* row, Status* status) noexcept { - bool got_block; - int64_t row_size = ComputeRowSize(row); - *status = NewWriteBlockForRow(row_size, &got_block); - if (!status->ok() || !got_block) return false; - return DeepCopy(row); -} - -bool BufferedTupleStream::DeepCopy(TupleRow* row) noexcept { - if 
(has_nullable_tuple_) { - return DeepCopyInternal<true>(row); - } else { - return DeepCopyInternal<false>(row); - } -} - -// TODO: this really needs codegen -// TODO: in case of duplicate tuples, this can redundantly serialize data. -template <bool HasNullableTuple> -bool BufferedTupleStream::DeepCopyInternal(TupleRow* row) noexcept { - if (UNLIKELY(write_block_ == NULL)) return false; - DCHECK_GE(write_block_null_indicators_size_, 0); - DCHECK(write_block_->is_pinned()) << DebugString() << std::endl - << write_block_->DebugString(); - - const uint64_t tuples_per_row = desc_->tuple_descriptors().size(); - uint32_t bytes_remaining = write_block_bytes_remaining(); - if (UNLIKELY((bytes_remaining < fixed_tuple_row_size_) || - (HasNullableTuple && - (write_tuple_idx_ + tuples_per_row > write_block_null_indicators_size_ * 8)))) { - return false; - } - - // Copy the not NULL fixed len tuples. For the NULL tuples just update the NULL tuple - // indicator. - if (HasNullableTuple) { - DCHECK_GT(write_block_null_indicators_size_, 0); - uint8_t* null_word = NULL; - uint32_t null_pos = 0; - for (int i = 0; i < tuples_per_row; ++i) { - null_word = write_block_->buffer() + (write_tuple_idx_ >> 3); // / 8 - null_pos = write_tuple_idx_ & 7; - ++write_tuple_idx_; - const int tuple_size = fixed_tuple_sizes_[i]; - Tuple* t = row->GetTuple(i); - const uint8_t mask = 1 << (7 - null_pos); - if (t != NULL) { - *null_word &= ~mask; - memcpy(write_ptr_, t, tuple_size); - write_ptr_ += tuple_size; - } else { - *null_word |= mask; - } - } - DCHECK_LE(write_tuple_idx_ - 1, write_block_null_indicators_size_ * 8); - } else { - // If we know that there are no nullable tuples no need to set the nullability flags. - DCHECK_EQ(write_block_null_indicators_size_, 0); - for (int i = 0; i < tuples_per_row; ++i) { - const int tuple_size = fixed_tuple_sizes_[i]; - Tuple* t = row->GetTuple(i); - // TODO: Once IMPALA-1306 (Avoid passing empty tuples of non-materialized slots) - // is delivered, the check below should become DCHECK(t != NULL). - DCHECK(t != NULL || tuple_size == 0); - memcpy(write_ptr_, t, tuple_size); - write_ptr_ += tuple_size; - } - } - - // Copy inlined string slots. Note: we do not need to convert the string ptrs to offsets - // on the write path, only on the read. The tuple data is immediately followed - // by the string data so only the len information is necessary. - for (int i = 0; i < inlined_string_slots_.size(); ++i) { - const Tuple* tuple = row->GetTuple(inlined_string_slots_[i].first); - if (HasNullableTuple && tuple == NULL) continue; - if (UNLIKELY(!CopyStrings(tuple, inlined_string_slots_[i].second))) return false; - } - - // Copy inlined collection slots. We copy collection data in a well-defined order so - // we do not need to convert pointers to offsets on the write path. 
- for (int i = 0; i < inlined_coll_slots_.size(); ++i) { - const Tuple* tuple = row->GetTuple(inlined_coll_slots_[i].first); - if (HasNullableTuple && tuple == NULL) continue; - if (UNLIKELY(!CopyCollections(tuple, inlined_coll_slots_[i].second))) return false; - } - - write_block_->AddRow(); - ++num_rows_; - return true; -} - -bool BufferedTupleStream::CopyStrings(const Tuple* tuple, - const vector<SlotDescriptor*>& string_slots) { - for (int i = 0; i < string_slots.size(); ++i) { - const SlotDescriptor* slot_desc = string_slots[i]; - if (tuple->IsNull(slot_desc->null_indicator_offset())) continue; - const StringValue* sv = tuple->GetStringSlot(slot_desc->tuple_offset()); - if (LIKELY(sv->len > 0)) { - if (UNLIKELY(write_block_bytes_remaining() < sv->len)) return false; - - memcpy(write_ptr_, sv->ptr, sv->len); - write_ptr_ += sv->len; - } - } - return true; -} - -bool BufferedTupleStream::CopyCollections(const Tuple* tuple, - const vector<SlotDescriptor*>& collection_slots) { - for (int i = 0; i < collection_slots.size(); ++i) { - const SlotDescriptor* slot_desc = collection_slots[i]; - if (tuple->IsNull(slot_desc->null_indicator_offset())) continue; - const CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset()); - const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor(); - if (LIKELY(cv->num_tuples > 0)) { - int coll_byte_size = cv->num_tuples * item_desc.byte_size(); - if (UNLIKELY(write_block_bytes_remaining() < coll_byte_size)) return false; - uint8_t* coll_data = write_ptr_; - memcpy(coll_data, cv->ptr, coll_byte_size); - write_ptr_ += coll_byte_size; - - if (!item_desc.HasVarlenSlots()) continue; - // Copy variable length data when present in collection items. - for (int j = 0; j < cv->num_tuples; ++j) { - const Tuple* item = reinterpret_cast<Tuple*>(coll_data); - if (UNLIKELY(!CopyStrings(item, item_desc.string_slots()))) return false; - if (UNLIKELY(!CopyCollections(item, item_desc.collection_slots()))) return false; - coll_data += item_desc.byte_size(); - } - } - } - return true; -} - -void BufferedTupleStream::GetTupleRow(const RowIdx& idx, TupleRow* row) const { - DCHECK(row != NULL); - DCHECK(!closed_); - DCHECK(is_pinned()); - DCHECK(!delete_on_read_); - DCHECK_EQ(blocks_.size(), block_start_idx_.size()); - DCHECK_LT(idx.block(), blocks_.size()); - - uint8_t* data = block_start_idx_[idx.block()] + idx.offset(); - if (has_nullable_tuple_) { - // Stitch together the tuples from the block and the NULL ones. - const int tuples_per_row = desc_->tuple_descriptors().size(); - uint32_t tuple_idx = idx.idx() * tuples_per_row; - for (int i = 0; i < tuples_per_row; ++i) { - const uint8_t* null_word = block_start_idx_[idx.block()] + (tuple_idx >> 3); - const uint32_t null_pos = tuple_idx & 7; - const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0); - row->SetTuple(i, reinterpret_cast<Tuple*>( - reinterpret_cast<uint64_t>(data) * is_not_null)); - data += desc_->tuple_descriptors()[i]->byte_size() * is_not_null; - ++tuple_idx; - } - } else { - for (int i = 0; i < desc_->tuple_descriptors().size(); ++i) { - row->SetTuple(i, reinterpret_cast<Tuple*>(data)); - data += desc_->tuple_descriptors()[i]->byte_size(); - } - } -}
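Note on the NULL-tuple encoding in the code removed above: each block reserves a small header of NULL-indicator bytes, one bit per tuple written into the block, where a set bit means the tuple is NULL and no fixed-length data was serialized for it (DeepCopyInternal writes the bit, GetNextInternal and GetTupleRow read it back). The read path then reconstructs the tuple pointer without branching by multiplying the current read pointer by the not-NULL flag, yielding either the data pointer or nullptr. The following is a minimal, self-contained sketch of that encoding, not Impala code; the NullBitmapSketch type and the main() driver are illustrative stand-ins.

// Sketch (hypothetical helper) of the per-block NULL-indicator bitmap used by
// BufferedTupleStream: bit i corresponds to the i-th tuple written into the
// block; a set bit means "this tuple is NULL".
#include <cstdint>
#include <iostream>
#include <vector>

struct NullBitmapSketch {
  std::vector<uint8_t> indicators;  // stands in for the block header bytes
  explicit NullBitmapSketch(size_t null_bytes) : indicators(null_bytes, 0) {}

  // Write path: mirrors the mask logic in DeepCopyInternal<true>().
  void WriteBit(uint32_t tuple_idx, bool is_null) {
    uint8_t* null_word = indicators.data() + (tuple_idx >> 3);  // tuple_idx / 8
    const uint8_t mask = 1 << (7 - (tuple_idx & 7));
    if (is_null) {
      *null_word |= mask;
    } else {
      *null_word &= ~mask;
    }
  }

  // Read path: mirrors the stitching logic in GetNextInternal()/GetTupleRow().
  bool IsNotNull(uint32_t tuple_idx) const {
    const uint8_t* null_word = indicators.data() + (tuple_idx >> 3);
    const uint32_t null_pos = tuple_idx & 7;
    return (*null_word & (1 << (7 - null_pos))) == 0;
  }
};

int main() {
  NullBitmapSketch bitmap(/*null_bytes=*/8);  // room for 64 tuple indicators
  bitmap.WriteBit(0, /*is_null=*/false);
  bitmap.WriteBit(1, /*is_null=*/true);

  // Branch-free pointer reconstruction as on the read path: multiplying the
  // read pointer by the not-NULL flag gives the pointer (flag == 1) or
  // nullptr (flag == 0).
  int tuple_storage = 42;
  int* read_ptr = &tuple_storage;
  for (uint32_t idx = 0; idx < 2; ++idx) {
    const bool not_null = bitmap.IsNotNull(idx);
    int* tuple = reinterpret_cast<int*>(
        reinterpret_cast<uint64_t>(read_ptr) * not_null);
    std::cout << "tuple " << idx << ": "
              << (tuple == nullptr ? "NULL" : "present") << "\n";
  }
  return 0;
}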
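Similarly, the sizing rule in ComputeNumNullIndicatorBytes() can be read as plain arithmetic: with nullable tuples every row costs at least 8 * fixed_tuple_row_size_ + tuples_per_row bits (fixed-length data plus one indicator bit per tuple), so a block of block_size bytes holds at most (8 * block_size) / (8 * fixed_tuple_row_size_ + tuples_per_row) rows, and the indicator area is one bit per tuple for that many rows, rounded up to whole 64-bit words. The sketch below re-derives that bound under those assumptions; the free function and the example sizes are hypothetical, used only to illustrate the formula, and are not the class method itself.

// Illustrative re-derivation of the NULL-indicator sizing: assume every row
// uses its maximum size and reserve one indicator bit per tuple for the
// largest possible row count, rounded up to 8-byte words.
#include <cstdint>
#include <iostream>

int64_t RoundUpNumI64(int64_t bits) { return (bits + 63) / 64; }

int64_t NullIndicatorBytes(int64_t block_size, int64_t fixed_tuple_row_size,
                           int64_t tuples_per_row) {
  const int64_t min_row_size_in_bits = 8 * fixed_tuple_row_size + tuples_per_row;
  const int64_t block_size_in_bits = 8 * block_size;
  const int64_t max_num_rows = block_size_in_bits / min_row_size_in_bits;
  // Not even one row fits; the caller reports a block overflow in this case.
  if (max_num_rows == 0) return -1;
  return RoundUpNumI64(max_num_rows * tuples_per_row) * 8;
}

int main() {
  // Hypothetical example: an 8MB block, two tuples per row, 24 bytes of
  // fixed-length tuple data per row.
  const int64_t block_size = 8 * 1024 * 1024;
  std::cout << NullIndicatorBytes(block_size, 24, 2)
            << " bytes reserved for NULL indicators\n";
  return 0;
}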
