This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 6642b75ef IMPALA-13402: Clean up test_tuple_cache dimensions
6642b75ef is described below

commit 6642b75efc3a2535d31e25ec6de8e4673b4ee7c1
Author: Michael Smith <[email protected]>
AuthorDate: Wed Sep 25 15:24:45 2024 -0700

    IMPALA-13402: Clean up test_tuple_cache dimensions
    
    Uses exec_option_dimension to specify exec options, and avoids starting
    a cluster when the test would just be skipped.
    
    Uses other standard helpers to replace custom methods that were less
    flexible.
    
    Change-Id: Ib241f1f1cfaf918dffaddd5aeef3884c70e0a3fb
    Reviewed-on: http://gerrit.cloudera.org:8080/21859
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/custom_cluster/test_tuple_cache.py | 154 +++++++++++++++----------------
 1 file changed, 77 insertions(+), 77 deletions(-)

diff --git a/tests/custom_cluster/test_tuple_cache.py 
b/tests/custom_cluster/test_tuple_cache.py
index df5a3aea3..af30201f7 100644
--- a/tests/custom_cluster/test_tuple_cache.py
+++ b/tests/custom_cluster/test_tuple_cache.py
@@ -22,7 +22,8 @@ import random
 import string
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.test_vector import ImpalaTestDimension
+from tests.common.test_dimensions import (
+    add_exec_option_dimension, add_mandatory_exec_option)
 
 TABLE_LAYOUT = 'name STRING, age INT, address STRING'
 CACHE_START_ARGS = "--tuple_cache_dir=/tmp --log_level=2"
@@ -53,8 +54,9 @@ def get_cache_keys(profile):
   return cache_keys
 
 
-def assert_deterministic_scan(profile):
-  assert "deterministic scan range assignment: true" in profile
+def assert_deterministic_scan(vector, profile):
+  if vector.get_value('exec_option')['mt_dop'] > 0:
+    assert "deterministic scan range assignment: true" in profile
 
 
 class TestTupleCacheBase(CustomClusterTestSuite):
@@ -62,26 +64,19 @@ class TestTupleCacheBase(CustomClusterTestSuite):
   def get_workload(cls):
     return 'functional-query'
 
-  def cached_query(self, query, mt_dop=1):
-    return self.execute_query(query,
-        {"ENABLE_TUPLE_CACHE": "TRUE", "MT_DOP": str(mt_dop)})
-
-  def cached_query_w_debugaction(self, query, debugaction):
-    query_opts = {
-      "ENABLE_TUPLE_CACHE": "TRUE",
-      "MT_DOP": "1",
-      "DEBUG_ACTION": debugaction
-    }
-    return self.execute_query(query, query_opts)
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestTupleCacheBase, cls).add_test_dimensions()
+    add_mandatory_exec_option(cls, 'enable_tuple_cache', 'true')
 
   # Generates a table containing at least <scale> KB of data.
   def create_table(self, fq_table, scale=1):
-    self.cached_query("CREATE TABLE {0} ({1})".format(fq_table, TABLE_LAYOUT))
+    self.execute_query("CREATE TABLE {0} ({1})".format(fq_table, TABLE_LAYOUT))
     # To make the rows distinct, we keep using a different seed for table_value
     global_index = 0
     for _ in range(scale):
       values = [table_value(i) for i in range(global_index, global_index + 70)]
