This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit a6a244099502329d9193b316ea26d5fd6451b6bd Author: Riza Suminto <[email protected]> AuthorDate: Tue Dec 15 15:51:24 2020 -0800 IMPALA-10374: Limit iteration at BufferedTupleStream::DebugString BufferedTupleStream::DebugString() iterates over a std::list<Page> that can potentially grow very large. As a consequence, the returned string can grow large as well and cause a problem, as previously happened in IMPALA-9851. With this patch, BufferedTupleStream::DebugString() only includes at most the first 100 pages of the page list. Testing: - Add new be test SimpleTupleStreamTest.ShortDebugString in buffered-tuple-stream-test.cc - Pass core tests Change-Id: I6626c8d54f35f303c01f85be1dd9aa54c8ad9a2d Reviewed-on: http://gerrit.cloudera.org:8080/16884 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Quanlong Huang <[email protected]> --- be/src/runtime/buffered-tuple-stream-test.cc | 62 ++++++++++++++++++++++++++++ be/src/runtime/buffered-tuple-stream.cc | 12 ++++-- be/src/runtime/buffered-tuple-stream.h | 6 +-- 3 files changed, 73 insertions(+), 7 deletions(-) diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc index 1c42f07..776b207 100644 --- a/be/src/runtime/buffered-tuple-stream-test.cc +++ b/be/src/runtime/buffered-tuple-stream-test.cc @@ -39,6 +39,7 @@ #include "service/fe-support.h" #include "testutil/desc-tbl-builder.h" #include "testutil/gtest-util.h" +#include "util/error-util.h" #include "util/test-info.h" #include "gen-cpp/ImpalaInternalService_types.h" @@ -61,6 +62,8 @@ static const uint32_t PRIME = 479001599; namespace impala { +constexpr int ErrorMsg::MAX_ERROR_MESSAGE_LEN; + static const StringValue STRINGS[] = { StringValue("ABC"), StringValue("HELLO"), StringValue("123456789"), StringValue("FOOBAR"), StringValue("ONE"), StringValue("THREE"), @@ -1454,6 +1457,65 @@ TEST_F(SimpleTupleStreamTest, ConcurrentReaders) { stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES); } 
+TEST_F(SimpleTupleStreamTest, ShortDebugString) { + Init(BUFFER_POOL_LIMIT); + + int num_batches = 50; + RowDescriptor* desc = int_desc_; + bool gen_null = false; + int64_t default_page_len = 128 * sizeof(int); + int64_t max_page_len = default_page_len; + int num_rows = BATCH_SIZE; + + BufferedTupleStream stream( + runtime_state_, desc, &client_, default_page_len, max_page_len); + ASSERT_OK(stream.Init("SimpleTupleStreamTest::ShortDebugString", true)); + bool got_write_reservation; + ASSERT_OK(stream.PrepareForWrite(&got_write_reservation)); + ASSERT_TRUE(got_write_reservation); + + // Add rows to the stream + int offset = 0; + for (int i = 0; i < num_batches; ++i) { + RowBatch* batch = nullptr; + + Status status; + batch = CreateBatch(desc, offset, num_rows, gen_null); + for (int j = 0; j < batch->num_rows(); ++j) { + bool b = stream.AddRow(batch->GetRow(j), &status); + ASSERT_OK(status); + ASSERT_TRUE(b); + } + offset += batch->num_rows(); + // Reset the batch to make sure the stream handles the memory correctly. + batch->Reset(); + } + + bool got_read_reservation; + ASSERT_OK(stream.PrepareForRead(false, &got_read_reservation)); + ASSERT_TRUE(got_read_reservation); + + // Read all the rows back + vector<int> results; + ReadValues(&stream, desc, &results); + + // Verify result + VerifyResults<int>(*desc, results, num_rows * num_batches, gen_null); + + // Verify that stream contains more than MAX_PAGE_ITER_DEBUG pages and only subset of + // pages are included in DebugString(). + DCHECK_GT(stream.num_pages_, BufferedTupleStream::MAX_PAGE_ITER_DEBUG); + string page_count_substr = Substitute( + "$0 out of $1 pages=", BufferedTupleStream::MAX_PAGE_ITER_DEBUG, stream.num_pages_); + string debug_string = stream.DebugString(); + ASSERT_NE(debug_string.find(page_count_substr), string::npos) + << page_count_substr << " not found at BufferedTupleStream::DebugString(). 
" + << debug_string; + ASSERT_LE(debug_string.length(), ErrorMsg::MAX_ERROR_MESSAGE_LEN); + + stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES); +} + // Basic API test. No data should be going to disk. TEST_F(SimpleNullStreamTest, Basic) { Init(BUFFER_POOL_LIMIT); diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc index 5d12fdf..a4b76a2 100644 --- a/be/src/runtime/buffered-tuple-stream.cc +++ b/be/src/runtime/buffered-tuple-stream.cc @@ -50,6 +50,8 @@ using namespace strings; using BufferHandle = BufferPool::BufferHandle; using FlushMode = RowBatch::FlushMode; +constexpr int64_t BufferedTupleStream::MAX_PAGE_ITER_DEBUG; + BufferedTupleStream::BufferedTupleStream(RuntimeState* state, const RowDescriptor* row_desc, BufferPool::ClientHandle* buffer_pool_client, int64_t default_page_len, int64_t max_page_len, const set<SlotId>& ext_varlen_slots) @@ -183,10 +185,12 @@ string BufferedTupleStream::DebugString() const { } else { ss << write_page_reservation_.GetReservation(); } - ss << "\n # pages=" << num_pages_ << " pages=[\n"; - for (const Page& page : pages_) { - ss << "{" << page.DebugString() << "}"; - if (&page != &pages_.back()) ss << ",\n"; + int64_t max_page = min(num_pages_, BufferedTupleStream::MAX_PAGE_ITER_DEBUG); + ss << "\n " << max_page << " out of " << num_pages_ << " pages=[\n"; + for (auto page = pages_.begin(); (page != pages_.end()) && (max_page > 0); ++page) { + ss << "{" << page->DebugString() << "}"; + max_page--; + if (max_page > 0) ss << ",\n"; } ss << "]"; return ss.str(); diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h index 5a22bfb..4f76cba 100644 --- a/be/src/runtime/buffered-tuple-stream.h +++ b/be/src/runtime/buffered-tuple-stream.h @@ -417,6 +417,8 @@ class BufferedTupleStream { std::string DebugString() const; + static constexpr int64_t MAX_PAGE_ITER_DEBUG = 100; + private: /// Wrapper around BufferPool::PageHandle that tracks additional 
info about the page. struct Page { @@ -558,9 +560,7 @@ class BufferedTupleStream { private: DISALLOW_COPY_AND_ASSIGN(BufferedTupleStream); - friend class ArrayTupleStreamTest_TestArrayDeepCopy_Test; - friend class ArrayTupleStreamTest_TestComputeRowSize_Test; - friend class MultiNullableTupleStreamTest_TestComputeRowSize_Test; + friend class SimpleTupleStreamTest_ShortDebugString_Test; /// Runtime state instance used to check for cancellation. Not owned. RuntimeState* const state_;
