This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a6a244099502329d9193b316ea26d5fd6451b6bd
Author: Riza Suminto <[email protected]>
AuthorDate: Tue Dec 15 15:51:24 2020 -0800

    IMPALA-10374: Limit iteration at BufferedTupleStream::DebugString
    
    BufferedTupleStream::DebugString() iterates over a std::list<Page> that can
    potentially grow very large. As a consequence, the returned string can grow
    large as well and cause a problem, as previously happened in IMPALA-9851.
    With this patch, BufferedTupleStream::DebugString() only includes at most
    the first 100 pages of the page list.
    
    Testing:
    - Add new be test SimpleTupleStreamTest.ShortDebugString in
      buffered-tuple-stream-test.cc
    - Pass core tests
    
    Change-Id: I6626c8d54f35f303c01f85be1dd9aa54c8ad9a2d
    Reviewed-on: http://gerrit.cloudera.org:8080/16884
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Quanlong Huang <[email protected]>
---
 be/src/runtime/buffered-tuple-stream-test.cc | 62 ++++++++++++++++++++++++++++
 be/src/runtime/buffered-tuple-stream.cc      | 12 ++++--
 be/src/runtime/buffered-tuple-stream.h       |  6 +--
 3 files changed, 73 insertions(+), 7 deletions(-)

diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 1c42f07..776b207 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -39,6 +39,7 @@
 #include "service/fe-support.h"
 #include "testutil/desc-tbl-builder.h"
 #include "testutil/gtest-util.h"
+#include "util/error-util.h"
 #include "util/test-info.h"
 
 #include "gen-cpp/ImpalaInternalService_types.h"
@@ -61,6 +62,8 @@ static const uint32_t PRIME = 479001599;
 
 namespace impala {
 
+constexpr int ErrorMsg::MAX_ERROR_MESSAGE_LEN;
+
 static const StringValue STRINGS[] = {
     StringValue("ABC"), StringValue("HELLO"), StringValue("123456789"),
     StringValue("FOOBAR"), StringValue("ONE"), StringValue("THREE"),
@@ -1454,6 +1457,65 @@ TEST_F(SimpleTupleStreamTest, ConcurrentReaders) {
   stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
 
+TEST_F(SimpleTupleStreamTest, ShortDebugString) {
+  Init(BUFFER_POOL_LIMIT);
+
+  int num_batches = 50;
+  RowDescriptor* desc = int_desc_;
+  bool gen_null = false;
+  int64_t default_page_len = 128 * sizeof(int);
+  int64_t max_page_len = default_page_len;
+  int num_rows = BATCH_SIZE;
+
+  BufferedTupleStream stream(
+      runtime_state_, desc, &client_, default_page_len, max_page_len);
+  ASSERT_OK(stream.Init("SimpleTupleStreamTest::ShortDebugString", true));
+  bool got_write_reservation;
+  ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
+  ASSERT_TRUE(got_write_reservation);
+
+  // Add rows to the stream
+  int offset = 0;
+  for (int i = 0; i < num_batches; ++i) {
+    RowBatch* batch = nullptr;
+
+    Status status;
+    batch = CreateBatch(desc, offset, num_rows, gen_null);
+    for (int j = 0; j < batch->num_rows(); ++j) {
+      bool b = stream.AddRow(batch->GetRow(j), &status);
+      ASSERT_OK(status);
+      ASSERT_TRUE(b);
+    }
+    offset += batch->num_rows();
+    // Reset the batch to make sure the stream handles the memory correctly.
+    batch->Reset();
+  }
+
+  bool got_read_reservation;
+  ASSERT_OK(stream.PrepareForRead(false, &got_read_reservation));
+  ASSERT_TRUE(got_read_reservation);
+
+  // Read all the rows back
+  vector<int> results;
+  ReadValues(&stream, desc, &results);
+
+  // Verify result
+  VerifyResults<int>(*desc, results, num_rows * num_batches, gen_null);
+
+  // Verify that stream contains more than MAX_PAGE_ITER_DEBUG pages and only subset of
+  // pages are included in DebugString().
+  DCHECK_GT(stream.num_pages_, BufferedTupleStream::MAX_PAGE_ITER_DEBUG);
+  string page_count_substr = Substitute(
+      "$0 out of $1 pages=", BufferedTupleStream::MAX_PAGE_ITER_DEBUG, stream.num_pages_);
+  string debug_string = stream.DebugString();
+  ASSERT_NE(debug_string.find(page_count_substr), string::npos)
+      << page_count_substr << " not found at BufferedTupleStream::DebugString(). "
+      << debug_string;
+  ASSERT_LE(debug_string.length(), ErrorMsg::MAX_ERROR_MESSAGE_LEN);
+
+  stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+}
+
 // Basic API test. No data should be going to disk.
 TEST_F(SimpleNullStreamTest, Basic) {
   Init(BUFFER_POOL_LIMIT);
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index 5d12fdf..a4b76a2 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -50,6 +50,8 @@ using namespace strings;
 using BufferHandle = BufferPool::BufferHandle;
 using FlushMode = RowBatch::FlushMode;
 
+constexpr int64_t BufferedTupleStream::MAX_PAGE_ITER_DEBUG;
+
 BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
     const RowDescriptor* row_desc, BufferPool::ClientHandle* buffer_pool_client,
     int64_t default_page_len, int64_t max_page_len, const set<SlotId>& ext_varlen_slots)
@@ -183,10 +185,12 @@ string BufferedTupleStream::DebugString() const {
   } else {
     ss << write_page_reservation_.GetReservation();
   }
-  ss << "\n # pages=" << num_pages_ << " pages=[\n";
-  for (const Page& page : pages_) {
-    ss << "{" << page.DebugString() << "}";
-    if (&page != &pages_.back()) ss << ",\n";
+  int64_t max_page = min(num_pages_, BufferedTupleStream::MAX_PAGE_ITER_DEBUG);
+  ss << "\n " << max_page << " out of " << num_pages_ << " pages=[\n";
+  for (auto page = pages_.begin(); (page != pages_.end()) && (max_page > 0); ++page) {
+    ss << "{" << page->DebugString() << "}";
+    max_page--;
+    if (max_page > 0) ss << ",\n";
   }
   ss << "]";
   return ss.str();
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index 5a22bfb..4f76cba 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -417,6 +417,8 @@ class BufferedTupleStream {
 
   std::string DebugString() const;
 
+  static constexpr int64_t MAX_PAGE_ITER_DEBUG = 100;
+
  private:
   /// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
   struct Page {
@@ -558,9 +560,7 @@ class BufferedTupleStream {
 
  private:
   DISALLOW_COPY_AND_ASSIGN(BufferedTupleStream);
-  friend class ArrayTupleStreamTest_TestArrayDeepCopy_Test;
-  friend class ArrayTupleStreamTest_TestComputeRowSize_Test;
-  friend class MultiNullableTupleStreamTest_TestComputeRowSize_Test;
+  friend class SimpleTupleStreamTest_ShortDebugString_Test;
 
   /// Runtime state instance used to check for cancellation. Not owned.
   RuntimeState* const state_;

Reply via email to