This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new b1320bd1d IMPALA-13075: Cap memory usage for ExprValuesCache at 256KB
b1320bd1d is described below

commit b1320bd1d646eba3f044ef647b7d4497487d4674
Author: Riza Suminto <[email protected]>
AuthorDate: Thu May 23 11:19:33 2024 -0700

    IMPALA-13075: Cap memory usage for ExprValuesCache at 256KB
    
    ExprValuesCache uses BATCH_SIZE as a deciding factor to set its
    capacity. It bounds the capacity such that expr_values_array_ memory
    usage stays below 256KB. This patch tightens that limit to include all
    memory usage from ExprValuesCache::MemUsage() instead of
    expr_values_array_ only. Therefore, setting a very high BATCH_SIZE will
    not push the total memory usage of ExprValuesCache beyond 256KB.
    
    Simplify table dimension creation methods and fix a few flake8 warnings
    in test_dimensions.py.
    
    Testing:
    - Add test_join_queries.py::TestExprValueCache.
    - Pass core tests.
    
    Change-Id: Iee27cbbe8d3100301d05a6516b62c45975a8d0e0
    Reviewed-on: http://gerrit.cloudera.org:8080/21455
    Reviewed-by: Riza Suminto <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/hash-table.cc                    | 17 ++++++++++---
 be/src/exec/hash-table.h                     |  2 +-
 bin/rat_exclude_files.txt                    |  3 +++
 testdata/workloads/tpcds_partitioned/queries |  1 +
 tests/common/test_dimensions.py              | 38 +++++++++++++++-------------
 tests/query_test/test_join_queries.py        | 31 +++++++++++++++++++++++
 6 files changed, 70 insertions(+), 22 deletions(-)

diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index c8576e335..1de667a52 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -366,13 +366,24 @@ Status HashTableCtx::ExprValuesCache::Init(RuntimeState* 
state, MemTracker* trac
     return Status::OK();
   }
   DCHECK_GT(expr_values_bytes_per_row_, 0);
-  // Compute the maximum number of cached rows which can fit in the memory 
budget.
+  // Compute the maximum number of cached rows which can fit in the memory 
budget,
+  // which is 256KB (MAX_EXPR_VALUES_CACHE_BYTES). 'sample_size' is 64 because 
MemUsage
+  // accounts for Bitmap::MemUsage as well, which costs 8 bytes per 64 entries.
   // TODO: Find the optimal prefetch batch size. This may be something
   // processor dependent so we may need calibration at Impala startup time.
-  capacity_ = std::max(1, std::min(state->batch_size(),
-      MAX_EXPR_VALUES_ARRAY_SIZE / expr_values_bytes_per_row_));
+  const int sample_size = 64;
+  double mem_per_row =
+      (double)MemUsage(sample_size, expr_values_bytes_per_row_, num_exprs_) / 
sample_size;
+  double max_capacity = (MAX_EXPR_VALUES_CACHE_BYTES - 8) / mem_per_row;
+  capacity_ = std::max(1, std::min(state->batch_size(), (int)max_capacity));
 
+  // TODO: Add 'mem_usage' into minimum reservation of PlanNode that uses 
HashTableCtx?
   int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_);
