This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new b1320bd1d IMPALA-13075: Cap memory usage for ExprValuesCache at 256KB
b1320bd1d is described below
commit b1320bd1d646eba3f044ef647b7d4497487d4674
Author: Riza Suminto <[email protected]>
AuthorDate: Thu May 23 11:19:33 2024 -0700
IMPALA-13075: Cap memory usage for ExprValuesCache at 256KB
ExprValuesCache uses BATCH_SIZE as a deciding factor to set its
capacity. It bounds the capacity such that expr_values_array_ memory
usage stays below 256KB. This patch tightens that limit to include all
memory usage from ExprValuesCache::MemUsage() instead of
expr_values_array_ only. Therefore, setting a very high BATCH_SIZE will
not push the total memory usage of ExprValuesCache beyond 256KB.
Simplify table dimension creation methods and fix a few flake8 warnings in
test_dimensions.py.
Testing:
- Add test_join_queries.py::TestExprValueCache.
- Pass core tests.
Change-Id: Iee27cbbe8d3100301d05a6516b62c45975a8d0e0
Reviewed-on: http://gerrit.cloudera.org:8080/21455
Reviewed-by: Riza Suminto <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/exec/hash-table.cc | 17 ++++++++++---
be/src/exec/hash-table.h | 2 +-
bin/rat_exclude_files.txt | 3 +++
testdata/workloads/tpcds_partitioned/queries | 1 +
tests/common/test_dimensions.py | 38 +++++++++++++++-------------
tests/query_test/test_join_queries.py | 31 +++++++++++++++++++++++
6 files changed, 70 insertions(+), 22 deletions(-)
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index c8576e335..1de667a52 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -366,13 +366,24 @@ Status HashTableCtx::ExprValuesCache::Init(RuntimeState*
state, MemTracker* trac
return Status::OK();
}
DCHECK_GT(expr_values_bytes_per_row_, 0);
- // Compute the maximum number of cached rows which can fit in the memory
budget.
+ // Compute the maximum number of cached rows which can fit in the memory
budget,
+ // which is 256KB (MAX_EXPR_VALUES_CACHE_BYTES). 'sample_size' is 64 because
MemUsage
+ // accounts for Bitmap::MemUsage as well, which costs 8 bytes per 64 entries.
// TODO: Find the optimal prefetch batch size. This may be something
// processor dependent so we may need calibration at Impala startup time.
- capacity_ = std::max(1, std::min(state->batch_size(),
- MAX_EXPR_VALUES_ARRAY_SIZE / expr_values_bytes_per_row_));
+ const int sample_size = 64;
+ double mem_per_row =
+ (double)MemUsage(sample_size, expr_values_bytes_per_row_, num_exprs_) /
sample_size;
+ double max_capacity = (MAX_EXPR_VALUES_CACHE_BYTES - 8) / mem_per_row;
+ capacity_ = std::max(1, std::min(state->batch_size(), (int)max_capacity));
+ // TODO: Add 'mem_usage' into minimum reservation of PlanNode that use
HashTableCtx?
int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_);
+ if (UNLIKELY(mem_usage > MAX_EXPR_VALUES_CACHE_BYTES)) {
+ LOG(WARNING) << "HashTableCtx::ExprValuesCache mem_usage (" << mem_usage
+ << ") exceed MAX_EXPR_VALUES_CACHE_BYTES ("
+ << MAX_EXPR_VALUES_CACHE_BYTES << "). capacity: " <<
capacity_;
+ }
if (UNLIKELY(!tracker->TryConsume(mem_usage))) {
capacity_ = 0;
string details = Substitute("HashTableCtx::ExprValuesCache failed to
allocate $0 bytes.",
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index ac89da305..dfe7cc41a 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -377,7 +377,7 @@ class HashTableCtx {
}
/// Max amount of memory in bytes for caching evaluated expression values.
- static const int MAX_EXPR_VALUES_ARRAY_SIZE = 256 << 10;
+ static const int MAX_EXPR_VALUES_CACHE_BYTES = 256 << 10;
/// Maximum number of rows of expressions evaluation states which this
/// ExprValuesCache can cache.
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index bcd7df3a1..0ebe532d9 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -219,6 +219,9 @@ tests/shell/test_file_comments.sql
tests/shell/test_file_no_comments.sql
tests/shell/test_var_substitution.sql
+# symlink to testdata/workloads/tpcds/queries
+testdata/workloads/tpcds_partitioned/queries
+
# Generated by Apache-licensed software:
be/src/transport/config.h
diff --git a/testdata/workloads/tpcds_partitioned/queries
b/testdata/workloads/tpcds_partitioned/queries
new file mode 120000
index 000000000..c8ca6a2cd
--- /dev/null
+++ b/testdata/workloads/tpcds_partitioned/queries
@@ -0,0 +1 @@
+../tpcds/queries
\ No newline at end of file
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index aa2ce2db9..69ef44182 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -45,6 +45,7 @@ FILE_FORMAT_TO_STORED_AS_MAP = {
'json': "JSONFILE",
}
+
# Describes the configuration used to execute a single tests. Contains both
the details
# of what specific table format to target along with the exec options
(num_nodes, etc)
# to use when running the query.
@@ -116,39 +117,34 @@ class TableFormatInfo(object):
return '_%s_%s' % (self.file_format, self.compression_codec)
-def create_uncompressed_text_dimension(workload):
+def create_table_format_dimension(workload, table_format_string):
dataset = get_dataset_from_workload(workload)
return ImpalaTestDimension('table_format',
- TableFormatInfo.create_from_string(dataset, 'text/none'))
+ TableFormatInfo.create_from_string(dataset, table_format_string))
+
+
+def create_uncompressed_text_dimension(workload):
+ return create_table_format_dimension(workload, 'text/none')
def create_uncompressed_json_dimension(workload):
- dataset = get_dataset_from_workload(workload)
- return ImpalaTestDimension('table_format',
- TableFormatInfo.create_from_string(dataset, 'json/none'))
+ return create_table_format_dimension(workload, 'json/none')
def create_parquet_dimension(workload):
- dataset = get_dataset_from_workload(workload)
- return ImpalaTestDimension('table_format',
- TableFormatInfo.create_from_string(dataset, 'parquet/none'))
+ return create_table_format_dimension(workload, 'parquet/none')
def create_orc_dimension(workload):
- dataset = get_dataset_from_workload(workload)
- return ImpalaTestDimension('table_format',
- TableFormatInfo.create_from_string(dataset, 'orc/def'))
+ return create_table_format_dimension(workload, 'orc/def')
+
def create_avro_snappy_dimension(workload):
- dataset = get_dataset_from_workload(workload)
- return ImpalaTestDimension('table_format',
- TableFormatInfo.create_from_string(dataset, 'avro/snap/block'))
+ return create_table_format_dimension(workload, 'avro/snap/block')
def create_kudu_dimension(workload):
- dataset = get_dataset_from_workload(workload)
- return ImpalaTestDimension('table_format',
- TableFormatInfo.create_from_string(dataset, 'kudu/none'))
+ return create_table_format_dimension(workload, 'kudu/none')
def create_client_protocol_dimension():
@@ -200,6 +196,7 @@ def orc_schema_resolution_constraint(v):
orc_schema_resolution = v.get_value('orc_schema_resolution')
return file_format == 'orc' or orc_schema_resolution == 0
+
# Common sets of values for the exec option vectors
ALL_BATCH_SIZES = [0]
@@ -210,6 +207,7 @@ SINGLE_NODE_ONLY = [1]
ALL_NODES_ONLY = [0]
ALL_DISABLE_CODEGEN_OPTIONS = [True, False]
+
def create_single_exec_option_dimension(num_nodes=0,
disable_codegen_rows_threshold=5000):
"""Creates an exec_option dimension that will produce a single test vector"""
return create_exec_option_dimension(cluster_sizes=[num_nodes],
@@ -243,6 +241,7 @@ def
create_exec_option_dimension(cluster_sizes=ALL_NODES_ONLY,
exec_option_dimensions['debug_action'] = debug_action_options
return create_exec_option_dimension_from_dict(exec_option_dimensions)
+
def create_exec_option_dimension_from_dict(exec_option_dimensions):
"""
Builds a query exec option test dimension
@@ -297,11 +296,13 @@ def extend_exec_option_dimension(test_suite, key, value):
dim.extend(new_value)
test_suite.ImpalaTestMatrix.add_dimension(dim)
+
def get_dataset_from_workload(workload):
# TODO: We need a better way to define the workload -> dataset mapping so we
can
# extract it without reading the actual test vector file
return load_table_info_dimension(workload, 'exhaustive')[0].value.dataset
+
def load_table_info_dimension(workload_name, exploration_strategy,
file_formats=None,
compression_codecs=None):
"""Loads test vector corresponding to the given workload and exploration
strategy"""
@@ -319,7 +320,7 @@ def load_table_info_dimension(workload_name,
exploration_strategy, file_formats=
continue
# Extract each test vector and add them to a dictionary
- vals = dict((key.strip(), value.strip()) for key, value in\
+ vals = dict((key.strip(), value.strip()) for key, value in
(item.split(':') for item in line.split(',')))
# If only loading specific file formats skip anything that doesn't match
@@ -332,6 +333,7 @@ def load_table_info_dimension(workload_name,
exploration_strategy, file_formats=
return ImpalaTestDimension('table_format', *vector_values)
+
def is_supported_insert_format(table_format):
# Returns true if the given table_format is a supported Impala INSERT format
return table_format.compression_codec == 'none' and\
diff --git a/tests/query_test/test_join_queries.py
b/tests/query_test/test_join_queries.py
index 322617696..10939ab4f 100644
--- a/tests/query_test/test_join_queries.py
+++ b/tests/query_test/test_join_queries.py
@@ -24,6 +24,11 @@ from copy import deepcopy
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIf, SkipIfFS
from tests.common.test_vector import ImpalaTestDimension
+from tests.common.test_dimensions import (
+ add_mandatory_exec_option,
+ create_single_exec_option_dimension,
+ create_table_format_dimension)
+
class TestJoinQueries(ImpalaTestSuite):
BATCH_SIZES = [0, 1]
@@ -217,3 +222,29 @@ class TestSpillingHashJoin(ImpalaTestSuite):
self.run_test_case('QueryTest/create-tables-impala-13138', vector,
unique_database)
for i in range(0, 5):
self.run_test_case('QueryTest/query-impala-13138', vector,
unique_database)
+
+
+class TestExprValueCache(ImpalaTestSuite):
+ # Test that HashTableCtx::ExprValuesCache memory usage stays under 256KB.
+ # Run TPC-DS Q97 with bare minimum memory limit, MT_DOP=1, and max
BATCH_SIZE.
+ # Before IMPALA-13075, the test query would pass Planner and Admission
Control,
+ # but later fail during backend execution due to exceeding the memory limit.
+
+ @classmethod
+ def get_workload(cls):
+ return 'tpcds_partitioned'
+
+ @classmethod
+ def add_test_dimensions(cls):
+ super(TestExprValueCache, cls).add_test_dimensions()
+ cls.ImpalaTestMatrix.add_dimension(
+ create_single_exec_option_dimension())
+ cls.ImpalaTestMatrix.add_dimension(
+ create_table_format_dimension(cls.get_workload(),
'parquet/snap/block'))
+ add_mandatory_exec_option(cls, 'runtime_filter_mode', 'OFF')
+ add_mandatory_exec_option(cls, 'mem_limit', '149mb')
+ add_mandatory_exec_option(cls, 'mt_dop', 1)
+ add_mandatory_exec_option(cls, 'batch_size', 65536)
+
+ def test_expr_value_cache_fits(self, vector):
+ self.run_test_case('tpcds-q97', vector)