This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 36640548e IMPALA-12545: Use 'unique_database' in test_restart_services.py::TestRestart
36640548e is described below

commit 36640548e6c8ca2638a170cbbb8b10af468003e9
Author: Daniel Becker <[email protected]>
AuthorDate: Mon Nov 6 16:21:32 2023 +0100

    IMPALA-12545: Use 'unique_database' in test_restart_services.py::TestRestart
    
    Some tests in 'test_restart_services.py::TestRestart' create tables but
    don't use a 'unique_database' for them. The table and column names are
    sometimes the same across tests. The tests are run serially, but if a
    table is not deleted successfully it may interfere with other tests.
    
    This change introduces 'unique_database' in tests that create tables.
    After this, explicitly deleting the tables is no longer necessary as the
    framework takes care of it.
    
    Testing:
     - all affected tests pass
    
    Change-Id: Id4c2cfb669d5ff9e4d8110a0035bae1147e2db5a
    Reviewed-on: http://gerrit.cloudera.org:8080/20662
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/custom_cluster/test_restart_services.py | 91 ++++++++++++++-------------
 1 file changed, 46 insertions(+), 45 deletions(-)

diff --git a/tests/custom_cluster/test_restart_services.py 
b/tests/custom_cluster/test_restart_services.py
index 56533b2c8..cd0f76522 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -169,9 +169,10 @@ class TestRestart(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     statestored_args="--statestore_update_frequency_ms=5000 "
                      "--statestore_heartbeat_frequency_ms=10000")
-  def test_restart_catalogd(self):
-    self.execute_query_expect_success(self.client, "drop table if exists 
join_aa")
-    self.execute_query_expect_success(self.client, "create table join_aa(id 
int)")
+  def test_restart_catalogd(self, unique_database):
+    tbl_name = unique_database + ".join_aa"
+    self.execute_query_expect_success(
+        self.client, "create table {}(id int)".format(tbl_name))
     # Make the catalog object version grow large enough
     self.execute_query_expect_success(self.client, "invalidate metadata")
 
@@ -179,7 +180,7 @@ class TestRestart(CustomClusterTestSuite):
     # the local catalog cache of impalad out of sync
     for i in range(0, 10):
       try:
-        query = "alter table join_aa add columns (age" + str(i) + " int)"
+        query = "alter table {} add columns (age{} int)".format(tbl_name, i)
         self.execute_query_async(query)
       except Exception as e:
         LOG.info(str(e))
@@ -187,9 +188,8 @@ class TestRestart(CustomClusterTestSuite):
         self.cluster.catalogd.restart()
 
     self.execute_query_expect_success(self.client,
-        "alter table join_aa add columns (name string)")
-    self.execute_query_expect_success(self.client, "select name from join_aa")
-    self.execute_query_expect_success(self.client, "drop table join_aa")
+        "alter table {} add columns (name string)".format(tbl_name))
+    self.execute_query_expect_success(self.client, "select name from 
{}".format(tbl_name))
 
   WAIT_FOR_CATALOG_UPDATE_TIMEOUT_SEC = 5
 
@@ -199,7 +199,8 @@ class TestRestart(CustomClusterTestSuite):
     impalad_args=("--wait_for_new_catalog_service_id_timeout_sec={} \
                   --wait_for_new_catalog_service_id_max_iterations=-1"
                   .format(WAIT_FOR_CATALOG_UPDATE_TIMEOUT_SEC)))
-  def test_restart_catalogd_while_handling_rpc_response_with_timeout(self):
+  def test_restart_catalogd_while_handling_rpc_response_with_timeout(self,
+      unique_database):
     """Regression test for IMPALA-12267. We'd like to cause a situation where
          - The coordinator issues a DDL or DML query
          - Catalogd sends a response RPC
@@ -209,8 +210,9 @@ class TestRestart(CustomClusterTestSuite):
     Before IMPALA-12267 the coordinator hung infinitely in this situation, 
waiting for a
     statestore update with a new catalog service ID assuming the service ID it 
had was
     stale, but it already had the most recent one."""