+  if (UNLIKELY(mem_usage > MAX_EXPR_VALUES_CACHE_BYTES)) {
+    LOG(WARNING) << "HashTableCtx::ExprValuesCache mem_usage (" << mem_usage
+                 << ") exceed MAX_EXPR_VALUES_CACHE_BYTES ("
+                 << MAX_EXPR_VALUES_CACHE_BYTES << "). capacity: " << 
capacity_;
+  }
   if (UNLIKELY(!tracker->TryConsume(mem_usage))) {
     capacity_ = 0;
     string details = Substitute("HashTableCtx::ExprValuesCache failed to 
allocate $0 bytes.",
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index ac89da305..dfe7cc41a 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -377,7 +377,7 @@ class HashTableCtx {
     }
 
     /// Max amount of memory in bytes for caching evaluated expression values.
-    static const int MAX_EXPR_VALUES_ARRAY_SIZE = 256 << 10;
+    static const int MAX_EXPR_VALUES_CACHE_BYTES = 256 << 10;
 
     /// Maximum number of rows of expressions evaluation states which this
     /// ExprValuesCache can cache.
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index bcd7df3a1..0ebe532d9 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -219,6 +219,9 @@ tests/shell/test_file_comments.sql
 tests/shell/test_file_no_comments.sql
 tests/shell/test_var_substitution.sql
 
+# symlink to testdata/workloads/tpcds/queries
+testdata/workloads/tpcds_partitioned/queries
+
 # Generated by Apache-licensed software:
 be/src/transport/config.h
 
diff --git a/testdata/workloads/tpcds_partitioned/queries 
b/testdata/workloads/tpcds_partitioned/queries
new file mode 120000
index 000000000..c8ca6a2cd
--- /dev/null
+++ b/testdata/workloads/tpcds_partitioned/queries
@@ -0,0 +1 @@
+../tpcds/queries
\ No newline at end of file
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index aa2ce2db9..69ef44182 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -45,6 +45,7 @@ FILE_FORMAT_TO_STORED_AS_MAP = {
   'json': "JSONFILE",
 }
 
+
 # Describes the configuration used to execute a single tests. Contains both 
the details
 # of what specific table format to target along with the exec options 
(num_nodes, etc)
 # to use when running the query.
@@ -116,39 +117,34 @@ class TableFormatInfo(object):
       return '_%s_%s' % (self.file_format, self.compression_codec)
 
 
-def create_uncompressed_text_dimension(workload):
+def create_table_format_dimension(workload, table_format_string):
   dataset = get_dataset_from_workload(workload)
   return ImpalaTestDimension('table_format',
-      TableFormatInfo.create_from_string(dataset, 'text/none'))
+      TableFormatInfo.create_from_string(dataset, table_format_string))
+
+
+def create_uncompressed_text_dimension(workload):
+  return create_table_format_dimension(workload, 'text/none')
 
 
 def create_uncompressed_json_dimension(workload):
-  dataset = get_dataset_from_workload(workload)
-  return ImpalaTestDimension('table_format',
-      TableFormatInfo.create_from_string(dataset, 'json/none'))
+  return create_table_format_dimension(workload, 'json/none')
 
 
 def create_parquet_dimension(workload):
-  dataset = get_dataset_from_workload(workload)
-  return ImpalaTestDimension('table_format',
-      TableFormatInfo.create_from_string(dataset, 'parquet/none'))
+  return create_table_format_dimension(workload, 'parquet/none')
 
 
 def create_orc_dimension(workload):
-  dataset = get_dataset_from_workload(workload)
-  return ImpalaTestDimension('table_format',
-      TableFormatInfo.create_from_string(dataset, 'orc/def'))
+  return create_table_format_dimension(workload, 'orc/def')
+
 
 def create_avro_snappy_dimension(workload):
-  dataset = get_dataset_from_workload(workload)
-  return ImpalaTestDimension('table_format',
-      TableFormatInfo.create_from_string(dataset, 'avro/snap/block'))
+  return create_table_format_dimension(workload, 'avro/snap/block')
 
 
 def create_kudu_dimension(workload):
-  dataset = get_dataset_from_workload(workload)
-  return ImpalaTestDimension('table_format',
-      TableFormatInfo.create_from_string(dataset, 'kudu/none'))
+  return create_table_format_dimension(workload, 'kudu/none')
 
 
 def create_client_protocol_dimension():
@@ -200,6 +196,7 @@ def orc_schema_resolution_constraint(v):
   orc_schema_resolution = v.get_value('orc_schema_resolution')
   return file_format == 'orc' or orc_schema_resolution == 0
 
+
 # Common sets of values for the exec option vectors
 ALL_BATCH_SIZES = [0]
 
@@ -210,6 +207,7 @@ SINGLE_NODE_ONLY = [1]
 ALL_NODES_ONLY = [0]
 ALL_DISABLE_CODEGEN_OPTIONS = [True, False]
 
+
 def create_single_exec_option_dimension(num_nodes=0, 
disable_codegen_rows_threshold=5000):
   """Creates an exec_option dimension that will produce a single test vector"""
   return create_exec_option_dimension(cluster_sizes=[num_nodes],
@@ -243,6 +241,7 @@ def 
create_exec_option_dimension(cluster_sizes=ALL_NODES_ONLY,
     exec_option_dimensions['debug_action'] = debug_action_options
   return create_exec_option_dimension_from_dict(exec_option_dimensions)
 
+
 def create_exec_option_dimension_from_dict(exec_option_dimensions):
   """
   Builds a query exec option test dimension
@@ -297,11 +296,13 @@ def extend_exec_option_dimension(test_suite, key, value):
   dim.extend(new_value)
   test_suite.ImpalaTestMatrix.add_dimension(dim)
 
+
 def get_dataset_from_workload(workload):
   # TODO: We need a better way to define the workload -> dataset mapping so we 
can
   # extract it without reading the actual test vector file
   return load_table_info_dimension(workload, 'exhaustive')[0].value.dataset
 
+
 def load_table_info_dimension(workload_name, exploration_strategy, 
file_formats=None,
                       compression_codecs=None):
   """Loads test vector corresponding to the given workload and exploration 
strategy"""
@@ -319,7 +320,7 @@ def load_table_info_dimension(workload_name, 
exploration_strategy, file_formats=
         continue
 
       # Extract each test vector and add them to a dictionary
-      vals = dict((key.strip(), value.strip()) for key, value in\
+      vals = dict((key.strip(), value.strip()) for key, value in
           (item.split(':') for item in line.split(',')))
 
       # If only loading specific file formats skip anything that doesn't match
@@ -332,6 +333,7 @@ def load_table_info_dimension(workload_name, 
exploration_strategy, file_formats=
 
   return ImpalaTestDimension('table_format', *vector_values)
 
+
 def is_supported_insert_format(table_format):
   # Returns true if the given table_format is a supported Impala INSERT format
   return table_format.compression_codec == 'none' and\
diff --git a/tests/query_test/test_join_queries.py 
b/tests/query_test/test_join_queries.py
index 322617696..10939ab4f 100644
--- a/tests/query_test/test_join_queries.py
+++ b/tests/query_test/test_join_queries.py
@@ -24,6 +24,11 @@ from copy import deepcopy
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIf, SkipIfFS
 from tests.common.test_vector import ImpalaTestDimension
+from tests.common.test_dimensions import (
+    add_mandatory_exec_option,
+    create_single_exec_option_dimension,
+    create_table_format_dimension)
+
 
 class TestJoinQueries(ImpalaTestSuite):
   BATCH_SIZES = [0, 1]
@@ -217,3 +222,29 @@ class TestSpillingHashJoin(ImpalaTestSuite):
     self.run_test_case('QueryTest/create-tables-impala-13138', vector, 
unique_database)
     for i in range(0, 5):
       self.run_test_case('QueryTest/query-impala-13138', vector, 
unique_database)
+
+
+class TestExprValueCache(ImpalaTestSuite):
+  # Test that HashTableCtx::ExprValuesCache memory usage stays under 256KB.
+  # Run TPC-DS Q97 with bare minimum memory limit, MT_DOP=1, and max 
BATCH_SIZE.
+  # Before IMPALA-13075, the test query would pass Planner and Admission 
Control,
+  # but later fail during backend execution due to memory limit exceeded.
+
+  @classmethod
+  def get_workload(cls):
+    return 'tpcds_partitioned'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestExprValueCache, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(
+        create_single_exec_option_dimension())
+    cls.ImpalaTestMatrix.add_dimension(
+        create_table_format_dimension(cls.get_workload(), 
'parquet/snap/block'))
+    add_mandatory_exec_option(cls, 'runtime_filter_mode', 'OFF')
+    add_mandatory_exec_option(cls, 'mem_limit', '149mb')
+    add_mandatory_exec_option(cls, 'mt_dop', 1)
+    add_mandatory_exec_option(cls, 'batch_size', 65536)
+
+  def test_expr_value_cache_fits(self, vector):
+    self.run_test_case('tpcds-q97', vector)

Reply via email to