-      self.cached_query("INSERT INTO {0} VALUES ({1})".format(
+      self.execute_query("INSERT INTO {0} VALUES ({1})".format(
           fq_table, "), (".join(values)))
       global_index += 70
 
@@ -91,14 +86,19 @@ class TestTupleCacheBase(CustomClusterTestSuite):
 
 
 class TestTupleCache(TestTupleCacheBase):
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestTupleCache, cls).add_test_dimensions()
+    add_mandatory_exec_option(cls, 'mt_dop', 1)
 
   @CustomClusterTestSuite.with_args(cluster_size=1)
   @pytest.mark.execute_serially
   def test_cache_disabled(self, vector, unique_database):
+    self.client.set_configuration(vector.get_value('exec_option'))
     fq_table = "{0}.cache_disabled".format(unique_database)
     self.create_table(fq_table)
-    result1 = self.cached_query("SELECT * from {0}".format(fq_table))
-    result2 = self.cached_query("SELECT * from {0}".format(fq_table))
+    result1 = self.execute_query("SELECT * from {0}".format(fq_table))
+    result2 = self.execute_query("SELECT * from {0}".format(fq_table))
 
     assert result1.success
     assert result2.success
@@ -110,10 +110,11 @@ class TestTupleCache(TestTupleCacheBase):
       start_args=CACHE_START_ARGS, cluster_size=1)
   @pytest.mark.execute_serially
   def test_create_and_select(self, vector, unique_database):
+    self.client.set_configuration(vector.get_value('exec_option'))
     fq_table = "{0}.create_and_select".format(unique_database)
     self.create_table(fq_table)
-    result1 = self.cached_query("SELECT * from {0}".format(fq_table))
-    result2 = self.cached_query("SELECT * from {0}".format(fq_table))
+    result1 = self.execute_query("SELECT * from {0}".format(fq_table))
+    result2 = self.execute_query("SELECT * from {0}".format(fq_table))
 
     assert result1.success
     assert result2.success
@@ -125,12 +126,13 @@ class TestTupleCache(TestTupleCacheBase):
       start_args=CACHE_START_ARGS + " --tuple_cache_capacity=64MB", 
cluster_size=1,
       impalad_args="--cache_force_single_shard")
   @pytest.mark.execute_serially
-  def test_cache_halted_select(self, vector, unique_database):
+  def test_cache_halted_select(self, vector):
     # The cache is set to the minimum cache size, so run a SQL that produces 
enough
     # data to exceed the cache size and halt caching.
+    self.client.set_configuration(vector.get_value('exec_option'))
     big_enough_query = "SELECT o_comment from tpch.orders"
-    result1 = self.cached_query(big_enough_query)
-    result2 = self.cached_query(big_enough_query)
+    result1 = self.execute_query(big_enough_query)
+    result2 = self.execute_query(big_enough_query)
 
     assert result1.success
     assert result2.success
@@ -142,37 +144,42 @@ class TestTupleCache(TestTupleCacheBase):
     start_args=CACHE_START_ARGS, cluster_size=1)
   @pytest.mark.execute_serially
   def test_failpoints(self, vector, unique_database):
-    fq_table = "{0}.create_and_select".format(unique_database)
+    fq_table = "{0}.failpoints".format(unique_database)
     # Scale 20 gets us enough rows to force multiple RowBatches (needed for the
     # the reader GetNext() cases).
     self.create_table(fq_table, scale=20)
     query = "SELECT * from {0}".format(fq_table)
 
+    def execute_debug(query, action):
+      exec_options = dict(vector.get_value('exec_option'))
+      exec_options['debug_action'] = action
+      return self.execute_query(query, exec_options)
+
     # Fail when writing cache entry. All of these are handled and will not 
fail the
     # query.
     # Case 1: fail during Open()
-    result = self.cached_query_w_debugaction(query, 
"TUPLE_FILE_WRITER_OPEN:[email protected]")
+    result = execute_debug(query, "TUPLE_FILE_WRITER_OPEN:[email protected]")
     assert result.success
     assertCounters(result.runtime_profile, num_hits=0, num_halted=0, 
num_skipped=1)
 
     # Case 2: fail during Write()
-    result = self.cached_query_w_debugaction(query, 
"TUPLE_FILE_WRITER_WRITE:[email protected]")
+    result = execute_debug(query, "TUPLE_FILE_WRITER_WRITE:[email protected]")
     assert result.success
     assertCounters(result.runtime_profile, num_hits=0, num_halted=0, 
num_skipped=0)
 
     # Case 3: fail during Commit()
