This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1267fde57b16755956e6a710bbc0543a61249d92
Author: Joe McDonnell <[email protected]>
AuthorDate: Tue Oct 22 13:42:34 2024 -0700

    IMPALA-13497: Add TupleCacheBytesWritten/Read to the profile
    
    This adds counters for the number of bytes written / read
    from the tuple cache. This gives visibility into whether
    certain locations have enormous result sizes. This will be
    used to tune the placement of tuple cache nodes.
    
    Tests:
     - Added checks of the TupleCacheBytesWritten/Read counters
       to existing tests in test_tuple_cache.py
    
    Change-Id: Ib5c9249049d8d46116a65929896832d02c2d9f1f
    Reviewed-on: http://gerrit.cloudera.org:8080/21991
    Reviewed-by: Yida Wu <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/tuple-file-reader.cc         | 10 ++++++++--
 be/src/exec/tuple-file-reader.h          |  2 ++
 be/src/exec/tuple-file-writer.cc         |  3 +++
 be/src/exec/tuple-file-writer.h          |  2 ++
 tests/custom_cluster/test_tuple_cache.py | 23 +++++++++++++++++++++--
 5 files changed, 36 insertions(+), 4 deletions(-)

diff --git a/be/src/exec/tuple-file-reader.cc b/be/src/exec/tuple-file-reader.cc
index 04af197a4..dfe638980 100644
--- a/be/src/exec/tuple-file-reader.cc
+++ b/be/src/exec/tuple-file-reader.cc
@@ -37,7 +37,9 @@ TupleFileReader::TupleFileReader(
     tracker_(new MemTracker(-1, "TupleFileReader", parent)),
     read_timer_(profile ? ADD_TIMER(profile, "TupleCacheReadTime") : nullptr),
     deserialize_timer_(
-        profile ? ADD_TIMER(profile, "TupleCacheDeserializeTime") : nullptr) {}
+        profile ? ADD_TIMER(profile, "TupleCacheDeserializeTime") : nullptr),
+    bytes_read_(profile ?
+        ADD_COUNTER(profile, "TupleCacheBytesRead", TUnit::BYTES) : nullptr) {}
 
 TupleFileReader::~TupleFileReader() {
   // MemTracker expects an explicit close.
@@ -86,6 +88,9 @@ Status TupleFileReader::GetNext(RuntimeState *state,
   KUDU_RETURN_IF_ERROR(reader_->ReadV(offset_,
       kudu::ArrayView<kudu::Slice>(chunk_lens_slices)),
       "Failed to read cache file");
+  size_t chunk_lens_slices_size = sizeof(header_len) + sizeof(tuple_data_len) +
+      sizeof(tuple_offsets_len);
+  COUNTER_ADD(bytes_read_, chunk_lens_slices_size);
 
   // tuple_data_len can be zero, see IMPALA-13411.
   if (header_len == 0 || tuple_offsets_len == 0) {
@@ -95,7 +100,7 @@ Status TupleFileReader::GetNext(RuntimeState *state,
     DCHECK(false) << err_msg;
     return Status(Substitute("Invalid tuple cache file: $0", err_msg));
   }
-  offset_ += sizeof(header_len) + sizeof(tuple_data_len) + sizeof(tuple_offsets_len);
+  offset_ += chunk_lens_slices_size;
 
   // Now, we know the total size of the variable-length data, and we can read
   // it in a single chunk.
@@ -124,6 +129,7 @@ Status TupleFileReader::GetNext(RuntimeState *state,
   KUDU_RETURN_IF_ERROR(reader_->ReadV(offset_,
       kudu::ArrayView<kudu::Slice>(varlen_data_slices)),
       "Failed to read tuple cache file");
+  COUNTER_ADD(bytes_read_, varlen_size);
   offset_ += varlen_size;
 
   RowBatchHeaderPB header;
diff --git a/be/src/exec/tuple-file-reader.h b/be/src/exec/tuple-file-reader.h
index 9e60c3ec1..1e4c112d4 100644
--- a/be/src/exec/tuple-file-reader.h
+++ b/be/src/exec/tuple-file-reader.h
@@ -66,6 +66,8 @@ public:
   RuntimeProfile::Counter* read_timer_;
   // Total time spent on deserialization.
   RuntimeProfile::Counter* deserialize_timer_;
+  // Total bytes read
+  RuntimeProfile::Counter* bytes_read_;
 
   std::unique_ptr<kudu::RWFile> reader_;
   size_t offset_ = 0;
diff --git a/be/src/exec/tuple-file-writer.cc b/be/src/exec/tuple-file-writer.cc
index 2a7d668fd..9965258ed 100644
--- a/be/src/exec/tuple-file-writer.cc
+++ b/be/src/exec/tuple-file-writer.cc
@@ -45,6 +45,8 @@ TupleFileWriter::TupleFileWriter(
     tracker_(new MemTracker(-1, "TupleFileWriter", parent)),
     write_timer_(profile ? ADD_TIMER(profile, "TupleCacheWriteTime") : nullptr),
     serialize_timer_(profile ? ADD_TIMER(profile, "TupleCacheSerializeTime") : nullptr),
+    bytes_written_(profile ?
+        ADD_COUNTER(profile, "TupleCacheBytesWritten", TUnit::BYTES) : nullptr),
     max_file_size_(max_file_size) {}
 
 TupleFileWriter::~TupleFileWriter() {
@@ -139,6 +141,7 @@ Status TupleFileWriter::Write(RuntimeState* state, RowBatch* row_batch) {
   KUDU_RETURN_IF_ERROR(
       tmp_file_->AppendV(kudu::ArrayView<const kudu::Slice>(slices)),
       "Failed to write to cache file");
+  COUNTER_ADD(bytes_written_, num_bytes_to_write);
 
   return Status::OK();
 }
diff --git a/be/src/exec/tuple-file-writer.h b/be/src/exec/tuple-file-writer.h
index b855fc384..bdaf04706 100644
--- a/be/src/exec/tuple-file-writer.h
+++ b/be/src/exec/tuple-file-writer.h
@@ -95,6 +95,8 @@ private:
   RuntimeProfile::Counter* write_timer_;
   // Total time spent on serialization.
   RuntimeProfile::Counter* serialize_timer_;
+  // Total bytes written
+  RuntimeProfile::Counter* bytes_written_;
   // Maximum size for the resulting file
   size_t max_file_size_;
   // True if the file reached the maximum size
diff --git a/tests/custom_cluster/test_tuple_cache.py b/tests/custom_cluster/test_tuple_cache.py
index 071b6d7f3..6ccac038a 100644
--- a/tests/custom_cluster/test_tuple_cache.py
+++ b/tests/custom_cluster/test_tuple_cache.py
@@ -47,11 +47,21 @@ def table_value(seed):
   return '"{0}", {1}, "{2}"'.format(name, age, address)
 
 
+def getCounterValues(profile, key):
+  # This matches lines like these:
+  #     NumTupleCacheHits: 1 (1)
+  #     TupleCacheBytesWritten: 123.00 B (123)
+  # The regex extracts the value inside the parenthesis to get a simple numeric value
+  # rather than a pretty print of the same value.
+  counter_str_list = re.findall(r"{0}{1}: .* \((.*)\)".format(NODE_INDENT, key), profile)
+  return [int(v) for v in counter_str_list]
+
+
 def assertCounter(profile, key, val, num_matches):
   if not isinstance(num_matches, list):
     num_matches = [num_matches]
-  assert profile.count("{0}{1}: {2} ".format(NODE_INDENT, key, val)) in num_matches, \
-      re.findall(r"{0}{1}: .*".format(NODE_INDENT, key), profile)
+  values = getCounterValues(profile, key)
+  assert len([v for v in values if v == val]) in num_matches, values
 
 
 def assertCounters(profile, num_hits, num_halted, num_skipped, num_matches=1):
@@ -150,6 +160,11 @@ class TestTupleCache(TestTupleCacheBase):
     assert result1.data == result2.data
     assertCounters(result1.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
     assertCounters(result2.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)
+    # Verify that the bytes written by the first profile are the same as the bytes
+    # read by the second profile.
+    bytes_written = getCounterValues(result1.runtime_profile, "TupleCacheBytesWritten")
+    bytes_read = getCounterValues(result2.runtime_profile, "TupleCacheBytesRead")
+    assert sorted(bytes_written) == sorted(bytes_read)
 
   @CustomClusterTestSuite.with_args(
       start_args=CACHE_START_ARGS + " --tuple_cache_capacity=64MB", cluster_size=1,
@@ -167,6 +182,10 @@ class TestTupleCache(TestTupleCacheBase):
     assert result2.success
     assert result1.data == result2.data
     assertCounters(result1.runtime_profile, num_hits=0, num_halted=1, num_skipped=0)
+    bytes_written = getCounterValues(result1.runtime_profile, "TupleCacheBytesWritten")
+    # This is running on a single node, so there should be a single location where
+    # TupleCacheBytesWritten exceeds 0.
+    assert len([v for v in bytes_written if v > 0]) == 1
     assertCounters(result2.runtime_profile, num_hits=0, num_halted=0, num_skipped=1)
 
   @CustomClusterTestSuite.with_args(

Reply via email to