This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit e4c05fce5998a1c2d10e111cf72fed00995ec85f Author: Xuebin Su <[email protected]> AuthorDate: Sat Aug 31 00:38:58 2024 +0800 IMPALA-13286: Make stream row counters 64-bit Previously, adding a large number of zero-sized rows to a BufferedTupleStream could cause impalad to crash. This was because the two per-page row counters, - Page::num_rows, and - ReadIterator::read_page_rows_returned_ were 32-bit integers. When the row size is zero, all rows are added to one single page. If the number of rows in the whole stream then exceeds what a 32-bit integer can hold, the two counters overflow, causing the DCHECKs to fail. This patch fixes the issue by making the row counters 64-bit integers so that they will not easily overflow even when the row size is zero. DCHECKs are also added to make sure that overflow will not happen. Testing: - Added unit test StreamStateTest::TestAddAndGetZeroSizedRows to ensure it is OK to add and get rows when the number of rows in the stream exceeds INT_MAX or UINT_MAX. - Ran the previously failing query manually and it completed with the correct result after the patch. 
Change-Id: I4d4cf8f424360717de0c4a5571a638a4c11b9606 Reviewed-on: http://gerrit.cloudera.org:8080/21741 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/runtime/buffered-tuple-stream-test.cc | 96 ++++++++++++++++++++++++++++ be/src/runtime/buffered-tuple-stream.cc | 11 +++- be/src/runtime/buffered-tuple-stream.h | 11 ++-- 3 files changed, 112 insertions(+), 6 deletions(-) diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc index 55dce370d..80b2069bb 100644 --- a/be/src/runtime/buffered-tuple-stream-test.cc +++ b/be/src/runtime/buffered-tuple-stream-test.cc @@ -91,6 +91,12 @@ class SimpleTupleStreamTest : public testing::Test { string_desc_ = pool_.Add(new RowDescriptor(*string_builder.Build(), tuple_ids, nullable_tuples)); + DescriptorTblBuilder zero_sized_row_builder( + test_env_->exec_env()->frontend(), &pool_); + zero_sized_row_builder.DeclareTuple(); + zero_sized_row_desc_ = pool_.Add( + new RowDescriptor(*zero_sized_row_builder.Build(), tuple_ids, nullable_tuples)); + // Construct descriptors for big rows with and without nullable tuples. // Each tuple contains 8 slots of TYPE_INT and a single byte for null indicator. DescriptorTblBuilder big_row_builder(test_env_->exec_env()->frontend(), &pool_); @@ -454,6 +460,7 @@ class SimpleTupleStreamTest : public testing::Test { ObjectPool pool_; RowDescriptor* int_desc_; RowDescriptor* string_desc_; + RowDescriptor* zero_sized_row_desc_; static const int64_t BIG_ROW_BYTES = 16 * 1024; RowDescriptor* big_row_desc_; @@ -601,6 +608,15 @@ class StreamStateTest : public SimpleTupleStreamTest { // Test that stream's debug string is capped only for the first // BufferedTupleStream::MAX_PAGE_ITER_DEBUG. void TestShortDebugString(); + + // Helper method to add one zero-sized row to the stream. 
+ void TestAddOneZeroSizedRow(BufferedTupleStream& stream, RowBatch& batch); + + // Helper method to get one zero-sized row from the stream. + void TestGetOneZeroSizedRow(BufferedTupleStream& stream); + + // Test adding more than INT_MAX or UINT_MAX zero-size rows to a stream. + void TestAddAndGetZeroSizedRows(); }; // Basic API test. No data should be going to disk. @@ -2243,6 +2259,86 @@ TEST_F(StreamStateTest, UnpinFullyExhaustedReadPageOnReadOnlyStreamNoAttach) { TEST_F(StreamStateTest, ShortDebugString) { TestShortDebugString(); } + +void StreamStateTest::TestAddOneZeroSizedRow( + BufferedTupleStream& stream, RowBatch& batch) { + Status status; + uint8_t* write_ptr_before = stream.write_ptr_; + int64_t num_rows_before = + stream.write_page_ == nullptr ? 0 : stream.write_page_->num_rows; + bool b = stream.AddRow(batch.GetRow(0), &status); + ASSERT_OK(status); + ASSERT_TRUE(b); + if (write_ptr_before != nullptr) { + ASSERT_EQ(stream.write_ptr_, write_ptr_before); + } + ASSERT_EQ(stream.write_page_->num_rows, num_rows_before + 1); +} + +void StreamStateTest::TestGetOneZeroSizedRow(BufferedTupleStream& stream) { + RowBatch read_batch(zero_sized_row_desc_, 1, &tracker_); + uint8_t* read_ptr_before = stream.read_it_.read_ptr_; + int64_t num_rows_before = stream.read_it_.read_page_rows_returned_; + bool eos = false; + Status status = stream.GetNext(&read_batch, &eos); + ASSERT_OK(status); + if (read_ptr_before != nullptr) { + ASSERT_EQ(stream.read_it_.read_ptr_, read_ptr_before); + } + ASSERT_EQ(stream.read_it_.read_page_rows_returned_, num_rows_before + 1); +} + +void StreamStateTest::TestAddAndGetZeroSizedRows() { + Init(BUFFER_POOL_LIMIT); + BufferedTupleStream stream( + runtime_state_, zero_sized_row_desc_, &client_, PAGE_LEN, PAGE_LEN); + ASSERT_OK(stream.Init("StreamStateTest::TestAddAndGetZeroSizedRows", false)); + bool got_reservation = false; + ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation)); + ASSERT_TRUE(got_reservation); + + RowBatch 
write_batch(zero_sized_row_desc_, 1, &tracker_); + write_batch.CommitRows(1); + + // Adding 1 row to initialize the row counters + TestAddOneZeroSizedRow(stream, write_batch); + + // Set the row counters to mock a stream with INT_MAX rows + stream.num_rows_ = INT_MAX; + stream.write_page_->num_rows = INT_MAX; + + // Test if the stream can hold more than INT_MAX rows. + TestAddOneZeroSizedRow(stream, write_batch); + + // Set the row counters to mock a stream with UINT_MAX rows + stream.num_rows_ = UINT_MAX; + stream.write_page_->num_rows = UINT_MAX; + + // Test if the stream can hold more than UINT_MAX rows. + TestAddOneZeroSizedRow(stream, write_batch); + + stream.DoneWriting(); + + // Test if we can get 1 row after getting 0 rows. + TestGetOneZeroSizedRow(stream); + + // Test if we can get 1 row after getting INT_MAX rows. + stream.read_it_.IncrRowsReturned(INT_MAX - 1); + ASSERT_EQ(stream.read_it_.read_page_rows_returned_, INT_MAX); + TestGetOneZeroSizedRow(stream); + + // Test if we can get 1 row after getting UINT_MAX rows. 
+ stream.read_it_.IncrRowsReturned(UINT_MAX - INT_MAX - 1); + ASSERT_EQ(stream.read_it_.read_page_rows_returned_, UINT_MAX); + TestGetOneZeroSizedRow(stream); + + ASSERT_EQ(stream.read_it_.GetRowsLeftInPage(), 0); + stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES); +} + +TEST_F(StreamStateTest, AddAndGetZeroSizedRows) { + TestAddAndGetZeroSizedRows(); +} } int main(int argc, char** argv) { diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc index 19dcb9ff6..2202b6124 100644 --- a/be/src/runtime/buffered-tuple-stream.cc +++ b/be/src/runtime/buffered-tuple-stream.cc @@ -832,8 +832,13 @@ Status BufferedTupleStream::GetNextInternal(ReadIterator* RESTRICT read_iter, DCHECK(read_iter->read_page_->is_pinned()) << DebugString(); DCHECK_GE(read_iter->read_page_rows_returned_, 0); - int rows_left_in_page = read_iter->GetRowsLeftInPage(); - int rows_to_fill = std::min(batch->capacity() - batch->num_rows(), rows_left_in_page); + int64_t rows_left_in_page = read_iter->GetRowsLeftInPage(); + // We are casting an int64_t to int here but this is OK because rows_to_fill is + // no greater than batch->capacity(), which in turn is no greater than INT_MAX. 
+ int64_t rows_to_fill_temp = std::min( + static_cast<int64_t>(batch->capacity() - batch->num_rows()), rows_left_in_page); + DCHECK_LE(rows_to_fill_temp, INT_MAX); + int rows_to_fill = static_cast<int>(rows_to_fill_temp); DCHECK_GE(rows_to_fill, 1); uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(batch->GetRow(batch->num_rows())); @@ -1034,6 +1039,8 @@ bool BufferedTupleStream::AddRow(TupleRow* row, Status* status) noexcept { if (UNLIKELY(write_page_ == nullptr || !DeepCopy(row, &write_ptr_, write_end_ptr_))) { return AddRowSlow(row, status); } + DCHECK_LT(num_rows_, INT64_MAX); + DCHECK_LT(write_page_->num_rows, INT64_MAX); ++num_rows_; ++write_page_->num_rows; return true; diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h index 6002d112d..0191d96de 100644 --- a/be/src/runtime/buffered-tuple-stream.h +++ b/be/src/runtime/buffered-tuple-stream.h @@ -450,8 +450,10 @@ class BufferedTupleStream { BufferPool::PageHandle handle; - /// Number of rows written to the page. - int num_rows = 0; + /// Number of rows written to the page. Its type is int64_t since when the row size + /// is zero, all rows will be added to one page. And the number of rows in the whole + /// stream can exceed INT_MAX. + int64_t num_rows = 0; /// Whether we called GetBuffer() on the page since it was last pinned. This means /// that GetBuffer() and ExtractBuffer() cannot fail and that GetNext() may have @@ -506,8 +508,9 @@ class BufferedTupleStream { /// Total number of rows returned via this read iterator since Init() was called. int64_t rows_returned_ = 0; - /// Number of rows returned from the current 'read_page_'. - uint32_t read_page_rows_returned_ = -1; + /// Number of rows returned from the current 'read_page_'. The type needs to be + /// compatible with 'Page::num_rows' for them to compare. + int64_t read_page_rows_returned_ = -1; /// Pointer into 'read_page_' to the byte after the last row read. uint8_t* read_ptr_ = nullptr;