-    result = self.cached_query_w_debugaction(query, 
"TUPLE_FILE_WRITER_COMMIT:[email protected]")
+    result = execute_debug(query, "TUPLE_FILE_WRITER_COMMIT:[email protected]")
     assert result.success
     assertCounters(result.runtime_profile, num_hits=0, num_halted=0, 
num_skipped=0)
 
     # Now, successfully add a cache entry
-    result1 = self.cached_query(query)
+    result1 = self.execute_query(query, vector.get_value('exec_option'))
     assert result1.success
     assertCounters(result1.runtime_profile, num_hits=0, num_halted=0, 
num_skipped=0)
 
     # Fail when reading a cache entry
     # Case 1: fail during Open()
-    result = self.cached_query_w_debugaction(query, 
"TUPLE_FILE_READER_OPEN:[email protected]")
+    result = execute_debug(query, "TUPLE_FILE_READER_OPEN:[email protected]")
     assert result.success
     # Do an unordered compare (the rows are unique)
     assert set(result.data) == set(result1.data)
@@ -180,8 +187,7 @@ class TestTupleCache(TestTupleCacheBase):
     assertCounters(result.runtime_profile, num_hits=0, num_halted=0, 
num_skipped=1)
 
     # Case 2: fail during the first GetNext() call
-    result = self.cached_query_w_debugaction(query,
-        "TUPLE_FILE_READER_FIRST_GETNEXT:[email protected]")
+    result = execute_debug(query, "TUPLE_FILE_READER_FIRST_GETNEXT:[email protected]")
     assert result.success
     # Do an unordered compare (the rows are unique)
     assert set(result.data) == set(result1.data)
@@ -193,20 +199,20 @@ class TestTupleCache(TestTupleCacheBase):
     # has already returned cached rows
     hit_error = False
     try:
-      result = self.cached_query_w_debugaction(query,
-          "TUPLE_FILE_READER_SECOND_GETNEXT:[email protected]")
+      result = execute_debug(query, 
"TUPLE_FILE_READER_SECOND_GETNEXT:[email protected]")
     except Exception:
       hit_error = True
 
     assert hit_error
 
 
-class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
+class TestTupleCacheRuntimeKeysBasic(TestTupleCacheBase):
+  """Simpler tests that run on a single node with mt_dop=0 or mt_dop=1."""
 
   @classmethod
   def add_test_dimensions(cls):
-    super(TestTupleCacheRuntimeKeys, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('mt_dop', *[0, 1, 
2]))
+    super(TestTupleCacheRuntimeKeysBasic, cls).add_test_dimensions()
+    add_exec_option_dimension(cls, 'mt_dop', [0, 1])
 
   @CustomClusterTestSuite.with_args(
     start_args=CACHE_START_ARGS, cluster_size=1)
@@ -214,14 +220,9 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
   def test_scan_range_basics(self, vector, unique_database):
     """
     This tests that adding/removing files to a table results in different keys.
-    This runs on a single node with mt_dop=0 or mt_dop=1, so it is the simplest
-    test.
     """
-    mt_dop = vector.get_value('mt_dop')
-    # To keep this simple, we skip mt_dop > 1.
-    if mt_dop > 1:
-      pytest.skip()
-    fq_table = "{0}.scan_range_basics_mtdop{1}".format(unique_database, mt_dop)
+    self.client.set_configuration(vector.get_value('exec_option'))
+    fq_table = "{0}.scan_range_basics".format(unique_database)
     query = "SELECT * from {0}".format(fq_table)
 
     # Create an empty table
@@ -230,21 +231,20 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
     # When there are no scan ranges, then fragment instance key is 0. This is
     # somewhat of a toy case and we probably want to avoid caching in this
     # case. Nonetheless, it is a good sanity check.