-    self.execute_query_expect_success(self.client, "drop table if exists 
join_aa")
-    self.execute_query_expect_success(self.client, "create table join_aa(id 
int)")
+    tbl_name = unique_database + ".handling_rpc_response_with_timeout"
+    self.execute_query_expect_success(
+        self.client, "create table {}(id int)".format(tbl_name))
     # Make the catalog object version grow large enough
     self.execute_query_expect_success(self.client, "invalidate metadata")
 
@@ -218,7 +220,7 @@ class TestRestart(CustomClusterTestSuite):
     DEBUG_ACTION = ("WAIT_BEFORE_PROCESSING_CATALOG_UPDATE:SLEEP@{}"
                     .format(debug_action_sleep_time_sec * 1000))
 
-    query = "alter table join_aa add columns (age" + " int)"
+    query = "alter table {} add columns (age int)".format(tbl_name)
     handle = self.execute_query_async(query, query_options={"debug_action": 
DEBUG_ACTION})
 
     # Wait a bit so the RPC from the catalogd arrives to the coordinator.
@@ -236,12 +238,11 @@ class TestRestart(CustomClusterTestSuite):
     self.assert_impalad_log_contains("WARNING",
         "Ignoring catalog update result of catalog service ID")
 
-    self.execute_query_expect_success(self.client, "select age from join_aa")
+    self.execute_query_expect_success(self.client, "select age from 
{}".format(tbl_name))
 
     self.execute_query_expect_success(self.client,
-        "alter table join_aa add columns (name string)")
-    self.execute_query_expect_success(self.client, "select name from join_aa")
-    self.execute_query_expect_success(self.client, "drop table join_aa")
+        "alter table {} add columns (name string)".format(tbl_name))
+    self.execute_query_expect_success(self.client, "select name from 
{}".format(tbl_name))
 
   WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS = 3
   STATESTORE_UPDATE_FREQ_SEC = 2
@@ -253,14 +254,16 @@ class TestRestart(CustomClusterTestSuite):
     impalad_args=("--wait_for_new_catalog_service_id_timeout_sec=-1 \
                   --wait_for_new_catalog_service_id_max_iterations={}"
                   .format(WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS)))
-  def test_restart_catalogd_while_handling_rpc_response_with_max_iters(self):
+  def test_restart_catalogd_while_handling_rpc_response_with_max_iters(self,
+      unique_database):
     """We create the same situation as described in
     'test_restart_catalogd_while_handling_rpc_response_with_timeout()' but we 
get out of
     it not by timing out but by giving up waiting after receiving
     'WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS' updates from the statestore that 
don't change
     the catalog service ID."""
-    self.execute_query_expect_success(self.client, "drop table if exists 
join_aa")
-    self.execute_query_expect_success(self.client, "create table join_aa(id 
int)")
+    tbl_name = unique_database + ".handling_rpc_response_with_max_iters"
+    self.execute_query_expect_success(
+        self.client, "create table {}(id int)".format(tbl_name))
     # Make the catalog object version grow large enough
     self.execute_query_expect_success(self.client, "invalidate metadata")
 
@@ -268,7 +271,7 @@ class TestRestart(CustomClusterTestSuite):
     DEBUG_ACTION = ("WAIT_BEFORE_PROCESSING_CATALOG_UPDATE:SLEEP@{}"
                     .format(debug_action_sleep_time_sec * 1000))
 
-    query = "alter table join_aa add columns (age" + " int)"
+    query = "alter table {} add columns (age int)".format(tbl_name)
     handle = self.execute_query_async(query, query_options={"debug_action": 
DEBUG_ACTION})
 
     # Wait a bit so the RPC from the catalogd arrives to the coordinator.
@@ -283,7 +286,7 @@ class TestRestart(CustomClusterTestSuite):
     # Issue DML queries so that the coordinator receives catalog updates.
     for i in range(self.WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS):
       try:
-        query = "alter table join_aa add columns (age" + str(i) + " int)"
+        query = "alter table {} add columns (age{} int)".format(tbl_name, i)
         self.execute_query_async(query)
         time.sleep(self.STATESTORE_UPDATE_FREQ_SEC)
       except Exception as e:
@@ -302,19 +305,19 @@ class TestRestart(CustomClusterTestSuite):
     self.assert_impalad_log_contains("WARNING",
         "Ignoring catalog update result of catalog service ID")
 
-    self.execute_query_expect_success(self.client, "select age from join_aa")
+    self.execute_query_expect_success(self.client, "select age from 
{}".format(tbl_name))
 
     self.execute_query_expect_success(self.client,
-        "alter table join_aa add columns (name string)")
-    self.execute_query_expect_success(self.client, "select name from join_aa")
-    self.execute_query_expect_success(self.client, "drop table join_aa")
+        "alter table {} add columns (name string)".format(tbl_name))
+    self.execute_query_expect_success(self.client, "select name from 
{}".format(tbl_name))
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     statestored_args="--statestore_update_frequency_ms=5000")
-  def test_restart_catalogd_sync_ddl(self):
-    self.execute_query_expect_success(self.client, "drop table if exists 
join_aa")
-    self.execute_query_expect_success(self.client, "create table join_aa(id 
int)")
+  def test_restart_catalogd_sync_ddl(self, unique_database):
+    tbl_name = unique_database + ".join_aa"
+    self.execute_query_expect_success(
+        self.client, "create table {}(id int)".format(tbl_name))
     # Make the catalog object version grow large enough
     self.execute_query_expect_success(self.client, "invalidate metadata")
     query_options = {"sync_ddl": "true"}
@@ -323,7 +326,7 @@ class TestRestart(CustomClusterTestSuite):
     # the local catalog catche of impalad out of sync
     for i in range(0, 10):
       try:
-        query = "alter table join_aa add columns (age" + str(i) + " int)"
+        query = "alter table {} add columns (age{} int)".format(tbl_name, i)
         self.execute_query_async(query, query_options)
       except Exception as e:
         LOG.info(str(e))
@@ -331,9 +334,8 @@ class TestRestart(CustomClusterTestSuite):
         self.cluster.catalogd.restart()
 
     self.execute_query_expect_success(self.client,
-        "alter table join_aa add columns (name string)", query_options)
-    self.execute_query_expect_success(self.client, "select name from join_aa")
-    self.execute_query_expect_success(self.client, "drop table join_aa")
+        "alter table {} add columns (name string)".format(tbl_name), 
query_options)
+    self.execute_query_expect_success(self.client, "select name from 
{}".format(tbl_name))
 
   UPDATE_FREQUENCY_S = 10
 
@@ -341,10 +343,10 @@ class TestRestart(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     statestored_args="--statestore_update_frequency_ms={frequency_ms}"
     .format(frequency_ms=(UPDATE_FREQUENCY_S * 1000)))
-  def test_restart_catalogd_twice(self):
-    self.execute_query_expect_success(self.client, "drop table if exists 
join_aa")
+  def test_restart_catalogd_twice(self, unique_database):
+    tbl_name = unique_database + ".join_aa"
     self.cluster.catalogd.restart()
-    query = "create table join_aa(id int)"
+    query = "create table {}(id int)".format(tbl_name)
     query_handle = []
 
     def execute_query_async():
@@ -356,18 +358,18 @@ class TestRestart(CustomClusterTestSuite):
     self.cluster.catalogd.restart()
     thread.join()
     self.execute_query_expect_success(self.client,
-        "alter table join_aa add columns (name string)")
-    self.execute_query_expect_success(self.client, "select name from join_aa")
-    self.execute_query_expect_success(self.client, "drop table join_aa")
+        "alter table {} add columns (name string)".format(tbl_name))
+    self.execute_query_expect_success(self.client, "select name from 
{}".format(tbl_name))
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--use_local_catalog=true",
       catalogd_args="--catalog_topic_mode=minimal",
       statestored_args="--statestore_update_frequency_ms=5000")
-  def test_restart_catalogd_with_local_catalog(self):
-    self.execute_query_expect_success(self.client, "drop table if exists 
join_aa")
-    self.execute_query_expect_success(self.client, "create table join_aa(id 
int)")
+  def test_restart_catalogd_with_local_catalog(self, unique_database):
+    tbl_name = unique_database + ".join_aa"
+    self.execute_query_expect_success(
+        self.client, "create table {}(id int)".format(tbl_name))
     # Make the catalog object version grow large enough
     self.execute_query_expect_success(self.client, "invalidate metadata")
 
@@ -375,7 +377,7 @@ class TestRestart(CustomClusterTestSuite):
     # the local catalog catche of impalad out of sync
     for i in range(0, 10):
       try:
-        query = "alter table join_aa add columns (age" + str(i) + " int)"
+        query = "alter table {} add columns (age{} int)".format(tbl_name, i)
         self.execute_query_async(query)
       except Exception as e:
         LOG.info(str(e))
@@ -383,10 +385,9 @@ class TestRestart(CustomClusterTestSuite):
         self.cluster.catalogd.restart()
 
     self.execute_query_expect_success(self.client,
-        "alter table join_aa add columns (name string)")
-    self.execute_query_expect_success(self.client, "select name from join_aa")
-    self.execute_query_expect_success(self.client, "select age0 from join_aa")
-    self.execute_query_expect_success(self.client, "drop table join_aa")
+        "alter table {} add columns (name string)".format(tbl_name))
+    self.execute_query_expect_success(self.client, "select name from 
{}".format(tbl_name))
+    self.execute_query_expect_success(self.client, "select age0 from 
{}".format(tbl_name))
 
   SUBSCRIBER_TIMEOUT_S = 2
   CANCELLATION_GRACE_PERIOD_S = 5

Reply via email to