This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 1267fde57b16755956e6a710bbc0543a61249d92 Author: Joe McDonnell <[email protected]> AuthorDate: Tue Oct 22 13:42:34 2024 -0700 IMPALA-13497: Add TupleCacheBytesWritten/Read to the profile This adds counters for the number of bytes written / read from the tuple cache. This gives visibility into whether certain locations have enormous result sizes. This will be used to tune the placement of tuple cache nodes. Tests: - Added checks of the TupleCacheBytesWritten/Read counters to existing tests in test_tuple_cache.py Change-Id: Ib5c9249049d8d46116a65929896832d02c2d9f1f Reviewed-on: http://gerrit.cloudera.org:8080/21991 Reviewed-by: Yida Wu <[email protected]> Reviewed-by: Michael Smith <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/tuple-file-reader.cc | 10 ++++++++-- be/src/exec/tuple-file-reader.h | 2 ++ be/src/exec/tuple-file-writer.cc | 3 +++ be/src/exec/tuple-file-writer.h | 2 ++ tests/custom_cluster/test_tuple_cache.py | 23 +++++++++++++++++++++-- 5 files changed, 36 insertions(+), 4 deletions(-) diff --git a/be/src/exec/tuple-file-reader.cc b/be/src/exec/tuple-file-reader.cc index 04af197a4..dfe638980 100644 --- a/be/src/exec/tuple-file-reader.cc +++ b/be/src/exec/tuple-file-reader.cc @@ -37,7 +37,9 @@ TupleFileReader::TupleFileReader( tracker_(new MemTracker(-1, "TupleFileReader", parent)), read_timer_(profile ? ADD_TIMER(profile, "TupleCacheReadTime") : nullptr), deserialize_timer_( - profile ? ADD_TIMER(profile, "TupleCacheDeserializeTime") : nullptr) {} + profile ? ADD_TIMER(profile, "TupleCacheDeserializeTime") : nullptr), + bytes_read_(profile ? + ADD_COUNTER(profile, "TupleCacheBytesRead", TUnit::BYTES) : nullptr) {} TupleFileReader::~TupleFileReader() { // MemTracker expects an explicit close. @@ -86,6 +88,9 @@ Status TupleFileReader::GetNext(RuntimeState *state, KUDU_RETURN_IF_ERROR(reader_->ReadV(offset_, kudu::ArrayView<kudu::Slice>(chunk_lens_slices)), "Failed to read cache file"); + size_t chunk_lens_slices_size = sizeof(header_len) + sizeof(tuple_data_len) + + sizeof(tuple_offsets_len); + COUNTER_ADD(bytes_read_, chunk_lens_slices_size); // tuple_data_len can be zero, see IMPALA-13411. if (header_len == 0 || tuple_offsets_len == 0) { @@ -95,7 +100,7 @@ Status TupleFileReader::GetNext(RuntimeState *state, DCHECK(false) << err_msg; return Status(Substitute("Invalid tuple cache file: $0", err_msg)); } - offset_ += sizeof(header_len) + sizeof(tuple_data_len) + sizeof(tuple_offsets_len); + offset_ += chunk_lens_slices_size; // Now, we know the total size of the variable-length data, and we can read // it in a single chunk. @@ -124,6 +129,7 @@ Status TupleFileReader::GetNext(RuntimeState *state, KUDU_RETURN_IF_ERROR(reader_->ReadV(offset_, kudu::ArrayView<kudu::Slice>(varlen_data_slices)), "Failed to read tuple cache file"); + COUNTER_ADD(bytes_read_, varlen_size); offset_ += varlen_size; RowBatchHeaderPB header; diff --git a/be/src/exec/tuple-file-reader.h b/be/src/exec/tuple-file-reader.h index 9e60c3ec1..1e4c112d4 100644 --- a/be/src/exec/tuple-file-reader.h +++ b/be/src/exec/tuple-file-reader.h @@ -66,6 +66,8 @@ public: RuntimeProfile::Counter* read_timer_; // Total time spent on deserialization. RuntimeProfile::Counter* deserialize_timer_; + // Total bytes read + RuntimeProfile::Counter* bytes_read_; std::unique_ptr<kudu::RWFile> reader_; size_t offset_ = 0; diff --git a/be/src/exec/tuple-file-writer.cc b/be/src/exec/tuple-file-writer.cc index 2a7d668fd..9965258ed 100644 --- a/be/src/exec/tuple-file-writer.cc +++ b/be/src/exec/tuple-file-writer.cc @@ -45,6 +45,8 @@ TupleFileWriter::TupleFileWriter( tracker_(new MemTracker(-1, "TupleFileWriter", parent)), write_timer_(profile ? ADD_TIMER(profile, "TupleCacheWriteTime") : nullptr), serialize_timer_(profile ? ADD_TIMER(profile, "TupleCacheSerializeTime") : nullptr), + bytes_written_(profile ? + ADD_COUNTER(profile, "TupleCacheBytesWritten", TUnit::BYTES) : nullptr), max_file_size_(max_file_size) {} TupleFileWriter::~TupleFileWriter() { @@ -139,6 +141,7 @@ Status TupleFileWriter::Write(RuntimeState* state, RowBatch* row_batch) { KUDU_RETURN_IF_ERROR( tmp_file_->AppendV(kudu::ArrayView<const kudu::Slice>(slices)), "Failed to write to cache file"); + COUNTER_ADD(bytes_written_, num_bytes_to_write); return Status::OK(); } diff --git a/be/src/exec/tuple-file-writer.h b/be/src/exec/tuple-file-writer.h index b855fc384..bdaf04706 100644 --- a/be/src/exec/tuple-file-writer.h +++ b/be/src/exec/tuple-file-writer.h @@ -95,6 +95,8 @@ private: RuntimeProfile::Counter* write_timer_; // Total time spent on serialization. RuntimeProfile::Counter* serialize_timer_; + // Total bytes written + RuntimeProfile::Counter* bytes_written_; // Maximum size for the resulting file size_t max_file_size_; // True if the file reached the maximum size diff --git a/tests/custom_cluster/test_tuple_cache.py b/tests/custom_cluster/test_tuple_cache.py index 071b6d7f3..6ccac038a 100644 --- a/tests/custom_cluster/test_tuple_cache.py +++ b/tests/custom_cluster/test_tuple_cache.py @@ -47,11 +47,21 @@ def table_value(seed): return '"{0}", {1}, "{2}"'.format(name, age, address) +def getCounterValues(profile, key): + # This matches lines like these: + # NumTupleCacheHits: 1 (1) + # TupleCacheBytesWritten: 123.00 B (123) + # The regex extracts the value inside the parenthesis to get a simple numeric value + # rather than a pretty print of the same value. + counter_str_list = re.findall(r"{0}{1}: .* \((.*)\)".format(NODE_INDENT, key), profile) + return [int(v) for v in counter_str_list] + + def assertCounter(profile, key, val, num_matches): if not isinstance(num_matches, list): num_matches = [num_matches] - assert profile.count("{0}{1}: {2} ".format(NODE_INDENT, key, val)) in num_matches, \ - re.findall(r"{0}{1}: .*".format(NODE_INDENT, key), profile) + values = getCounterValues(profile, key) + assert len([v for v in values if v == val]) in num_matches, values def assertCounters(profile, num_hits, num_halted, num_skipped, num_matches=1): @@ -150,6 +160,11 @@ class TestTupleCache(TestTupleCacheBase): assert result1.data == result2.data assertCounters(result1.runtime_profile, num_hits=0, num_halted=0, num_skipped=0) assertCounters(result2.runtime_profile, num_hits=1, num_halted=0, num_skipped=0) + # Verify that the bytes written by the first profile are the same as the bytes + # read by the second profile. + bytes_written = getCounterValues(result1.runtime_profile, "TupleCacheBytesWritten") + bytes_read = getCounterValues(result2.runtime_profile, "TupleCacheBytesRead") + assert sorted(bytes_written) == sorted(bytes_read) @CustomClusterTestSuite.with_args( start_args=CACHE_START_ARGS + " --tuple_cache_capacity=64MB", cluster_size=1, @@ -167,6 +182,10 @@ class TestTupleCache(TestTupleCacheBase): assert result2.success assert result1.data == result2.data assertCounters(result1.runtime_profile, num_hits=0, num_halted=1, num_skipped=0) + bytes_written = getCounterValues(result1.runtime_profile, "TupleCacheBytesWritten") + # This is running on a single node, so there should be a single location where + # TupleCacheBytesWritten exceeds 0. + assert len([v for v in bytes_written if v > 0]) == 1 assertCounters(result2.runtime_profile, num_hits=0, num_halted=0, num_skipped=1) @CustomClusterTestSuite.with_args(