-    empty_result = self.cached_query(query, mt_dop=mt_dop)
+    empty_result = self.execute_query(query)
     cache_keys = get_cache_keys(empty_result.runtime_profile)
     assert len(cache_keys) == 1
     empty_table_compile_key, empty_table_finst_key = cache_keys[0].split("_")
     assert empty_table_finst_key == "0"
     assert len(empty_result.data) == 0
-    if mt_dop > 0:
-      assert_deterministic_scan(empty_result.runtime_profile)
+    assert_deterministic_scan(vector, empty_result.runtime_profile)
 
     # Insert a row, which creates a file / scan range
-    self.cached_query("INSERT INTO {0} VALUES ({1})".format(
+    self.execute_query("INSERT INTO {0} VALUES ({1})".format(
         fq_table, table_value(0)))
 
     # Now, there is a scan range, so the fragment instance key should be 
non-zero.
-    one_file_result = self.cached_query(query, mt_dop=mt_dop)
+    one_file_result = self.execute_query(query)
     cache_keys = get_cache_keys(one_file_result.runtime_profile)
     assert len(cache_keys) == 1
     one_file_compile_key, one_file_finst_key = cache_keys[0].split("_")
@@ -252,18 +252,17 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
     # This should be a cache miss
     assertCounters(one_file_result.runtime_profile, 0, 0, 0)
     assert len(one_file_result.data) == 1
-    if mt_dop > 0:
-      assert_deterministic_scan(one_file_result.runtime_profile)
+    assert_deterministic_scan(vector, one_file_result.runtime_profile)
 
     # The new scan range did not change the compile-time key
     assert empty_table_compile_key == one_file_compile_key
 
     # Insert another row, which creates a file / scan range
-    self.cached_query("INSERT INTO {0} VALUES ({1})".format(
+    self.execute_query("INSERT INTO {0} VALUES ({1})".format(
         fq_table, table_value(1)))
 
     # There is a second scan range, so the fragment instance key should change 
again
-    two_files_result = self.cached_query(query, mt_dop=mt_dop)
+    two_files_result = self.execute_query(query)
     cache_keys = get_cache_keys(two_files_result.runtime_profile)
     assert len(cache_keys) == 1
     two_files_compile_key, two_files_finst_key = cache_keys[0].split("_")
@@ -273,15 +272,14 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
     assert one_file_finst_key != two_files_finst_key
     overlapping_rows = 
set(one_file_result.data).intersection(set(two_files_result.data))
     assert len(overlapping_rows) == 1
-    if mt_dop > 0:
-      assert_deterministic_scan(two_files_result.runtime_profile)
+    assert_deterministic_scan(vector, two_files_result.runtime_profile)
 
     # The new scan range did not change the compile-time key
     assert one_file_compile_key == two_files_compile_key
 
     # Invalidate metadata and rerun the last query. The keys should stay the 
same.
-    self.cached_query("invalidate metadata")
-    rerun_two_files_result = self.cached_query(query, mt_dop=mt_dop)
+    self.execute_query("invalidate metadata")
+    rerun_two_files_result = self.execute_query(query)
     # Verify that this is a cache hit
     assertCounters(rerun_two_files_result.runtime_profile, 1, 0, 0)
     cache_keys = get_cache_keys(rerun_two_files_result.runtime_profile)
@@ -294,24 +292,20 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
   @CustomClusterTestSuite.with_args(
     start_args=CACHE_START_ARGS, cluster_size=1)
   @pytest.mark.execute_serially
-  def test_scan_range_partitioned(self, vector, unique_database):
+  def test_scan_range_partitioned(self, vector):
     """
     This tests a basic partitioned case where the query is identical except 
that
-    it operates on different partitions (and thus different scan ranges)
-    This runs on a single node with mt_dop=0 or mt_dop=1 to keep it simple.
+    it operates on different partitions (and thus different scan ranges).
     """
-    mt_dop = vector.get_value('mt_dop')
-    # To keep this simple, we skip mt_dop > 1.
-    if mt_dop > 1:
-      pytest.skip()
-    year2009_result = self.cached_query(
-        "select * from functional.alltypes where year=2009", mt_dop=mt_dop)
+    self.client.set_configuration(vector.get_value('exec_option'))
+    year2009_result = self.execute_query(
+        "select * from functional.alltypes where year=2009")
     cache_keys = get_cache_keys(year2009_result.runtime_profile)
     assert len(cache_keys) == 1
     year2009_compile_key, year2009_finst_key = cache_keys[0].split("_")
 
-    year2010_result = self.cached_query(
-        "select * from functional.alltypes where year=2010", mt_dop=mt_dop)
+    year2010_result = self.execute_query(
+        "select * from functional.alltypes where year=2010")
     cache_keys = get_cache_keys(year2010_result.runtime_profile)
     assert len(cache_keys) == 1
     year2010_compile_key, year2010_finst_key = cache_keys[0].split("_")
@@ -334,6 +328,14 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
     assert year2010_result.data[0].find("2010") != -1
     assert year2010_result.data[0].find("2009") == -1
 
+
+class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestTupleCacheRuntimeKeys, cls).add_test_dimensions()
+    add_exec_option_dimension(cls, 'mt_dop', [0, 1, 2])
+
   @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS)
   @pytest.mark.execute_serially
   def test_scan_range_distributed(self, vector, unique_database):
@@ -343,9 +345,9 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
     distinct cache key. When adding a scan range, at least one fragment 
instance
     cache key should change.
     """
-
-    mt_dop = vector.get_value('mt_dop')
-    fq_table = "{0}.scan_range_basics_mtdop{1}".format(unique_database, mt_dop)
+    self.client.set_configuration(vector.get_value('exec_option'))
+    mt_dop = vector.get_value('exec_option')['mt_dop']
+    fq_table = "{0}.scan_range_distributed".format(unique_database)
     query = "SELECT * from {0}".format(fq_table)
 
     # Create a table with several files so that we always have enough work for 
multiple
@@ -354,7 +356,7 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
 
     # We run a simple select. This is running with multiple impalads, so there 
are
     # always multiple fragment instances
-    before_result = self.cached_query(query, mt_dop=mt_dop)
+    before_result = self.execute_query(query)
     cache_keys = get_cache_keys(before_result.runtime_profile)
     expected_num_keys = 3 * max(mt_dop, 1)
     assert len(cache_keys) == expected_num_keys
@@ -370,17 +372,16 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
     for impalad in self.cluster.impalads:
       entries_in_use = self.get_tuple_cache_metric(impalad.service, 
"entries-in-use")
       assert entries_in_use == max(mt_dop, 1)
-    if mt_dop > 0:
-      assert_deterministic_scan(before_result.runtime_profile)
+    assert_deterministic_scan(vector, before_result.runtime_profile)
 
     # Insert another row, which creates a file / scan range
     # This uses a very large seed for table_value() to get a unique row that 
isn't
     # already in the table.
-    self.cached_query("INSERT INTO {0} VALUES ({1})".format(
+    self.execute_query("INSERT INTO {0} VALUES ({1})".format(
         fq_table, table_value(1000000)))
 
     # Rerun the query with the extra scan range
-    after_insert_result = self.cached_query(query, mt_dop=mt_dop)
+    after_insert_result = self.execute_query(query)
     cache_keys = get_cache_keys(after_insert_result.runtime_profile)
     expected_num_keys = 3 * max(mt_dop, 1)
     assert len(cache_keys) == expected_num_keys
@@ -409,8 +410,7 @@ class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
       assert entries_in_use <= (2 * max(mt_dop, 1))
       total_entries_in_use += entries_in_use
     assert total_entries_in_use >= len(all_cache_keys)
-    if mt_dop > 0:
-      assert_deterministic_scan(after_insert_result.runtime_profile)
+    assert_deterministic_scan(vector, after_insert_result.runtime_profile)
 
     # The extra scan range means that at least one fragment instance key 
changed
     # Since scheduling can change completely with the addition of a single 
scan range,

Reply via email to