This is an automated email from the ASF dual-hosted git repository.

jasonmfehr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 28cff4022dac5224532b9f4afb2a69799d47ba24
Author: Riza Suminto <riza.sumi...@cloudera.com>
AuthorDate: Sun Aug 17 10:11:59 2025 -0700

    IMPALA-14333: Run impala-py.test using Python3
    
    Running exhaustive tests with env var IMPALA_USE_PYTHON3_TESTS=true
    reveals some tests that require adjustment. This patch makes those
    adjustments, which mostly revolve around encoding differences and string
    vs bytes types in Python3. This patch also switches the default to run
    pytest with Python3 by setting IMPALA_USE_PYTHON3_TESTS=true. The
    following are the details:
    
    Change the hash() function in conftest.py to crc32() to produce a
    deterministic hash. Hash randomization is enabled by default since
    Python 3.3 (see
    https://docs.python.org/3/reference/datamodel.html#object.__hash__).
    This causes test sharding (like --shard_tests=1/2) to produce an
    inconsistent set of tests per shard. Always restart the minicluster
    during custom cluster tests if the --shard_tests argument is set,
    because test order may change and affect test correctness, depending on
    whether a test runs on a fresh minicluster or not.
    
    Moved one test case from delimited-latin-text.test to
    test_delimited_text.py for easier binary comparison.
    
    Add bytes_to_str() as a utility function to decode bytes in Python3.
    This is often needed when inspecting the return value of
    subprocess.check_output() as a string.
    
    Implement DataTypeMetaclass.__lt__ to replace
    DataTypeMetaclass.__cmp__, which is ignored in Python3 (see
    https://peps.python.org/pep-0207/).
    
    Fix WEB_CERT_ERR difference in test_ipv6.py.
    
    Fix trivial integer parsing in test_restart_services.py.
    
    Fix various encoding issues in test_saml2_sso.py,
    test_shell_commandline.py, and test_shell_interactive.py.
    
    Change timeout in Impala.for_each_impalad() from sys.maxsize to 2^31-1.
    
    Switch to binary comparison in test_iceberg.py where needed.
    
    Specify text mode when calling tempfile.NamedTemporaryFile().
    
    Simplify create_impala_shell_executable_dimension to skip testing dev
    and python2 impala-shell when IMPALA_USE_PYTHON3_TESTS=true. The reason
    is that several UTF-8 related tests in test_shell_commandline.py break
    in the Python3 pytest + Python2 impala-shell combo. This skipping already
    happens automatically on build OSes without a system Python2 available,
    such as RHEL9 (IMPALA_SYSTEM_PYTHON2 env var is empty).
    
    Removed unused vector argument and fixed some trivial flake8 issues.
    
    The logic of several tests requires modification due to intermittent
    issues in Python3 pytest. These include:
    
    Add _run_query_with_client() in test_ranger.py to allow reusing a single
    Impala client for running several queries. Ensure clients are closed
    when the test is done. Mark several tests in test_ranger.py with
    SkipIfFS.hive because they run queries through beeline + HiveServer2,
    but the Ozone and S3 build environments do not start HiveServer2 by
    default.
    
    Increase the sleep period from 0.1 to 0.5 seconds per iteration in
    test_statestore.py and mark TestStatestore to execute serially. This is
    because TServer appears to shut down more slowly when run concurrently
    with other tests. Handle the deprecation of Thread.setDaemon() as well.
    
    Always set force_restart=True for each test method in TestLoggingCore,
    TestShellInteractiveReconnect, and TestQueryRetries to prevent them from
    reusing the minicluster from a previous test method. Some of these tests
    damage the minicluster (kill impalad) and will produce a minidump if the
    metrics verifier for the next test fails to detect a healthy minicluster
    state.
    
    Testing:
    Pass exhaustive tests with IMPALA_USE_PYTHON3_TESTS=true.
    
    Change-Id: I401a93b6cc7bcd17f41d24e7a310e0c882a550d4
    Reviewed-on: http://gerrit.cloudera.org:8080/23319
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 bin/impala-config.sh                               |   2 +-
 shell/impala_shell/impala_shell.py                 |   3 +-
 .../queries/QueryTest/delimited-latin-text.test    |  15 ---
 tests/authorization/test_ranger.py                 | 132 +++++++++++++--------
 tests/common/custom_cluster_test_suite.py          |  10 +-
 tests/common/impala_test_suite.py                  |   3 +-
 tests/common/skip.py                               |   9 +-
 tests/comparison/cluster.py                        |  16 +--
 tests/comparison/db_types.py                       |  28 ++++-
 tests/conftest.py                                  |   4 +-
 tests/custom_cluster/test_ipv6.py                  |   4 +-
 tests/custom_cluster/test_logging.py               |   9 +-
 tests/custom_cluster/test_query_retries.py         |  50 +++++---
 tests/custom_cluster/test_restart_services.py      |   2 +-
 tests/custom_cluster/test_s3a_access.py            |   2 +-
 tests/custom_cluster/test_saml2_sso.py             |  38 ++++--
 tests/custom_cluster/test_scratch_disk.py          |  14 ++-
 .../test_shell_interactive_reconnect.py            |   3 +-
 tests/query_test/test_delimited_text.py            |  18 ++-
 tests/query_test/test_iceberg.py                   |  31 ++---
 tests/query_test/test_queries.py                   |  13 +-
 tests/shell/test_shell_commandline.py              |  80 +++++++------
 tests/shell/test_shell_interactive.py              |  76 ++++++------
 tests/shell/util.py                                |  46 +++----
 tests/statestore/test_statestore.py                |  33 ++++--
 tests/stress/test_update_stress.py                 |   4 +-
 tests/util/hdfs_util.py                            |   5 +-
 tests/util/parse_util.py                           |   9 ++
 tests/util/shell_util.py                           |   8 +-
 29 files changed, 403 insertions(+), 264 deletions(-)

diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 8a4ec56e6..2bce6db8d 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -347,7 +347,7 @@ export IMPALA_KERBERIZE=false
 unset IMPALA_TOOLCHAIN_KUDU_MAVEN_REPOSITORY
 unset IMPALA_TOOLCHAIN_KUDU_MAVEN_REPOSITORY_ENABLED
 
-export IMPALA_USE_PYTHON3_TESTS=${IMPALA_USE_PYTHON3_TESTS:-false}
+export IMPALA_USE_PYTHON3_TESTS=${IMPALA_USE_PYTHON3_TESTS:-true}
 
 # Source the branch and local config override files here to override any
 # variables above or any variables below that allow overriding via environment
diff --git a/shell/impala_shell/impala_shell.py b/shell/impala_shell/impala_shell.py
index 2445e8629..14b329e3d 100755
--- a/shell/impala_shell/impala_shell.py
+++ b/shell/impala_shell/impala_shell.py
@@ -1553,7 +1553,7 @@ class ImpalaShell(cmd.Cmd, object):
       # undecodable elements.
       if self.last_query_handle is not None:
         self.imp_client.close_query(self.last_query_handle)
-      log_exception_with_timestamp(e, "UnicodeDecodeError", "Please check for"
+      log_exception_with_timestamp(e, "UnicodeDecodeError", "Please check for "
          "columns containing binary data to find the possible source of the 
error")
     except QueryStateException as e:
       # an exception occurred while executing the query
@@ -1965,6 +1965,7 @@ class ImpalaShell(cmd.Cmd, object):
         print("Error: OAuth access token not found in json payload")
         sys.exit(1)
 
+
 TIPS = [
   "Press TAB twice to see a list of available commands.",
   "After running a query, type SUMMARY to see a summary of where time was 
spent.",
diff --git a/testdata/workloads/functional-query/queries/QueryTest/delimited-latin-text.test b/testdata/workloads/functional-query/queries/QueryTest/delimited-latin-text.test
index 004feff57..9df2b2cd5 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/delimited-latin-text.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/delimited-latin-text.test
@@ -1,20 +1,5 @@
 ====
 ---- QUERY
-# test querying text table "extended" ASCII (latin) delimiters:
-# fields terminated by '-2' -- thorn character
-# escaped by '-22' -- lowercase e with circumflex
-# lines terminated by '\n'
-select * from functional.text_thorn_ecirc_newline
----- RESULTS
-'one','two',3,4
-'one\xfeone','two',3,4
-'one\xea','two',3,4
-'one\xea\xfeone','two',3,4
-'one\xea\xea','two',3,4
----- TYPES
-STRING,STRING,INT,INT
-====
----- QUERY
 # create new tables like the ones above to test inserting
 create table tecn like functional.text_thorn_ecirc_newline;
 ---- RESULTS
diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py
index aef153fd1..59896f5f2 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -45,6 +45,7 @@ from tests.util.calculation_util import get_random_id
 from tests.util.filesystem_utils import WAREHOUSE, WAREHOUSE_PREFIX
 from tests.util.hdfs_util import NAMENODE
 from tests.util.iceberg_util import get_snapshots
+from tests.util.parse_util import bytes_to_str
 
 ADMIN = "admin"
 OWNER_USER = getuser()
@@ -791,7 +792,7 @@ class TestRanger(CustomClusterTestSuite):
         
"{0}/service/public/v2/api/policy?servicename=test_impala&policyname={1}".format(
             RANGER_HOST, policy_name),
         auth=RANGER_AUTH, headers=REST_HEADERS)
-    assert 300 > r.status_code >= 200, r.content
+    assert 300 > r.status_code >= 200, bytes_to_str(r.content)
 
   @staticmethod
   def _check_privileges(result, expected):
@@ -807,7 +808,14 @@ class TestRanger(CustomClusterTestSuite):
 
   def _run_query_as_user(self, query, username, expect_success):
     """Helper to run an input query as a given user."""
-    impala_client = self.create_impala_client(user=username)
+    with self.create_impala_client(user=username) as impala_client:
+      if expect_success:
+        return self.execute_query_expect_success(
+            impala_client, query, query_options={'sync_ddl': 1})
+      return self.execute_query_expect_failure(impala_client, query)
+
+  def _run_query_with_client(self, query, impala_client, expect_success):
+    """Helper to run an input query using a given impala_client."""
     if expect_success:
       return self.execute_query_expect_success(
           impala_client, query, query_options={'sync_ddl': 1})
@@ -882,6 +890,7 @@ class TestRanger(CustomClusterTestSuite):
     grantee_role = "grantee_role"
     resource_owner_role = OWNER_USER
     admin_client = self.create_impala_client(user=ADMIN)
+    user_client = self.create_impala_client(user=OWNER_USER)
     unique_database = unique_name + "_db"
     table_name = "tbl"
     column_names = ["a", "b"]
@@ -899,15 +908,15 @@ class TestRanger(CustomClusterTestSuite):
       # able to create a UDF.
       admin_client.execute("grant all on uri '{0}{1}' to user {2}"
           .format(os.getenv("FILESYSTEM_PREFIX"), udf_uri, OWNER_USER))
-      self._run_query_as_user("create database {0}".format(unique_database), 
OWNER_USER,
-          True)
-      self._run_query_as_user("create table {0}.{1} ({2} int, {3} string)"
+      self._run_query_with_client("create database 
{0}".format(unique_database),
+                                  user_client, True)
+      self._run_query_with_client("create table {0}.{1} ({2} int, {3} string)"
           .format(unique_database, table_name, column_names[0], 
column_names[1]),
-          OWNER_USER, True)
-      self._run_query_as_user("create function {0}.{1} "
+          user_client, True)
+      self._run_query_with_client("create function {0}.{1} "
           "location '{2}{3}' symbol='org.apache.impala.TestUdf'"
           .format(unique_database, udf_name, os.getenv("FILESYSTEM_PREFIX"), 
udf_uri),
-          OWNER_USER, True)
+          user_client, True)
 
       for data in test_data:
         grantee_type = data[0]
@@ -935,6 +944,8 @@ class TestRanger(CustomClusterTestSuite):
       admin_client.execute("revoke create on server from user 
{0}".format(OWNER_USER))
       admin_client.execute("revoke all on uri '{0}{1}' from user {2}"
           .format(os.getenv("FILESYSTEM_PREFIX"), udf_uri, OWNER_USER))
+      admin_client.close()
+      user_client.close()
 
   def _test_grant_revoke_by_owner_on_database(self, privilege, unique_database,
       grantee_type, grantee, resource_owner_role):
@@ -946,10 +957,11 @@ class TestRanger(CustomClusterTestSuite):
     set_database_owner_role_stmt = "alter database {0} set owner role {1}"
     resource_owner_group = OWNER_USER
     admin_client = self.create_impala_client(user=ADMIN)
+    user_client = self.create_impala_client(user=OWNER_USER)
 
     try:
-      self._run_query_as_user(grant_database_stmt
-          .format(privilege, unique_database, grantee_type, grantee), 
OWNER_USER,
+      self._run_query_with_client(grant_database_stmt
+          .format(privilege, unique_database, grantee_type, grantee), 
user_client,
           True)
       result = admin_client.execute(show_grant_database_stmt
           .format(grantee_type, grantee, unique_database))
@@ -959,8 +971,8 @@ class TestRanger(CustomClusterTestSuite):
           [grantee_type, grantee, unique_database, "*", "*", "", "", "", "",
            privilege, "false"]])
 
-      self._run_query_as_user(revoke_database_stmt
-          .format(privilege, unique_database, grantee_type, grantee), 
OWNER_USER,
+      self._run_query_with_client(revoke_database_stmt
+          .format(privilege, unique_database, grantee_type, grantee), 
user_client,
           True)
       result = admin_client.execute(show_grant_database_stmt
           .format(grantee_type, grantee, unique_database))
@@ -977,12 +989,12 @@ class TestRanger(CustomClusterTestSuite):
           .format(unique_database, resource_owner_group))
       admin_client.execute("invalidate metadata")
 
-      result = self._run_query_as_user(grant_database_stmt
-          .format(privilege, unique_database, grantee_type, grantee), 
OWNER_USER,
+      result = self._run_query_with_client(grant_database_stmt
+          .format(privilege, unique_database, grantee_type, grantee), 
user_client,
           False)
       assert ERROR_GRANT in str(result)
-      result = self._run_query_as_user(revoke_database_stmt
-          .format(privilege, unique_database, grantee_type, grantee), 
OWNER_USER,
+      result = self._run_query_with_client(revoke_database_stmt
+          .format(privilege, unique_database, grantee_type, grantee), 
user_client,
           False)
       assert ERROR_REVOKE in str(result)
 
@@ -992,12 +1004,12 @@ class TestRanger(CustomClusterTestSuite):
       admin_client.execute(set_database_owner_role_stmt
           .format(unique_database, resource_owner_role))
 
-      result = self._run_query_as_user(grant_database_stmt
-          .format(privilege, unique_database, grantee_type, grantee), 
OWNER_USER,
+      result = self._run_query_with_client(grant_database_stmt
+          .format(privilege, unique_database, grantee_type, grantee), 
user_client,
           False)
       assert ERROR_GRANT in str(result)
-      result = self._run_query_as_user(revoke_database_stmt
-          .format(privilege, unique_database, grantee_type, grantee), 
OWNER_USER,
+      result = self._run_query_with_client(revoke_database_stmt
+          .format(privilege, unique_database, grantee_type, grantee), 
user_client,
           False)
       assert ERROR_REVOKE in str(result)
       # Change the database owner back to the user 'OWNER_USER'.
@@ -1009,6 +1021,8 @@ class TestRanger(CustomClusterTestSuite):
       # from interfering with other tests.
       admin_client.execute(revoke_database_stmt
           .format(privilege, unique_database, grantee_type, grantee))
+      admin_client.close()
+      user_client.close()
 
   def _test_grant_revoke_by_owner_on_table(self, privilege, unique_database, 
table_name,
       grantee_type, grantee, resource_owner_role):
@@ -1020,23 +1034,24 @@ class TestRanger(CustomClusterTestSuite):
     show_grant_table_stmt = "show grant {0} {1} on table {2}.{3}"
     resource_owner_group = OWNER_USER
     admin_client = self.create_impala_client(user=ADMIN)
+    user_client = self.create_impala_client(user=OWNER_USER)
     set_table_owner_user_stmt = "alter table {0}.{1} set owner user {2}"
     set_table_owner_group_stmt = "alter table {0}.{1} set owner group {2}"
     set_table_owner_role_stmt = "alter table {0}.{1} set owner role {2}"
 
     try:
-      self._run_query_as_user(grant_table_stmt
+      self._run_query_with_client(grant_table_stmt
           .format(privilege, unique_database, table_name, grantee_type, 
grantee),
-          OWNER_USER, True)
+          user_client, True)
       result = admin_client.execute(show_grant_table_stmt
           .format(grantee_type, grantee, unique_database, table_name))
       TestRanger._check_privileges(result, [
           [grantee_type, grantee, unique_database, table_name, "*", "", "", "",
           "", privilege, "false"]])
 
-      self._run_query_as_user(revoke_table_stmt
+      self._run_query_with_client(revoke_table_stmt
           .format(privilege, unique_database, table_name, grantee_type, 
grantee),
-          OWNER_USER, True)
+          user_client, True)
       result = admin_client.execute(show_grant_table_stmt
           .format(grantee_type, grantee, unique_database, table_name))
       TestRanger._check_privileges(result, [])
@@ -1052,13 +1067,13 @@ class TestRanger(CustomClusterTestSuite):
           .format(unique_database, table_name, resource_owner_group))
       admin_client.execute("refresh {0}.{1}".format(unique_database, 
table_name))
 
-      result = self._run_query_as_user(grant_table_stmt
+      result = self._run_query_with_client(grant_table_stmt
           .format(privilege, unique_database, table_name, grantee_type, 
grantee),
-          OWNER_USER, False)
+          user_client, False)
       assert ERROR_GRANT in str(result)
-      result = self._run_query_as_user(revoke_table_stmt
+      result = self._run_query_with_client(revoke_table_stmt
           .format(privilege, unique_database, table_name, grantee_type, 
grantee),
-          OWNER_USER, False)
+          user_client, False)
       assert ERROR_REVOKE in str(result)
 
       # Set the owner of the table to a role that has the same name as
@@ -1067,13 +1082,13 @@ class TestRanger(CustomClusterTestSuite):
       admin_client.execute(set_table_owner_role_stmt
           .format(unique_database, table_name, resource_owner_role))
 
-      result = self._run_query_as_user(grant_table_stmt
+      result = self._run_query_with_client(grant_table_stmt
           .format(privilege, unique_database, table_name, grantee_type, 
grantee),
-          OWNER_USER, False)
+          user_client, False)
       assert ERROR_GRANT in str(result)
-      result = self._run_query_as_user(revoke_table_stmt
+      result = self._run_query_with_client(revoke_table_stmt
           .format(privilege, unique_database, table_name, grantee_type, 
grantee),
-          OWNER_USER, False)
+          user_client, False)
       assert ERROR_REVOKE in str(result)
       # Change the table owner back to the user 'OWNER_USER'.
       admin_client.execute(set_table_owner_user_stmt
@@ -1084,6 +1099,8 @@ class TestRanger(CustomClusterTestSuite):
       # from interfering with other tests.
       admin_client.execute(revoke_table_stmt
           .format(privilege, unique_database, table_name, grantee_type, 
grantee))
+      admin_client.close()
+      user_client.close()
 
   def _test_grant_revoke_by_owner_on_column(self, privilege, column_names,
       unique_database, table_name, grantee_type, grantee, resource_owner_role):
@@ -1095,14 +1112,15 @@ class TestRanger(CustomClusterTestSuite):
     show_grant_column_stmt = "show grant {0} {1} on column {2}.{3}.{4}"
     resource_owner_group = OWNER_USER
     admin_client = self.create_impala_client(user=ADMIN)
+    user_client = self.create_impala_client(user=OWNER_USER)
     set_table_owner_user_stmt = "alter table {0}.{1} set owner user {2}"
     set_table_owner_group_stmt = "alter table {0}.{1} set owner group {2}"
     set_table_owner_role_stmt = "alter table {0}.{1} set owner role {2}"
 
     try:
-      self._run_query_as_user(grant_column_stmt
+      self._run_query_with_client(grant_column_stmt
           .format(privilege, column_names[0], unique_database, table_name,
-          grantee_type, grantee), OWNER_USER, True)
+          grantee_type, grantee), user_client, True)
       result = admin_client.execute(show_grant_column_stmt
           .format(grantee_type, grantee, unique_database, table_name,
           column_names[0]))
@@ -1110,9 +1128,9 @@ class TestRanger(CustomClusterTestSuite):
           [grantee_type, grantee, unique_database, table_name, column_names[0],
           "", "", "", "", privilege, "false"]])
 
-      self._run_query_as_user(revoke_column_stmt
+      self._run_query_with_client(revoke_column_stmt
           .format(privilege, column_names[0], unique_database, table_name,
-          grantee_type, grantee), OWNER_USER, True)
+          grantee_type, grantee), user_client, True)
       result = admin_client.execute(show_grant_column_stmt
           .format(grantee_type, grantee, unique_database, table_name,
           column_names[0]))
@@ -1125,13 +1143,13 @@ class TestRanger(CustomClusterTestSuite):
           .format(unique_database, table_name, resource_owner_group))
       admin_client.execute("refresh {0}.{1}".format(unique_database, 
table_name))
 
-      result = self._run_query_as_user(grant_column_stmt
+      result = self._run_query_with_client(grant_column_stmt
           .format(privilege, column_names[0], unique_database, table_name,
-          grantee_type, grantee), OWNER_USER, False)
+          grantee_type, grantee), user_client, False)
       assert ERROR_GRANT in str(result)
-      result = self._run_query_as_user(revoke_column_stmt
+      result = self._run_query_with_client(revoke_column_stmt
           .format(privilege, column_names[0], unique_database, table_name,
-          grantee_type, grantee), OWNER_USER, False)
+          grantee_type, grantee), user_client, False)
       assert ERROR_REVOKE in str(result)
 
       # Set the owner of the table to a role that has the same name as 
'OWNER_USER' and
@@ -1140,13 +1158,13 @@ class TestRanger(CustomClusterTestSuite):
       admin_client.execute(set_table_owner_role_stmt
           .format(unique_database, table_name, resource_owner_role))
 
-      result = self._run_query_as_user(grant_column_stmt
+      result = self._run_query_with_client(grant_column_stmt
           .format(privilege, column_names[0], unique_database, table_name,
-          grantee_type, grantee), OWNER_USER, False)
+          grantee_type, grantee), user_client, False)
       assert ERROR_GRANT in str(result)
-      result = self._run_query_as_user(revoke_column_stmt
+      result = self._run_query_with_client(revoke_column_stmt
           .format(privilege, column_names[0], unique_database, table_name,
-          grantee_type, grantee), OWNER_USER, False)
+          grantee_type, grantee), user_client, False)
       assert ERROR_REVOKE in str(result)
       # Change the table owner back to the user 'owner_user'.
       admin_client.execute(set_table_owner_user_stmt
@@ -1158,19 +1176,22 @@ class TestRanger(CustomClusterTestSuite):
       admin_client.execute(revoke_column_stmt
           .format(privilege, column_names[0], unique_database, table_name,
           grantee_type, grantee))
+      admin_client.close()
+      user_client.close()
 
   def _test_grant_revoke_by_owner_on_udf(self, privilege, unique_database, 
udf_name,
       grantee_type, grantee):
     # Due to IMPALA-11743 and IMPALA-12685, the owner of a UDF could not grant
     # or revoke the SELECT privilege.
-    result = self._run_query_as_user("grant {0} on user_defined_fn "
-        "{1}.{2} to {3} {4}".format(privilege, unique_database, udf_name,
-        grantee_type, grantee), OWNER_USER, False)
-    assert ERROR_GRANT in str(result)
-    result = self._run_query_as_user("revoke {0} on user_defined_fn "
-        "{1}.{2} from {3} {4}".format(privilege, unique_database, udf_name,
-        grantee_type, grantee), OWNER_USER, False)
-    assert ERROR_REVOKE in str(result)
+    with self.create_impala_client(user=OWNER_USER) as user_client:
+      result = self._run_query_with_client("grant {0} on user_defined_fn "
+          "{1}.{2} to {3} {4}".format(privilege, unique_database, udf_name,
+          grantee_type, grantee), user_client, False)
+      assert ERROR_GRANT in str(result)
+      result = self._run_query_with_client("revoke {0} on user_defined_fn "
+          "{1}.{2} from {3} {4}".format(privilege, unique_database, udf_name,
+          grantee_type, grantee), user_client, False)
+      assert ERROR_REVOKE in str(result)
 
   def _test_allow_catalog_cache_op_from_masked_users(self, unique_name):
     """Verify that catalog cache operations are allowed for masked users
@@ -1820,6 +1841,7 @@ class TestRangerIndependent(TestRanger):
   def test_grant_multiple_columns_consolidate_grant_revoke_requests(self):
     self._test_grant_multiple_columns(1)
 
+  @SkipIfFS.hive
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     impalad_args=LEGACY_CATALOG_IMPALAD_ARGS,
@@ -2199,10 +2221,12 @@ class TestRangerLegacyCatalog(TestRanger):
   def test_legacy_catalog_ownership(self):
       self._test_ownership()
 
+  @SkipIfFS.hive
   @pytest.mark.execute_serially
   def test_grant_revoke_by_owner_legacy_catalog(self, unique_name):
     self._test_grant_revoke_by_owner(unique_name)
 
+  @SkipIfFS.hive
   @pytest.mark.execute_serially
   def test_select_view_created_by_non_superuser_with_catalog_v1(self, 
unique_name):
     self._test_select_view_created_by_non_superuser(unique_name)
@@ -2231,10 +2255,12 @@ class TestRangerLocalCatalog(TestRanger):
       pytest.xfail("getTableIfCached() faulty behavior, known issue")
       self._test_ownership()
 
+  @SkipIfFS.hive
   @pytest.mark.execute_serially
   def test_grant_revoke_by_owner_local_catalog(self, unique_name):
     self._test_grant_revoke_by_owner(unique_name)
 
+  @SkipIfFS.hive
   @pytest.mark.execute_serially
   def test_select_view_created_by_non_superuser_with_local_catalog(self, 
unique_name):
     self._test_select_view_created_by_non_superuser(unique_name)
@@ -2453,6 +2479,8 @@ class TestRangerLocalCatalog(TestRanger):
       else:
         assert "Error revoking a privilege in Ranger. Ranger error message: " \
                "HTTP 403 Error: Grantee group invalid_group doesn't exist" in 
str(result)
+    invalid_impala_client.close()
+    valid_impala_client.close()
 
   @pytest.mark.execute_serially
   def test_show_functions(self, unique_name):
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index ef57e8da7..04b34cfe0 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -204,7 +204,9 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       args[LOG_SYMLINKS] = True
     if workload_mgmt:
       args[WORKLOAD_MGMT] = True
-    if force_restart:
+    if force_restart or pytest.config.option.shard_tests:
+      # When sharding tests, always restart the cluster to avoid issues with 
tests
+      # that depend on a specific test order within a shard.
       args[FORCE_RESTART] = True
 
     def merge_args(args_first, args_last):
@@ -349,7 +351,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       kwargs[IMPALAD_TIMEOUT_S] = args[IMPALAD_TIMEOUT_S]
     if FORCE_RESTART in args:
       kwargs[FORCE_RESTART] = args[FORCE_RESTART]
-      if args[FORCE_RESTART] is True:
+      if args[FORCE_RESTART] is True and not pytest.config.option.shard_tests:
         LOG.warning("Test uses force_restart=True to avoid restarting the 
cluster. "
                     "Test reorganization/assertion rewrite is needed")
     else:
@@ -398,6 +400,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
   @classmethod
   def cluster_teardown(cls, name, args):
     if args.get(WORKLOAD_MGMT, False):
+      cls.close_impala_clients()
       cls.cluster.graceful_shutdown_impalads()
 
     cls.clear_tmp_dirs()
@@ -618,6 +621,9 @@ class CustomClusterTestSuite(ImpalaTestSuite):
         except Exception as e:
           LOG.info("Failed to reuse running cluster: %s" % e)
           pass
+        finally:
+          cls.cluster = ImpalaCluster.get_e2e_test_cluster()
+          cls.impalad_test_service = cls.create_impala_service()
 
     LOG.info("Starting cluster with command: %s" % cmd_str)
     try:
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 9e7414716..c2d7744dd 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -35,6 +35,7 @@ import socket
 import subprocess
 import time
 import string
+
 from functools import wraps
 from getpass import getuser
 from impala.hiveserver2 import HiveServer2Cursor
@@ -455,7 +456,7 @@ class ImpalaTestSuite(BaseTestSuite):
   @classmethod
   def close_impala_clients(cls):
     """Closes Impala clients created by create_impala_clients()."""
-    # cls.client should be equal to one of belove, unless test method 
implicitly override.
+    # cls.client should be equal to one of below, unless test method 
implicitly override.
     # Closing twice would lead to error in some clients (impyla+SSL).
     if cls.client not in (cls.beeswax_client, cls.hs2_client, 
cls.hs2_http_client):
       cls.client.close()
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 6d3a8b182..69637dcf4 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -81,7 +81,8 @@ class SkipIfFS:
   incorrent_reported_ec = pytest.mark.skipif(IS_OZONE and IS_EC, 
reason="HDDS-8543")
 
   # These need test infra work to re-enable.
-  hive = pytest.mark.skipif(not IS_HDFS, reason="Hive doesn't work")
+  hive = pytest.mark.skipif(
+      not IS_HDFS, reason="HiveServer2 doesn't work or not started")
   hbase = pytest.mark.skipif(not IS_HDFS, reason="HBase not started")
   qualified_path = pytest.mark.skipif(not IS_HDFS,
       reason="Tests rely on HDFS qualified paths, IMPALA-1872")
@@ -135,6 +136,7 @@ class SkipIf:
   not_tuple_cache = pytest.mark.skipif(not IS_TUPLE_CACHE,
       reason="Tuple Cache needed")
 
+
 class SkipIfLocal:
   # These are skipped due to product limitations.
   hdfs_blocks = pytest.mark.skipif(IS_LOCAL,
@@ -152,6 +154,7 @@ class SkipIfLocal:
   root_path = pytest.mark.skipif(IS_LOCAL,
       reason="Tests rely on the root directory")
 
+
 class SkipIfNotHdfsMinicluster:
   # These are skipped when not running against a local HDFS mini-cluster.
   plans = pytest.mark.skipif(
@@ -165,6 +168,7 @@ class SkipIfNotHdfsMinicluster:
       reason="Test is tuned for scheduling decisions made on a 3-node HDFS 
minicluster "
              "with no EC")
 
+
 class SkipIfBuildType:
   dev_build = pytest.mark.skipif(IMPALA_TEST_CLUSTER_PROPERTIES.is_dev(),
       reason="Test takes too much time on debug build.")
@@ -173,6 +177,7 @@ class SkipIfBuildType:
   remote = 
pytest.mark.skipif(IMPALA_TEST_CLUSTER_PROPERTIES.is_remote_cluster(),
       reason="Test depends on running against a local Impala cluster")
 
+
 class SkipIfEC:
   contain_full_explain = pytest.mark.skipif(IS_EC, reason="Contain full 
explain output "
               "for hdfs tables.")
@@ -225,6 +230,7 @@ class SkipIfHive2:
   ranger_auth = pytest.mark.skipif(HIVE_MAJOR_VERSION <= 2,
       reason="Hive 2 doesn't support Ranger authorization.")
 
+
 class SkipIfCatalogV2:
   """Expose decorators as methods so that is_catalog_v2_cluster() can be 
evaluated lazily
   when needed, instead of whenever this module is imported."""
@@ -261,6 +267,7 @@ class SkipIfCatalogV2:
       IMPALA_TEST_CLUSTER_PROPERTIES.is_catalog_v2_cluster(),
       reason="Table isn't invalidated with Local catalog and enabled 
hms_event_polling.")
 
+
 class SkipIfApacheHive():
   feature_not_supported = pytest.mark.skipif(IS_APACHE_HIVE,
       reason="Apache Hive does not support this feature")
diff --git a/tests/comparison/cluster.py b/tests/comparison/cluster.py
index 8a7c623cb..15949c167 100644
--- a/tests/comparison/cluster.py
+++ b/tests/comparison/cluster.py
@@ -30,7 +30,6 @@ import requests
 import shutil
 import subprocess
 from abc import ABCMeta, abstractproperty
-from cm_api.api_client import ApiResource as CmApiResource
 from collections import defaultdict
 from collections import OrderedDict
 from contextlib import contextmanager
@@ -38,7 +37,6 @@ from getpass import getuser
 from io import BytesIO
 from multiprocessing.pool import ThreadPool
 from random import choice
-from sys import maxsize
 from tempfile import mkdtemp
 from threading import Lock
 from time import mktime, strptime
@@ -50,6 +48,13 @@ try:
 except ImportError:
   from urlparse import urlparse
 
+try:
+  from cm_api.api_client import ApiResource as CmApiResource
+except ImportError:
+  # If the cm_api module is not available, we will not be able to use Cloudera Manager.
+  # This is fine for local testing.
+  pass
+
 from tests.comparison.db_connection import HiveConnection, ImpalaConnection
 from tests.common.environ import HIVE_MAJOR_VERSION
 from tests.common.errors import Timeout
@@ -179,12 +184,9 @@ class Cluster(with_metaclass(ABCMeta, object)):
     """
     Print the cluster impalad version info to the console sorted by hostname.
     """
-    def _sorter(i1, i2):
-      return cmp(i1.host_name, i2.host_name)
-
     version_info = self.impala.get_version_info()
     print("Cluster Impalad Version Info:")
-    for impalad in sorted(version_info.keys(), cmp=_sorter):
+    for impalad in sorted(version_info.keys(), key=lambda x: x.host_name):
       print("{0}: {1}".format(impalad.host_name, version_info[impalad]))
 
 
@@ -635,7 +637,7 @@ class Impala(Service):
       impalads = self.impalads
     promise = self._thread_pool.map_async(func, impalads)
     # Python doesn't handle ctrl-c well unless a timeout is provided.
-    results = promise.get(maxsize)
+    results = promise.get(timeout=(2 ** 31 - 1))
     if as_dict:
       results = dict(zip(impalads, results))
     return results
diff --git a/tests/comparison/db_types.py b/tests/comparison/db_types.py
index 27e2a6025..0039b445c 100644
--- a/tests/comparison/db_types.py
+++ b/tests/comparison/db_types.py
@@ -26,6 +26,7 @@ from tests.comparison.common import ValExpr, ValExprList
 
 module_contents = dict()
 
+
 class DataTypeMetaclass(type):
   '''Provides sorting of classes used to determine upcasting.'''
 
@@ -39,9 +40,23 @@ class DataTypeMetaclass(type):
   def __cmp__(cls, other):
     if not isinstance(other, DataTypeMetaclass):
       return -1
-    return cmp(
-        getattr(cls, 'CMP_VALUE', cls.__name__),
-        getattr(other, 'CMP_VALUE', other.__name__))
+    val_this = getattr(cls, 'CMP_VALUE', cls.__name__)
+    val_other = getattr(other, 'CMP_VALUE', other.__name__)
+    if (val_this < val_other):
+      return -1
+    elif (val_this > val_other):
+      return 1
+    else:
+      return 0
+
+  def __lt__(cls, other):
+    # This __lt__ method replaces __cmp__, which was removed in Python3.
+    # See https://peps.python.org/pep-0207/.
+    # It mainly serves max() inside update_return_type_and_append() of funcs.py.
+    if not isinstance(other, DataTypeMetaclass):
+      return False
+    return (getattr(cls, 'CMP_VALUE', cls.__name__)
+            < getattr(other, 'CMP_VALUE', other.__name__))
 
 
 class DataType(with_metaclass(DataTypeMetaclass, ValExpr)):
@@ -92,7 +107,6 @@ class DataType(with_metaclass(DataTypeMetaclass, ValExpr)):
     return type(self)
 
 
-
 class Boolean(DataType):
   pass
 
@@ -213,6 +227,10 @@ JOINABLE_TYPES = (Char, Decimal, Int, Timestamp)
 TYPES = tuple(set(type_.type for type_ in EXACT_TYPES))
 
 __DECIMAL_TYPE_CACHE = dict()
+__CHAR_TYPE_CACHE = dict()
+__VARCHAR_TYPE_CACHE = dict()
+
+
 def get_decimal_class(total_digits, fractional_digits):
   cache_key = (total_digits, fractional_digits)
   if cache_key not in __DECIMAL_TYPE_CACHE:
@@ -223,7 +241,6 @@ def get_decimal_class(total_digits, fractional_digits):
   return __DECIMAL_TYPE_CACHE[cache_key]
 
 
-__CHAR_TYPE_CACHE = dict()
 def get_char_class(length):
   if length not in __CHAR_TYPE_CACHE:
     __CHAR_TYPE_CACHE[length] = type(
@@ -233,7 +250,6 @@ def get_char_class(length):
   return __CHAR_TYPE_CACHE[length]
 
 
-__VARCHAR_TYPE_CACHE = dict()
 def get_varchar_class(length):
   if length not in __VARCHAR_TYPE_CACHE:
     __VARCHAR_TYPE_CACHE[length] = type(
diff --git a/tests/conftest.py b/tests/conftest.py
index 0dc702337..4d395b658 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -696,7 +696,7 @@ def validate_python_version():
 def pytest_collection_modifyitems(items, config, session):
   """Hook to handle --shard_tests command line option.
 
-  If set, this "deselects" a subset of tests, by hashing
+  If set, this "deselects" a subset of tests, by hashing (using crc32())
   their id into buckets.
   """
   if not config.option.shard_tests:
@@ -710,7 +710,7 @@ def pytest_collection_modifyitems(items, config, session):
 
   items_selected, items_deselected = [], []
   for i in items:
-    if hash(i.nodeid) % num_shards == this_shard:
+    if crc32(i.nodeid.encode('utf-8')) % num_shards == this_shard:
       items_selected.append(i)
     else:
       items_deselected.append(i)
diff --git a/tests/custom_cluster/test_ipv6.py 
b/tests/custom_cluster/test_ipv6.py
index f4a0a9de5..415695573 100644
--- a/tests/custom_cluster/test_ipv6.py
+++ b/tests/custom_cluster/test_ipv6.py
@@ -22,6 +22,7 @@ import logging
 import os
 import pytest
 import requests
+import sys
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.network import SKIP_SSL_MSG
@@ -66,7 +67,8 @@ WEBUI_PORTS = [25000, 25010, 25020]
 # Error text can depend on both protocol and python version.
 CONN_ERR = ["Could not connect", "Connection refused"]
 CERT_ERR = ["doesn't match", "certificate verify failed"]
-WEB_CERT_ERR = "CertificateError"
+WEB_CERT_ERR = ("CertificateError" if sys.version_info.major < 3
+                else "SSLCertVerificationError")
 
 
 class TestIPv6Base(CustomClusterTestSuite):
diff --git a/tests/custom_cluster/test_logging.py 
b/tests/custom_cluster/test_logging.py
index ddae8b165..bb10651b9 100644
--- a/tests/custom_cluster/test_logging.py
+++ b/tests/custom_cluster/test_logging.py
@@ -46,21 +46,24 @@ class TestLoggingCore(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(cluster_size=1,
       impalad_args="--max_error_logs_per_instance=2",
-      disable_log_buffering=True)
+      disable_log_buffering=True,
+      force_restart=True)
   def test_max_errors(self):
     self._test_max_errors(2, 4, True)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(cluster_size=1,
       impalad_args="--max_error_logs_per_instance=3",
-      disable_log_buffering=True)
+      disable_log_buffering=True,
+      force_restart=True)
   def test_max_errors_0(self):
     self._test_max_errors(3, 0, True)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(cluster_size=1,
       impalad_args="--max_error_logs_per_instance=2",
-      disable_log_buffering=True)
+      disable_log_buffering=True,
+      force_restart=True)
   def test_max_errors_no_downgrade(self):
     self._test_max_errors(2, -1, False)
 
diff --git a/tests/custom_cluster/test_query_retries.py 
b/tests/custom_cluster/test_query_retries.py
index a1789ed52..9880affd1 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -93,6 +93,7 @@ class TestQueryRetries(CustomClusterTestSuite):
   _count_query_result = "55"
 
   @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(force_restart=True)
   def test_retries_from_cancellation_pool(self):
     """Tests that queries are retried instead of cancelled if one of the nodes 
leaves the
     cluster. The retries are triggered by the cancellation pool in the 
ImpalaServer. The
@@ -138,7 +139,8 @@ class TestQueryRetries(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      statestored_args="-statestore_heartbeat_frequency_ms=1000")
+      statestored_args="-statestore_heartbeat_frequency_ms=1000",
+      force_restart=True)
   def test_kill_impalad_expect_retry(self):
     """Launch a query, wait for it to start running, kill a random impalad and 
then
     validate that the query has successfully been retried. Increase the 
statestore
@@ -251,7 +253,8 @@ class TestQueryRetries(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      statestored_args="-statestore_heartbeat_frequency_ms=60000")
+      statestored_args="-statestore_heartbeat_frequency_ms=60000",
+      force_restart=True)
   def test_retry_exec_rpc_failure(self):
     """Test ExecFInstance RPC failures. Set a really high statestort heartbeat 
frequency
     so that killed impalads are not removed from the cluster membership. This 
will cause
@@ -306,8 +309,9 @@ class TestQueryRetries(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--debug_actions=" + _get_rpc_fail_action(FAILED_KRPC_PORT),
-      statestored_args="--statestore_heartbeat_frequency_ms=1000 \
-          --statestore_max_missed_heartbeats=2")
+      statestored_args=("--statestore_heartbeat_frequency_ms=1000 "
+                        "--statestore_max_missed_heartbeats=2"),
+      force_restart=True)
   def test_retry_exec_rpc_failure_before_admin_delay(self):
     """Test retried query triggered by RPC failures by simulating RPC errors 
at the port
     of the 2nd node in the cluster. Simulate admission delay for query with 
debug_action
@@ -375,7 +379,7 @@ class TestQueryRetries(CustomClusterTestSuite):
       impalad_args="--debug_actions=" + _get_rpc_fail_action(FAILED_KRPC_PORT),
       statestored_args="--statestore_heartbeat_frequency_ms=1000 \
           --statestore_max_missed_heartbeats=2",
-      cluster_size=2, num_exclusive_coordinators=1)
+      cluster_size=2, num_exclusive_coordinators=1, force_restart=True)
   def test_retry_query_failure_all_executors_blacklisted(self):
     """Test retried query triggered by RPC failures by simulating RPC errors 
at the port
     of the 2nd node, which is the only executor in the cluster. Simulate 
admission delay
@@ -438,7 +442,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      statestored_args="-statestore_heartbeat_frequency_ms=1000")
+      statestored_args="-statestore_heartbeat_frequency_ms=1000", force_restart=True)
   def test_multiple_retries(self):
     """Test that a query can only be retried once, and that if the retry 
attempt fails,
     it fails correctly and with the right error message. Multiple retry 
attempts are
@@ -498,6 +502,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     self.__validate_memz()
 
   @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(force_restart=True)
   def test_retry_fetched_rows(self):
     """Test that query retries are not triggered if some rows have already been
     fetched. Run a query, fetch some rows from it, kill one of the impalads 
that is
@@ -612,7 +617,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     self.client.close_query(handle)
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(disable_log_buffering=True)
+  @CustomClusterTestSuite.with_args(disable_log_buffering=True, force_restart=True)
   def test_query_retry_reaches_spool_limit(self):
     """Test retryable queries with results spooling enabled and
     spool_all_results_for_retries=true that reach spooling mem limit will 
return rows and
@@ -661,6 +666,7 @@ class TestQueryRetries(CustomClusterTestSuite):
              "fetched some rows" % self.client.handle_id(handle) in str(e)
 
   @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(force_restart=True)
   def test_original_query_cancel(self):
     """Test canceling a retryable query with 
spool_all_results_for_retries=true. Make sure
     Coordinator::Wait() won't block in cancellation."""
@@ -682,6 +688,7 @@ class TestQueryRetries(CustomClusterTestSuite):
         assert "Cancelled" in str(e)
 
   @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(force_restart=True)
   def test_retry_finished_query(self):
     """Test that queries in FINISHED state can still be retried before the 
client fetch
     any rows. Sets batch_size to 1 so results will be available as soon as 
possible.
@@ -708,7 +715,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      statestored_args="-statestore_heartbeat_frequency_ms=60000")
+      statestored_args="-statestore_heartbeat_frequency_ms=60000", force_restart=True)
   def test_retry_query_cancel(self):
     """Trigger a query retry, and then cancel the retried query. Validate that 
the
     cancelled query fails with the correct error message. Set a really high 
statestore
@@ -750,7 +757,8 @@ class TestQueryRetries(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       
impalad_args="--debug_actions=RETRY_DELAY_CHECKING_ORIGINAL_DRIVER:SLEEP@1000",
-      statestored_args="--statestore_heartbeat_frequency_ms=60000")
+      statestored_args="--statestore_heartbeat_frequency_ms=60000",
+      force_restart=True)
   def test_retrying_query_cancel(self):
     """Trigger a query retry, and then cancel and close the retried query in 
RETRYING
     state. Validate that it doesn't crash the impalad. Set a really high 
statestore
@@ -780,7 +788,8 @@ class TestQueryRetries(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      statestored_args="--statestore_heartbeat_frequency_ms=60000")
+      statestored_args="--statestore_heartbeat_frequency_ms=60000",
+      force_restart=True)
   def test_retrying_query_before_inflight(self):
     """Trigger a query retry, and delay setting the original query inflight as 
that may
     happen after the query is retried. Validate that the query succeeds. Set a 
really
@@ -820,7 +829,8 @@ class TestQueryRetries(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--debug_actions=RETRY_DELAY_GET_QUERY_DRIVER:SLEEP@2000",
-      statestored_args="--statestore_heartbeat_frequency_ms=60000")
+      statestored_args="--statestore_heartbeat_frequency_ms=60000",
+      force_restart=True)
   def test_retry_query_close_before_getting_query_driver(self):
     """Trigger a query retry, and then close the retried query before getting
     the query driver. Validate that it doesn't crash the impalad.
@@ -847,7 +857,8 @@ class TestQueryRetries(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--debug_actions=QUERY_RETRY_SET_RESULT_CACHE:FAIL",
-      statestored_args="--statestore_heartbeat_frequency_ms=60000")
+      statestored_args="--statestore_heartbeat_frequency_ms=60000",
+      force_restart=True)
   def test_retry_query_result_cacheing_failed(self):
     """Test setting up results cacheing failed."""
 
@@ -874,7 +885,8 @@ class TestQueryRetries(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--debug_actions=QUERY_RETRY_SET_QUERY_IN_FLIGHT:FAIL",
-      statestored_args="--statestore_heartbeat_frequency_ms=60000")
+      statestored_args="--statestore_heartbeat_frequency_ms=60000",
+      force_restart=True)
   def test_retry_query_set_query_in_flight_failed(self):
     """Test setting query in flight failed."""
 
@@ -898,7 +910,8 @@ class TestQueryRetries(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      statestored_args="-statestore_heartbeat_frequency_ms=60000")
+      statestored_args="-statestore_heartbeat_frequency_ms=60000",
+      force_restart=True)
   def test_retry_query_timeout(self):
     """Trigger a query retry, and then leave the query handle open until the
     'query_timeout_s' causes the handle to be closed. Assert that the runtime 
profile of
@@ -941,8 +954,10 @@ class TestQueryRetries(CustomClusterTestSuite):
     assert 
impalad_service.get_metric_value('impala-server.num-queries-expired') == 1
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(impalad_args="--idle_session_timeout=1",
-        statestored_args="--statestore_heartbeat_frequency_ms=60000")
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--idle_session_timeout=1",
+      statestored_args="--statestore_heartbeat_frequency_ms=60000",
+      force_restart=True)
   def test_retry_query_session_timeout(self):
     """Similar to 'test_retry_query_timeout' except with an idle session 
timeout."""
     self.close_impala_clients()
@@ -977,7 +992,8 @@ class TestQueryRetries(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      statestored_args="-statestore_heartbeat_frequency_ms=60000")
+      statestored_args="-statestore_heartbeat_frequency_ms=60000",
+      force_restart=True)
   def test_retry_query_hs2(self):
     """Test query retries with the HS2 protocol. Enable the results set cache 
as well and
     test that query retries work with the results cache."""
diff --git a/tests/custom_cluster/test_restart_services.py 
b/tests/custom_cluster/test_restart_services.py
index 08995cd94..3548bb200 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -1010,7 +1010,7 @@ class TestGracefulShutdown(CustomClusterTestSuite, 
HS2TestSuite):
     assert cancel == "{0}s000ms".format(get_remain_shutdown_query_cancel(
         self.COORD_SHUTDOWN_FAST_DEADLINE_S, 
self.COORD_SHUTDOWN_FAST_DEADLINE_S))
     assert registered == "0"
-    assert running > 0
+    assert int(running) > 0
     self.cluster.impalads[1].wait_for_exit()
     # The slow query should be cancelled.
     self.__check_deadline_expired(SLOW_QUERY, slow_query_handle, True)
diff --git a/tests/custom_cluster/test_s3a_access.py 
b/tests/custom_cluster/test_s3a_access.py
index 8364ef856..845e617ad 100644
--- a/tests/custom_cluster/test_s3a_access.py
+++ b/tests/custom_cluster/test_s3a_access.py
@@ -25,7 +25,7 @@ from tests.common.custom_cluster_test_suite import 
CustomClusterTestSuite
 from tests.common.skip import SkipIf
 from tests.util.filesystem_utils import WAREHOUSE
 
-tmp = tempfile.NamedTemporaryFile(delete=False)
+tmp = tempfile.NamedTemporaryFile(mode='w+t', delete=False)
 BAD_KEY_FILE = tmp.name
 
 
diff --git a/tests/custom_cluster/test_saml2_sso.py 
b/tests/custom_cluster/test_saml2_sso.py
index 3df48b183..8b0dc7fad 100644
--- a/tests/custom_cluster/test_saml2_sso.py
+++ b/tests/custom_cluster/test_saml2_sso.py
@@ -21,6 +21,7 @@ import base64
 import datetime
 import os
 import pytest
+import sys
 import uuid
 import xml.etree.ElementTree as ET
 import zlib
@@ -40,7 +41,7 @@ from tests.shell.util import run_impala_shell_cmd
 
 class NoRedirection(HTTPErrorProcessor):
   """Allows inspecting http redirection responses. """
-  def http_response(self, request, response):
+  def http_response(self, request, response):  # noqa: U100
     return response
 
 
@@ -49,6 +50,13 @@ def format_time(time):
   return time.strftime("%Y-%m-%dT%H:%M:%SZ")
 
 
+def encode_if_needed(value):
+  """ Encodes the value to bytes if needed, depending on the python version. 
"""
+  if sys.version_info.major < 3:
+    return value.encode('utf-8') if isinstance(value, str) else value
+  return value if isinstance(value, bytes) else value.encode('utf-8')
+
+
 class TestClientSaml(CustomClusterTestSuite):
   """ Tests for a client using SAML2 browser profile.
 
@@ -100,9 +108,9 @@ class TestClientSaml(CustomClusterTestSuite):
               "--saml2_ee_test_mode=true"
               % (CERT_DIR, CERT_DIR, SP_CALLBACK_URL))
 
-  SSO_ARGS_WITH_GROUP_FILTER = (SSO_ARGS + " " +
-                                "--saml2_group_filter=group1,group2 "
-                                
"--saml2_group_attribute_name=eduPersonAffiliation")
+  SSO_ARGS_WITH_GROUP_FILTER = (
+      "{} --saml2_group_filter=group1,group2 "
+      "--saml2_group_attribute_name=eduPersonAffiliation").format(SSO_ARGS)
 
   @CustomClusterTestSuite.with_args(impalad_args=SSO_ARGS, cluster_size=1)
   def test_saml2_browser_profile_no_group_filter(self, vector):
@@ -131,7 +139,7 @@ class TestClientSaml(CustomClusterTestSuite):
 
   @CustomClusterTestSuite.with_args(
       impalad_args=SSO_ARGS_WITH_GROUP_FILTER, cluster_size=1)
-  def test_saml2_browser_profile_with_group_filter(self, vector):
+  def test_saml2_browser_profile_with_group_filter(self):
       # test the SAML worflow with different attributes
       self._test_saml2_browser_workflow("", False)
 
@@ -157,7 +165,8 @@ class TestClientSaml(CustomClusterTestSuite):
     """ Initial POST request to hs2-http port, response should be redirected
         to IDP and contain the authnrequest. """
     opener = build_opener(NoRedirection)
-    req = Request("http://localhost:%s" % TestClientSaml.HOST_PORT, " ")
+    payload = encode_if_needed(" ")
+    req = Request("http://localhost:%s" % TestClientSaml.HOST_PORT, payload)
     req.add_header('X-Hive-Token-Response-Port', TestClientSaml.CLIENT_PORT)
     response = opener.open(req)
     relay_state, client_id, saml_req_xml = \
@@ -171,7 +180,10 @@ class TestClientSaml(CustomClusterTestSuite):
     assert client_id is not None
     new_url = response.info()["location"]
     assert new_url.startswith(TestClientSaml.IDP_URL)
-    query = parse_qs(urlparse(new_url).query.encode('ASCII'))
+    query_part = urlparse(new_url).query
+    query = parse_qs(query_part.encode('ASCII') if sys.version_info.major < 3
+                     else query_part)
+    assert "RelayState" in query, query
     relay_state = query["RelayState"][0]
     assert relay_state is not None
     saml_req = query["SAMLRequest"][0]
@@ -187,7 +199,8 @@ class TestClientSaml(CustomClusterTestSuite):
   def _request_resource_with_bearer(self, client_id, bearer_token):
     """ Send POST request to hs2-http port again, this time with bearer tokan.
         The response should contain a security cookie if the validation 
succeeded """
-    req = Request("http://localhost:%s" % TestClientSaml.HOST_PORT, " ")
+    payload = encode_if_needed(" ")
+    req = Request("http://localhost:%s" % TestClientSaml.HOST_PORT, payload)
     req.add_header('X-Hive-Client-Identifier', client_id)
     req.add_header('Authorization', "Bearer " + bearer_token)
     opener = build_opener(NoRedirection)
@@ -205,10 +218,11 @@ class TestClientSaml(CustomClusterTestSuite):
         Impala should answer with a form that submits to CLIENT_PORT and 
contains
         the bearer token as a hidden state. """
     authn_resp = self._generate_authn_response(request_id, attributes_xml)
-    encoded_authn_resp = base64.urlsafe_b64encode(authn_resp)
-    body = "SAMLResponse=%s&RelayState=%s" % (encoded_authn_resp, relay_state)
+    encoded_authn_resp = base64.urlsafe_b64encode(authn_resp.encode('utf-8'))
+    body = (b"SAMLResponse=" + encoded_authn_resp + b"&RelayState="
+            + encode_if_needed(relay_state))
     opener = build_opener(NoRedirection)
-    req = Request(TestClientSaml.SP_CALLBACK_URL, body)
+    req = Request(TestClientSaml.SP_CALLBACK_URL, encode_if_needed(body))
     response = opener.open(req)
     bearer_token = self._parse_xhtml_form(response, expect_success)
     return bearer_token
@@ -244,7 +258,7 @@ class TestClientSaml(CustomClusterTestSuite):
         message = input.attrib["value"]
 
     if expect_success:
-      assert token.startswith("u=user1")
+      assert token.startswith("u=user1"), str(content)
     else:
       assert message == TestClientSaml.ASSERTATION_ERROR_MESSAGE
     return token
diff --git a/tests/custom_cluster/test_scratch_disk.py 
b/tests/custom_cluster/test_scratch_disk.py
index 3a57f5a1e..38d454227 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -30,6 +30,7 @@ import time
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.skip import SkipIf
 from tests.util.hdfs_util import NAMENODE
+from tests.util.parse_util import bytes_to_str
 
 
 class TestScratchDir(CustomClusterTestSuite):
@@ -537,16 +538,18 @@ class TestScratchDir(CustomClusterTestSuite):
     hostname = socket.gethostname()
     # Verify that there are leftover files in the remote scratch dirs after 
being killed.
     full_dfs_tmp_path = "{}/impala-scratch".format(self.dfs_tmp_path())
-    files_result = subprocess.check_output(["hdfs", "dfs", "-ls", 
full_dfs_tmp_path])
+    files_result = bytes_to_str(
+        subprocess.check_output(["hdfs", "dfs", "-ls", full_dfs_tmp_path]))
     assert "Found 1 items" in files_result
     assert hostname in files_result
     full_dfs_tmp_path_with_hostname = "{}/{}".format(full_dfs_tmp_path, 
hostname)
-    files_result = subprocess.check_output(["hdfs", "dfs", "-ls",
-        full_dfs_tmp_path_with_hostname])
+    files_result = bytes_to_str(
+        subprocess.check_output(["hdfs", "dfs", "-ls", full_dfs_tmp_path_with_hostname]))
     assert "Found 1 items" in files_result
     impalad.start()
     # Verify that the leftover files being removed after the impala daemon 
restarted.
-    files_result = subprocess.check_output(["hdfs", "dfs", "-ls", 
full_dfs_tmp_path])
+    files_result = bytes_to_str(
+        subprocess.check_output(["hdfs", "dfs", "-ls", full_dfs_tmp_path]))
     assert files_result == ""
 
   @pytest.mark.execute_serially
@@ -578,6 +581,7 @@ class TestScratchDir(CustomClusterTestSuite):
     client.close()
     # Verify that no host-level dir in the remote scratch dirs after shutdown.
     full_dfs_tmp_path = "{}/impala-scratch".format(self.dfs_tmp_path())
-    files_result = subprocess.check_output(["hdfs", "dfs", "-ls", 
full_dfs_tmp_path])
+    files_result = bytes_to_str(
+        subprocess.check_output(["hdfs", "dfs", "-ls", full_dfs_tmp_path]))
     assert files_result == ""
     impalad.start()
diff --git a/tests/custom_cluster/test_shell_interactive_reconnect.py 
b/tests/custom_cluster/test_shell_interactive_reconnect.py
index 7bf391b40..e54fe2ca0 100644
--- a/tests/custom_cluster/test_shell_interactive_reconnect.py
+++ b/tests/custom_cluster/test_shell_interactive_reconnect.py
@@ -18,7 +18,6 @@
 from __future__ import absolute_import, division, print_function
 import socket
 
-import pexpect
 import pytest
 
 # Follow tests/shell/test_shell_interactive.py naming.
@@ -32,6 +31,7 @@ from tests.verifiers.metric_verifier import MetricVerifier
 
 NUM_QUERIES = 'impala-server.num-queries'
 
+
 class TestShellInteractiveReconnect(CustomClusterTestSuite):
   """ Check if interactive shell is using the current DB after reconnecting """
   @pytest.mark.execute_serially
@@ -75,6 +75,7 @@ class TestShellInteractiveReconnect(CustomClusterTestSuite):
       assert "alltypesaggmultifilesnopart" in result.stdout, result.stdout
 
   @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(force_restart=True)
   def test_auto_reconnect_after_impalad_died(self):
     """Test reconnect after restarting the remote impalad without using 
connect;"""
     # Use pexpect instead of ImpalaShell() since after using get_result() in 
ImpalaShell()
diff --git a/tests/query_test/test_delimited_text.py 
b/tests/query_test/test_delimited_text.py
index 70e2ae0c6..cc03f382a 100644
--- a/tests/query_test/test_delimited_text.py
+++ b/tests/query_test/test_delimited_text.py
@@ -26,6 +26,7 @@ from tests.common.test_dimensions import (
     create_single_exec_option_dimension,
     create_uncompressed_text_dimension)
 
+
 class TestDelimitedText(ImpalaTestSuite):
   """
   Tests delimited text files with different tuple delimiters, field delimiters
@@ -43,7 +44,7 @@ class TestDelimitedText(ImpalaTestSuite):
   def test_delimited_text(self, vector, unique_database):
     self.run_test_case('QueryTest/delimited-text', vector, unique_database)
 
-  def test_delimited_text_newlines(self, vector, unique_database):
+  def test_delimited_text_newlines(self, unique_database):
     """ Test text with newlines in strings - IMPALA-1943. Execute queries from 
Python to
     avoid issues with newline handling in test file format. """
     self.execute_query_expect_success(self.client, """
@@ -62,7 +63,7 @@ class TestDelimitedText(ImpalaTestSuite):
     result = self.execute_query("select * from %s.nl_queries" % 
unique_database)
     assert len(result.data) == 2
     assert result.data[0].split("\t") == ["the\n", "\nquick\nbrown", "fox\n"]
-    assert result.data[1].split("\t") == ["\njumped","over the lazy\n","\ndog"]
+    assert result.data[1].split("\t") == ["\njumped", "over the lazy\n", "\ndog"]
     # The row count may be computed without parsing each row, so could be 
inconsistent.
     result = self.execute_query("select count(*) from %s.nl_queries" % 
unique_database)
     assert len(result.data) == 1
@@ -72,8 +73,15 @@ class TestDelimitedText(ImpalaTestSuite):
     """Verifies Impala is able to properly handle delimited text that contains
     extended ASCII/latin characters. Marked as running serial because of shared
     cleanup/setup"""
-    self.run_test_case('QueryTest/delimited-latin-text', vector, 
unique_database,
-      encoding="latin-1")
+    result = self.execute_query_expect_success(
+        self.hs2_client,
+        "select * from functional.text_thorn_ecirc_newline order by col1")
+    assert result.tuples() == [('one', 'two', 3, 4),
+                               (b'one\xea', 'two', 3, 4),
+                               (b'one\xea\xea', 'two', 3, 4),
+                               (b'one\xea\xfeone', 'two', 3, 4),
+                               (b'one\xfeone', 'two', 3, 4)]
+    self.run_test_case('QueryTest/delimited-latin-text', vector, unique_database)
 
   def test_large_file_of_field_delimiters(self, vector, unique_database):
     """IMPALA-13161: Verifies reading a large file which has full of field 
delimiters
@@ -83,7 +91,7 @@ class TestDelimitedText(ImpalaTestSuite):
     table_loc = self._get_table_location(tbl, vector)
     # Generate a 3GB data file that has full of '\x00' (the default field 
delimiter)
     with open("data.txt", "wb") as f:
-      long_str = "\x00" * 1024 * 1024 * 3
+      long_str = b"\x00" * 1024 * 1024 * 3
       [f.write(long_str) for i in range(1024)]
     check_call(["hdfs", "dfs", "-put", "data.txt", table_loc])
     self.execute_query("refresh " + tbl)
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index f3f5bcf63..7d4e7e1bc 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -47,6 +47,7 @@ from tests.shell.util import run_impala_shell_cmd
 from tests.util.filesystem_utils import FILESYSTEM_PREFIX, get_fs_path, 
IS_HDFS, WAREHOUSE
 from tests.util.get_parquet_metadata import get_parquet_metadata
 from tests.util.iceberg_util import cast_ts, get_snapshots, IcebergCatalogs, 
quote
+from tests.util.parse_util import bytes_to_str
 
 LOG = logging.getLogger(__name__)
 
@@ -1130,35 +1131,35 @@ class TestIcebergTable(IcebergTestSuite):
          {'key': 6, 'value': 1},
          {'key': 7, 'value': 2}]
     assert datafiles[1]['lower_bounds'] == \
-        [{'key': 1, 'value': 'abc'},
+        [{'key': 1, 'value': b'abc'},
          # INT is serialized as 4-byte little endian
-         {'key': 2, 'value': '\x00\x00\x00\x00'},
+         {'key': 2, 'value': b'\x00\x00\x00\x00'},
          # BOOLEAN is serialized as 0x00 for FALSE
-         {'key': 3, 'value': '\x00'},
+         {'key': 3, 'value': b'\x00'},
          # BIGINT is serialized as 8-byte little endian
-         {'key': 4, 'value': '\x40\xaf\x0d\x86\x48\x70\x00\x00'},
+         {'key': 4, 'value': b'\x40\xaf\x0d\x86\x48\x70\x00\x00'},
          # TIMESTAMP is serialized as 8-byte little endian (number of microseconds since
          # 1970-01-01 00:00:00)
-         {'key': 5, 'value': '\xc0\xd7\xff\x06\xd0\xff\xff\xff'},
+         {'key': 5, 'value': b'\xc0\xd7\xff\x06\xd0\xff\xff\xff'},
          # DATE is serialized as 4-byte little endian (number of days since 1970-01-01)
-         {'key': 6, 'value': '\x93\xfe\xff\xff'},
+         {'key': 6, 'value': b'\x93\xfe\xff\xff'},
          # Unlike other numerical values, DECIMAL is serialized as big-endian.
-         {'key': 7, 'value': '\xd8\xf0'}]
+         {'key': 7, 'value': b'\xd8\xf0'}]
     assert datafiles[1]['upper_bounds'] == \
-        [{'key': 1, 'value': 'ghij'},
+        [{'key': 1, 'value': b'ghij'},
          # INT is serialized as 4-byte little endian
-         {'key': 2, 'value': '\x03\x00\x00\x00'},
+         {'key': 2, 'value': b'\x03\x00\x00\x00'},
          # BOOLEAN is serialized as 0x01 for TRUE
-         {'key': 3, 'value': '\x01'},
+         {'key': 3, 'value': b'\x01'},
          # BIGINT is serialized as 8-byte little endian
-         {'key': 4, 'value': '\x81\x58\xc2\x97\x56\xd5\x00\x00'},
+         {'key': 4, 'value': b'\x81\x58\xc2\x97\x56\xd5\x00\x00'},
          # TIMESTAMP is serialized as 8-byte little endian (number of microseconds since
          # 1970-01-01 00:00:00)
-         {'key': 5, 'value': '\x80\x02\x86\xef\x2f\x00\x00\x00'},
+         {'key': 5, 'value': b'\x80\x02\x86\xef\x2f\x00\x00\x00'},
          # DATE is serialized as 4-byte little endian (number of days since 1970-01-01)
-         {'key': 6, 'value': '\x6d\x01\x00\x00'},
+         {'key': 6, 'value': b'\x6d\x01\x00\x00'},
          # Unlike other numerical values, DECIMAL is serialized as big-endian.
-         {'key': 7, 'value': '\x00\xdc\x14'}]
+         {'key': 7, 'value': b'\x00\xdc\x14'}]
 
   def test_using_upper_lower_bound_metrics(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-upper-lower-bound-metrics', vector,
@@ -2224,7 +2225,7 @@ class TestIcebergV2Table(IcebergTestSuite):
       table_location = "{0}/test-warehouse/{1}.db/{2}/data".format(
           FILESYSTEM_PREFIX, unique_database, table_name)
       files_result = check_output(["hdfs", "dfs", "-ls", table_location])
-      assert "Found 1 items" in files_result
+      assert "Found 1 items" in bytes_to_str(files_result)
 
   def test_predicate_push_down_hint(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-predicate-push-down-hint', vector,
diff --git a/tests/query_test/test_queries.py b/tests/query_test/test_queries.py
index a3c86e771..3b1496bf8 100644
--- a/tests/query_test/test_queries.py
+++ b/tests/query_test/test_queries.py
@@ -161,21 +161,20 @@ class TestQueries(ImpalaTestSuite):
     file_format = vector.get_value('table_format').file_format
     if file_format == 'hbase':
       pytest.xfail(reason="IMPALA-283 - select count(*) produces inconsistent results")
-    vector.get_value('exec_option')['disable_outermost_topn'] = 1
-    vector.get_value('exec_option')['analytic_rank_pushdown_threshold'] = 0
-    self.run_test_case('QueryTest/sort', vector)
+    new_vector = deepcopy(vector)
+    options = new_vector.get_value('exec_option')
+    options['disable_outermost_topn'] = 1
+    options['analytic_rank_pushdown_threshold'] = 0
+    self.run_test_case('QueryTest/sort', new_vector)
     # We can get the sort tests for free from the top-n file
-    self.run_test_case('QueryTest/top-n', vector)
+    self.run_test_case('QueryTest/top-n', new_vector)
 
     if file_format in ['parquet', 'orc']:
       # set timestamp options to get consistent results for both format.
-      new_vector = deepcopy(vector)
-      options = new_vector.get_value('exec_option')
       options['convert_legacy_hive_parquet_utc_timestamps'] = 1
       options['timezone'] = '"Europe/Budapest"'
       self.run_test_case('QueryTest/sort-complex', new_vector)
 
-
   def test_partitioned_top_n(self, vector):
     """Test partitioned Top-N operator."""
     if vector.get_value('table_format').file_format == "hbase":
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index 369a1fdc5..0658c0776 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -33,11 +33,11 @@ from time import sleep, time
 from builtins import range
 import pytest
 
+from impala_shell.impala_client import utf8_encode_if_needed
 from impala_shell.impala_shell import ImpalaShell as ImpalaShellClass
 from tests.common.environ import ImpalaTestClusterProperties
 from tests.common.impala_service import ImpaladService
 from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT, ImpalaTestSuite
-from tests.common.skip import SkipIf
 from tests.common.test_dimensions import (
     create_client_protocol_dimension,
     create_client_protocol_strict_dimension,
@@ -60,8 +60,9 @@ from tests.shell.util import (
 DEFAULT_QUERY = 'select 1'
 QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell')
 
-RUSSIAN_CHARS = (u"А, Б, В, Г, Д, Е, Ё, Ж, З, И, Й, К, Л, М, Н, О, П, Р,"
-                 u"С, Т, У, Ф, Х, Ц,Ч, Ш, Щ, Ъ, Ы, Ь, Э, Ю, Я")
+RUSSIAN_CHARS = utf8_encode_if_needed(
+    u"А, Б, В, Г, Д, Е, Ё, Ж, З, И, Й, К, Л, М, Н, О, П, Р,"
+    u"С, Т, У, Ф, Х, Ц,Ч, Ш, Щ, Ъ, Ы, Ь, Э, Ю, Я")
 
 """IMPALA-12216 implemented timestamp to be printed in case of any error/warning
   during query execution, below is an example :
@@ -90,6 +91,7 @@ def find_query_option(key, string, strip_brackets=True):
   assert len(values) == 1
   return values[0].strip("[]") if strip_brackets else values[0]
 
+
 @pytest.fixture
 def empty_table(unique_database, request):
   """Create an empty table within the test database before executing test.
@@ -299,7 +301,6 @@ class TestImpalaShell(ImpalaTestSuite):
         "Column metadata states there are 11 values, but read 10 values from column id."
     )
 
-
   def test_completed_query_errors_2(self, vector):
     if vector.get_value('strict_hs2_protocol'):
       pytest.skip("Impala-10827: Multiple queries not supported in strict hs2 mode.")
@@ -412,7 +413,7 @@ class TestImpalaShell(ImpalaTestSuite):
     args = ['-p', '-q', 'select 1; profile;']
     result_set = run_impala_shell_cmd(vector, args)
     # This regex helps us uniquely identify a profile.
-    regex = re.compile("Operator\s+#Hosts\s+#Inst\s+Avg\s+Time")
+    regex = re.compile(r"Operator\s+#Hosts\s+#Inst\s+Avg\s+Time")
     # We expect two query profiles.
     assert len(re.findall(regex, result_set.stdout)) == 2, \
         "Could not detect two profiles, stdout: %s" % result_set.stdout
@@ -626,24 +627,24 @@ class TestImpalaShell(ImpalaTestSuite):
 
   def test_international_characters(self, vector):
     """Sanity test to ensure that the shell can read international characters."""
-    args = ['-B', '-q', "select '{0}'".format(RUSSIAN_CHARS.encode('utf-8'))]
+    args = ['-B', '-q', "select '{0}'".format(RUSSIAN_CHARS)]
     result = run_impala_shell_cmd(vector, args)
     assert 'UnicodeDecodeError' not in result.stderr
-    assert RUSSIAN_CHARS.encode('utf-8') in result.stdout
+    assert RUSSIAN_CHARS in result.stdout
 
   def test_international_characters_prettyprint(self, vector):
     """IMPALA-2717: ensure we can handle international characters in pretty-printed
     output"""
-    args = ['-q', "select '{0}'".format(RUSSIAN_CHARS.encode('utf-8'))]
+    args = ['-q', "select '{0}'".format(RUSSIAN_CHARS)]
     result = run_impala_shell_cmd(vector, args)
     assert 'UnicodeDecodeError' not in result.stderr
-    assert RUSSIAN_CHARS.encode('utf-8') in result.stdout
+    assert RUSSIAN_CHARS in result.stdout
 
   def test_international_characters_prettyprint_tabs(self, vector):
     """IMPALA-2717: ensure we can handle international characters in pretty-printed
     output when pretty-printing falls back to delimited output."""
 
-    args = ['-q', "select '{0}\\t'".format(RUSSIAN_CHARS.encode('utf-8'))]
+    args = ['-q', "select '{0}\\t'".format(RUSSIAN_CHARS)]
 
     result = run_impala_shell_cmd(vector, args)
     protocol = vector.get_value('protocol')
@@ -654,13 +655,13 @@ class TestImpalaShell(ImpalaTestSuite):
       assert protocol in ('hs2', 'hs2-http'), protocol
       assert 'Reverting to tab delimited text' not in result.stderr
     assert 'UnicodeDecodeError' not in result.stderr
-    assert RUSSIAN_CHARS.encode('utf-8') in result.stdout
+    assert RUSSIAN_CHARS in result.stdout
 
   def test_international_characters_profile(self, vector):
     """IMPALA-12145: ensure we can handle international characters in the profile. """
     if vector.get_value('strict_hs2_protocol'):
       pytest.skip("Profile not supported in strict hs2 mode.")
-    text = RUSSIAN_CHARS.encode('utf-8')
+    text = RUSSIAN_CHARS
     args = ['-o', '/dev/null', '-p', '-q', "select '{0}'".format(text)]
     result = run_impala_shell_cmd(vector, args)
     assert 'UnicodeDecodeError' not in result.stderr
@@ -687,7 +688,7 @@ class TestImpalaShell(ImpalaTestSuite):
     # is running against Hive which is another variable. On thrift 0.14 and higher,
     # talking to Hive, the result is b'\\xaa', so allow this as another possibility.
     assert '\xef\xbf\xbd' in result.stdout or '\xaa' in result.stdout or \
-           '\\xaa' in result.stdout
+           '\\xaa' in result.stdout or '�' in result.stdout
 
   def test_global_config_file(self, vector):
     """Test global and user configuration files."""
@@ -776,13 +777,13 @@ class TestImpalaShell(ImpalaTestSuite):
     assert "WARNING: Option 'config_file' can be only set from shell." in result.stderr
     err_msg = ("WARNING: Unable to read configuration file correctly. "
                "Ignoring unrecognized config option: 'invalid_option'\n")
-    assert  err_msg in result.stderr
+    assert err_msg in result.stderr
 
     args = ['--config_file=%s/impalarc_with_error' % QUERY_FILE_PATH]
     result = run_impala_shell_cmd(vector, args, expect_success=False)
     err_msg = ("Unexpected value in configuration file. "
                "'maybe' is not a valid value for a boolean option.")
-    assert  err_msg in result.stderr
+    assert err_msg in result.stderr
 
     # live_progress and live_summary are not supported with strict_hs2_protocol
     if not vector.get_value('strict_hs2_protocol'):
@@ -886,7 +887,7 @@ class TestImpalaShell(ImpalaTestSuite):
 
     # Test with an escaped variable.
     result = run_impala_shell_cmd(vector, ['--var=msg1=1', '--var=msg2=${var:msg1}2',
-                                           '--var=msg3=\${var:msg1}${var:msg2}',
+                                           '--var=msg3=\\${var:msg1}${var:msg2}',
                                            "--query=select '${var:msg3}'"])
     self._validate_shell_messages(result.stderr, ['${var:msg1}12', 'Fetched 1 row(s)'],
                                   should_exist=True)
@@ -894,7 +895,7 @@ class TestImpalaShell(ImpalaTestSuite):
     # Referencing a non-existent variable will result in an error.
     result = run_impala_shell_cmd(vector, [
         '--var=msg1=1', '--var=msg2=${var:doesnotexist}2',
-        '--var=msg3=\${var:msg1}${var:msg2}', "--query=select '${var:msg3}'"],
+        '--var=msg3=\\${var:msg1}${var:msg2}', "--query=select '${var:msg3}'"],
         expect_success=False)
     self._validate_shell_messages(result.stderr,
                                   ['Error: Unknown variable DOESNOTEXIST',
@@ -1045,7 +1046,7 @@ class TestImpalaShell(ImpalaTestSuite):
             if e.errno != errno.EINTR:
               raise
         data = connection.recv(1024)
-        assert expected_output in data
+        assert expected_output in str(data)
       finally:
         if impala_shell.poll() is None:
           impala_shell.kill()
@@ -1063,7 +1064,7 @@ class TestImpalaShell(ImpalaTestSuite):
     try:
       socket.setdefaulttimeout(10)
       s = socket.socket()
-      s.bind(("",0))
+      s.bind(("", 0))
       s.listen(1)
       test_impalad_port = s.getsockname()[1]
       load_balancer_fqdn = "my-load-balancer.local"
@@ -1087,7 +1088,7 @@ class TestImpalaShell(ImpalaTestSuite):
       assert "Could not execute command:" in result.stderr
     else:
       assert "Encountered: EOF" in result.stderr
-    args = ['-q', 'with v as (select 1) \;"']
+    args = ['-q', 'with v as (select 1) \\;"']
     result = run_impala_shell_cmd(vector, args, expect_success=False)
     if vector.get_value('strict_hs2_protocol'):
       assert "cannot recognize input near"
@@ -1103,15 +1104,16 @@ class TestImpalaShell(ImpalaTestSuite):
     sql_file, sql_path = tempfile.mkstemp()
     # This generates a sql file size of ~50K.
     num_cols = 1000
-    os.write(sql_file, "select \n")
-    for i in range(num_cols):
-      if i < num_cols:
-        os.write(sql_file, "col_{0} as a{1},\n".format(i, i))
-        os.write(sql_file, "col_{0} as b{1},\n".format(i, i))
-        os.write(sql_file, "col_{0} as c{1}{2}\n".format(
-            i, i, "," if i < num_cols - 1 else ""))
-    os.write(sql_file, "from non_existence_large_table;")
-    os.close(sql_file)
+    with open(sql_path, 'w') as f:
+      f.write("select \n")
+      for i in range(num_cols):
+        if i < num_cols:
+          f.write("col_{0} as a{1},\n".format(i, i))
+          f.write("col_{0} as b{1},\n".format(i, i))
+          f.write("col_{0} as c{1}{2}\n".format(
+              i, i, "," if i < num_cols - 1 else ""))
+      f.write("from non_existence_large_table;")
+      f.close()
 
     try:
       args = ['-f', sql_path, '-d', unique_database]
@@ -1148,7 +1150,7 @@ class TestImpalaShell(ImpalaTestSuite):
     tzname = find_query_option("TIMEZONE", result_set.stdout)
     assert os.path.isfile("/usr/share/zoneinfo/" + tzname)
 
-  def test_find_query_option(self, vector):
+  def test_find_query_option(self):
     """Test utility function find_query_option()."""
     test_input = """
         not_an_option
@@ -1177,8 +1179,8 @@ class TestImpalaShell(ImpalaTestSuite):
     # --connect_timeout_ms not supported with HTTP transport. Refer to the comment
     # in ImpalaClient::_get_http_transport() for details.
     # --http_socket_timeout_s not supported for strict_hs2_protocol.
-    if (vector.get_value('protocol') == 'hs2-http' and
-          vector.get_value('strict_hs2_protocol')):
+    if (vector.get_value('protocol') == 'hs2-http'
+        and vector.get_value('strict_hs2_protocol')):
         pytest.skip("THRIFT-4600")
 
     with closing(socket.socket()) as s:
@@ -1259,9 +1261,9 @@ class TestImpalaShell(ImpalaTestSuite):
     assert "|                    |" in result.stdout, result.stdout
     assert "| árvíztűrőtükörfúró |" in result.stdout, result.stdout
     assert "| 你好hello          |" in result.stdout, result.stdout
-    assert "| \x00\xef\xbf\xbd\x00\xef\xbf\xbd                 |" in result.stdout, \
-        result.stdout
-    assert '| \xef\xbf\xbdD3"\x11\x00              |' in result.stdout, result.stdout
+    # The last two output lines are malformed UTF-8 strings.
+    assert "| " + utf8_encode_if_needed("\0�\0�") in result.stdout, result.stdout
+    assert "| " + utf8_encode_if_needed("�D3\"") in result.stdout, result.stdout
 
   def test_binary_as_string(self, vector):
     query = """select cast(binary_col as string) from functional.binary_tbl
@@ -1292,8 +1294,8 @@ class TestImpalaShell(ImpalaTestSuite):
     # way that python2 and python3 represent floating point values, the output
     # from the shell will differ with regard to which version of python the
     # shell is running under.
-    assert("3\t3\t30.299999999999997" in result.stdout or
-      "3\t3\t30.3" in result.stdout), result.stdout
+    assert ("3\t3\t30.299999999999997" in result.stdout
+            or "3\t3\t30.3" in result.stdout), result.stdout
 
     assert "4\t4\t40.4" in result.stdout, result.stdout
 
@@ -1410,8 +1412,8 @@ class TestImpalaShell(ImpalaTestSuite):
 
   def test_http_socket_timeout(self, vector):
     """Test setting different http_socket_timeout_s values."""
-    if (vector.get_value('strict_hs2_protocol') or
-          vector.get_value('protocol') != 'hs2-http'):
+    if (vector.get_value('strict_hs2_protocol')
+        or vector.get_value('protocol') != 'hs2-http'):
         pytest.skip("http socket timeout not supported in strict hs2 mode."
                     " Only supported with hs2-http protocol.")
     # Test http_socket_timeout_s=0, expect errors
diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py
index bedc10216..3ce3d6cfe 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -35,6 +35,7 @@ from time import sleep
 import pexpect
 import pytest
 
+from impala_shell.impala_client import utf8_encode_if_needed
 # This import is the actual ImpalaShell class from impala_shell.py.
 # We rename it to ImpalaShellClass here because we later import another
 # class called ImpalaShell from tests/shell/util.py, and we don't want
@@ -63,6 +64,7 @@ from tests.shell.util import (
     spawn_shell,
     stderr_get_first_error_msg,
 )
+from tests.util.parse_util import bytes_to_str
 
 QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell')
 
@@ -117,7 +119,7 @@ class RequestHandler503(http.server.SimpleHTTPRequestHandler):
     self.end_headers()
     if self.should_send_body_text():
       # Optionally send body text with 503 message.
-      self.wfile.write("EXTRA")
+      self.wfile.write(b"EXTRA")
 
 
 class RequestHandler503Extra(RequestHandler503):
@@ -195,7 +197,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
     proc.expect(":{0}] {1}>".format(get_impalad_port(vector), db))
     if not expectations: return
     for e in expectations:
-      assert e in proc.before
+      assert e in bytes_to_str(proc.before)
 
   def _wait_for_num_open_sessions(self, vector, impala_service, expected, err):
     """Helper method to wait for the number of open sessions to reach 'expected'."""
@@ -403,7 +405,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
   def test_cancellation_mid_command(self, vector):
     """Test that keyboard interrupt cancels multiline query strings"""
     if vector.get_value('strict_hs2_protocol'):
-      pytest.skip("IMPALA-10827: Cancellation infrastructure does not " +
+      pytest.skip("IMPALA-10827: Cancellation infrastructure does not "
           "work in strict hs2 mode.")
     shell_cmd = get_shell_cmd(vector)
     multiline_query = ["select column_1\n", "from table_1\n", "where ..."]
@@ -413,7 +415,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
     for query_line in multiline_query:
       child_proc.send(query_line)
     child_proc.sendintr()
-    child_proc.expect("\^C")
+    child_proc.expect(r"\^C")
     child_proc.expect(PROMPT_REGEX)
     child_proc.sendline('quit;')
     child_proc.wait()
@@ -425,7 +427,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
     child_proc.send("\n")
     child_proc.expect(">")
     child_proc.sendintr()
-    child_proc.expect("> \^C")
+    child_proc.expect(r"> \^C")
     child_proc.expect(PROMPT_REGEX)
     child_proc.sendline('quit;')
     child_proc.wait()
@@ -435,7 +437,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
     if vector.get_value('strict_hs2_protocol'):
       pytest.skip("IMPALA-10827: Failed, need to investigate.")
     # test a unicode query spanning multiple lines
-    unicode_bytes = u'\ufffd'.encode('utf-8')
+    unicode_bytes = utf8_encode_if_needed(u'\ufffd')
     args = "select '{0}'\n;".format(unicode_bytes)
     result = run_impala_shell_interactive(vector, args)
     assert "Fetched 1 row(s)" in result.stderr, result.stderr
@@ -449,7 +451,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
     child_proc = spawn_shell(shell_cmd)
     child_proc.expect(PROMPT_REGEX)
     child_proc.sendline("select '{0}'\n;".format(unicode_bytes))
-    child_proc.expect("Fetched 1 row\(s\) in [0-9]+\.?[0-9]*s")
+    child_proc.expect(r"Fetched 1 row\(s\) in [0-9]+\.?[0-9]*s")
     child_proc.expect(PROMPT_REGEX)
     child_proc.sendline('quit;')
     child_proc.wait()
@@ -477,7 +479,6 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
     assert "Fetched 1 row(s)" in result.stderr, result.stderr
     assert "세율중분류구분코드" in result.stdout
 
-
   def test_welcome_string(self, vector):
     """Test that the shell's welcome message is only printed once
     when the shell is started. Ensure it is not reprinted on errors.
@@ -616,7 +617,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
       run_impala_shell_interactive(vector, "drop database if exists foo;")
       self.create_impala_clients()
 
-  def test_multiline_queries_in_history(self, vector, tmp_history_file):
+  def test_multiline_queries_in_history(self, vector, tmp_history_file):  # noqa: U100
     """Test to ensure that multiline queries with comments are preserved in history
 
     Ensure that multiline queries are preserved when they're read back from history.
@@ -638,7 +639,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
     for query, _ in queries:
       child_proc.expect(PROMPT_REGEX)
       child_proc.sendline(query)
-      child_proc.expect("Fetched 1 row\(s\) in [0-9]+\.?[0-9]*s")
+      child_proc.expect(r"Fetched 1 row\(s\) in [0-9]+\.?[0-9]*s")
     child_proc.expect(PROMPT_REGEX)
     child_proc.sendline('quit;')
     child_proc.wait()
@@ -649,7 +650,8 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
       assert history_entry in result.stderr, "'%s' not in '%s'" % (history_entry,
                                                                    result.stderr)
 
-  def test_history_does_not_duplicate_on_interrupt(self, vector, tmp_history_file):
+  def test_history_does_not_duplicate_on_interrupt(
+      self, vector, tmp_history_file):  # noqa: U100
     """This test verifies that once the cmdloop is broken the history file will not be
     re-read. The cmdloop can be broken when the user sends a SIGINT or exceptions
     occur."""
@@ -662,7 +664,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
     # initialize history
     child_proc.expect(PROMPT_REGEX)
     child_proc.sendline("select 1;")
-    child_proc.expect("Fetched 1 row\(s\) in [0-9]+\.?[0-9]*s")
+    child_proc.expect(r"Fetched 1 row\(s\) in [0-9]+\.?[0-9]*s")
     child_proc.expect(PROMPT_REGEX)
     child_proc.sendline("quit;")
     child_proc.wait()
@@ -671,9 +673,9 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
     child_proc = spawn_shell(shell_cmd)
     child_proc.expect(PROMPT_REGEX)
     child_proc.sendintr()
-    child_proc.expect("\^C")
+    child_proc.expect(r"\^C")
     child_proc.sendline("select 2;")
-    child_proc.expect("Fetched 1 row\(s\) in [0-9]+\.?[0-9]*s")
+    child_proc.expect(r"Fetched 1 row\(s\) in [0-9]+\.?[0-9]*s")
     child_proc.expect(PROMPT_REGEX)
     child_proc.sendline("quit;")
     child_proc.wait()
@@ -682,12 +684,14 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
     p = ImpalaShell(vector)
     p.send_cmd('history')
     result = p.get_result().stderr.splitlines()
-    assert "[1]: select 1;" == result[1]
-    assert "[2]: quit;" == result[2]
-    assert "[3]: select 2;" == result[3]
-    assert "[4]: quit;" == result[4]
-
-  def test_history_file_option(self, vector, tmp_history_file):
+    # Python 2 and Python 3 shell have different first lines in the history output.
+    start_idx = 1 if "Server version: " in result[0] else 0
+    assert "[1]: select 1;" == result[start_idx]
+    assert "[2]: quit;" == result[start_idx + 1]
+    assert "[3]: select 2;" == result[start_idx + 2]
+    assert "[4]: quit;" == result[start_idx + 3]
+
+  def test_history_file_option(self, vector, tmp_history_file):  # noqa: U100
     """
     Setting the 'tmp_history_file' fixture above means that the IMPALA_HISTFILE
     environment will be overridden. Here we override that environment by passing
@@ -704,7 +708,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
       history_contents = open(new_hist.name).read()
       assert "select 'hi'" in history_contents
 
-  def test_rerun(self, vector, tmp_history_file):
+  def test_rerun(self, vector, tmp_history_file):  # noqa: U100
     """Smoke test for the 'rerun' command"""
     if vector.get_value('strict_hs2_protocol'):
       pytest.skip("Rerun not supported in strict hs2 mode.")
@@ -721,11 +725,12 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
         ("second_command"))
     child_proc.sendline('history;')
     child_proc.expect(":{0}] default>".format(get_impalad_port(vector)))
-    assert '[1]: select \'first_command\';' in child_proc.before
-    assert '[2]: select \'second_command\';' in child_proc.before
-    assert '[3]: history;' in child_proc.before
+    before_line = child_proc.before.decode('UTF-8', 'replace')
+    assert '[1]: select \'first_command\';' in before_line
+    assert '[2]: select \'second_command\';' in before_line
+    assert '[3]: history;' in before_line
     # Rerunning command should not add an entry into history.
-    assert '[4]' not in child_proc.before
+    assert '[4]' not in before_line
     self._expect_with_cmd(child_proc, "@0", vector, ("Command index out of range"))
     self._expect_with_cmd(child_proc, "rerun   4", vector, ("Command index out of range"))
     self._expect_with_cmd(child_proc, "@-4", vector, ("Command index out of range"))
@@ -861,7 +866,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
       # IMPALA-5416: Test that two source commands on a line won't crash the shell.
       result = run_impala_shell_interactive(
           vector, "source shell.cmds;source shell.cmds;")
-      assert len(re.findall("version\(\)", result.stdout)) == 2
+      assert len(re.findall(r"version\(\)", result.stdout)) == 2
     finally:
       os.chdir(cwd)
 
@@ -888,11 +893,11 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
     # the client does not fetch. For statements returning 0 rows we do not
     # want an empty line in stdout.
     result = run_impala_shell_interactive(vector, "-- foo \n use default;")
-    assert re.search('> \[', result.stdout)
+    assert re.search(r'> \[', result.stdout)
     result = run_impala_shell_interactive(vector,
         "select * from functional.alltypes limit 0;")
     assert "Fetched 0 row(s)" in result.stderr
-    assert re.search('> \[', result.stdout)
+    assert re.search(r'> \[', result.stdout)
 
   def test_set_and_set_all(self, vector):
     """IMPALA-2181. Tests the outputs of SET and SET ALL commands. SET should contain the
@@ -1050,15 +1055,15 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
       assert '| \'--\' |' in result.stdout
       assert '| --   |' in result.stdout
 
-    query = ('select * from (\n' +
-             'select count(*) from functional.alltypes\n' +
+    query = ('select * from (\n'
+             'select count(*) from functional.alltypes\n'
              ') v; -- Incomplete SQL statement in this line')
     result = run_impala_shell_interactive(vector, query)
     assert '| count(*) |' in result.stdout
 
-    query = ('select id from functional.alltypes\n' +
-             'order by id; /*\n' +
-             '* Multi-line comment\n' +
+    query = ('select id from functional.alltypes\n'
+             'order by id; /*\n'
+             '* Multi-line comment\n'
              '*/')
     result = run_impala_shell_interactive(vector, query)
     assert '| id   |' in result.stdout
@@ -1154,7 +1159,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
     proc.sendeof()
     proc.wait()
 
-  def test_strip_leading_comment(self, vector):
+  def test_strip_leading_comment(self):
     """Test stripping leading comments from SQL statements"""
     assert ('--delete\n', 'select 1') == \
         ImpalaShellClass.strip_leading_comment('--delete\nselect 1')
@@ -1230,7 +1235,6 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
         "Bad date/time conversion format: yyyy年MM月dd日"
     )
 
-
   def test_timezone_validation(self, vector):
     """Test that query option TIMEZONE is validated when executing a query.
 
@@ -1300,7 +1304,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
                   "-i{0}:{1}".format(http_503_server_extra.HOST,
                                      http_503_server_extra.PORT)]
     shell_proc = spawn_shell(impala_shell_executable + shell_args)
-    shell_proc.expect("HTTP code 503: Service Unavailable \[EXTRA\]", timeout=10)
+    shell_proc.expect(r"HTTP code 503: Service Unavailable \[EXTRA\]", timeout=10)
 
 
 def run_impala_shell_interactive(vector, input_lines, shell_args=None,
diff --git a/tests/shell/util.py b/tests/shell/util.py
index 191f24849..30f0afb3d 100755
--- a/tests/shell/util.py
+++ b/tests/shell/util.py
@@ -23,10 +23,8 @@ from contextlib import closing
 import logging
 import os
 import re
-import shlex
 import socket
 from subprocess import PIPE, Popen
-import sys
 import time
 
 import pexpect
@@ -54,6 +52,7 @@ LOG.addHandler(logging.StreamHandler())
 SHELL_HISTORY_FILE = os.path.expanduser("~/.impalahistory")
 IMPALA_HOME = os.environ['IMPALA_HOME']
 
+
 def build_shell_env(env=None):
   """ Construct the environment for the shell to run in based on 'env', or the current
   process's environment if env is None."""
@@ -69,24 +68,24 @@ def build_shell_env(env=None):
 
 
 def assert_var_substitution(result):
-  assert_pattern(r'\bfoo_number=.*$', 'foo_number= 123123', result.stdout, \
+  assert_pattern(r'\bfoo_number=.*$', 'foo_number= 123123', result.stdout,
     'Numeric values not replaced correctly')
-  assert_pattern(r'\bfoo_string=.*$', 'foo_string=123', result.stdout, \
+  assert_pattern(r'\bfoo_string=.*$', 'foo_string=123', result.stdout,
     'String values not replaced correctly')
-  assert_pattern(r'\bVariables:[\s\n]*BAR:\s*[0-9]*\n\s*FOO:\s*[0-9]*', \
-    'Variables:\n\tBAR: 456\n\tFOO: 123', result.stdout, \
+  assert_pattern(r'\bVariables:[\s\n]*BAR:\s*[0-9]*\n\s*FOO:\s*[0-9]*',
+    'Variables:\n\tBAR: 456\n\tFOO: 123', result.stdout,
     "Set variable not listed correctly by the first SET command")
-  assert_pattern(r'\bError: Unknown variable FOO1$', \
-    'Error: Unknown variable FOO1', result.stderr, \
+  assert_pattern(r'\bError: Unknown variable FOO1$',
+    'Error: Unknown variable FOO1', result.stderr,
     'Missing variable FOO1 not reported correctly')
-  assert_pattern(r'\bmulti_test=.*$', 'multi_test=456_123_456_123', \
+  assert_pattern(r'\bmulti_test=.*$', 'multi_test=456_123_456_123',
     result.stdout, 'Multiple replaces not working correctly')
-  assert_pattern(r'\bError:\s*Unknown\s*substitution\s*syntax\s*' +
-                 r'\(RANDOM_NAME\). Use \${VAR:var_name}', \
-    'Error: Unknown substitution syntax (RANDOM_NAME). Use ${VAR:var_name}', \
+  assert_pattern(r'\bError:\s*Unknown\s*substitution\s*syntax\s*'
+                 r'\(RANDOM_NAME\). Use \${VAR:var_name}',
+    'Error: Unknown substitution syntax (RANDOM_NAME). Use ${VAR:var_name}',
     result.stderr, "Invalid variable reference")
   assert_pattern(r'"This should be not replaced: \${VAR:foo} \${HIVEVAR:bar}"',
-    '"This should be not replaced: ${VAR:foo} ${HIVEVAR:bar}"', \
+    '"This should be not replaced: ${VAR:foo} ${HIVEVAR:bar}"',
     result.stdout, "Variable escaping not working")
   assert_pattern(r'\bVariable MYVAR set to.*$', 'Variable MYVAR set to foo123',
     result.stderr, 'No evidence of MYVAR variable being set.')
@@ -97,11 +96,11 @@ def assert_var_substitution(result):
     result.stdout, 'No evidence of variable FOO being unset')
   assert_pattern(r'\bUnsetting variable BAR$', 'Unsetting variable BAR',
     result.stdout, 'No evidence of variable BAR being unset')
-  assert_pattern(r'\bVariables:[\s\n]*No variables defined\.$', \
-    'Variables:\n\tNo variables defined.', result.stdout, \
+  assert_pattern(r'\bVariables:[\s\n]*No variables defined\.$',
+    'Variables:\n\tNo variables defined.', result.stdout,
     'Unset variables incorrectly listed by third SET command.')
-  assert_pattern(r'\bNo variable called NONEXISTENT is set', \
-    'No variable called NONEXISTENT is set', result.stdout, \
+  assert_pattern(r'\bNo variable called NONEXISTENT is set',
+    'No variable called NONEXISTENT is set', result.stdout,
     'Problem unsetting non-existent variable.')
   assert_pattern(r'\bVariable COMMENT_TYPE1 set to.*$',
     'Variable COMMENT_TYPE1 set to ok', result.stderr,
@@ -112,11 +111,12 @@ def assert_var_substitution(result):
   assert_pattern(r'\bVariable COMMENT_TYPE3 set to.*$',
     'Variable COMMENT_TYPE3 set to ok', result.stderr,
     'No evidence of COMMENT_TYPE3 variable being set.')
-  assert_pattern(r'\bVariables:[\s\n]*COMMENT_TYPE1:.*[\s\n]*' + \
-    'COMMENT_TYPE2:.*[\s\n]*COMMENT_TYPE3:.*$',
-    'Variables:\n\tCOMMENT_TYPE1: ok\n\tCOMMENT_TYPE2: ok\n\tCOMMENT_TYPE3: ok', \
+  assert_pattern(r'\bVariables:[\s\n]*COMMENT_TYPE1:.*[\s\n]*'
+    r'COMMENT_TYPE2:.*[\s\n]*COMMENT_TYPE3:.*$',
+    'Variables:\n\tCOMMENT_TYPE1: ok\n\tCOMMENT_TYPE2: ok\n\tCOMMENT_TYPE3: ok',
     result.stdout, 'Set variables not listed correctly by the SET command')
 
+
 def assert_pattern(pattern, result, text, message):
   """Asserts that the pattern, when applied to text, returns the expected 
result"""
   m = re.search(pattern, text, re.MULTILINE)
@@ -217,6 +217,7 @@ def get_open_sessions_metric(vector):
     assert protocol == 'beeswax', protocol
     return 'impala-server.num-open-beeswax-sessions'
 
+
 class ImpalaShellResult(object):
   def __init__(self):
     self.rc = 0
@@ -380,12 +381,13 @@ def get_dev_impala_shell_executable():
 def create_impala_shell_executable_dimension(dev_only=False):
   _, include_pypi = get_dev_impala_shell_executable()
   dimensions = []
-  if os.getenv("IMPALA_SYSTEM_PYTHON2"):
+  python3_pytest = (os.getenv("IMPALA_USE_PYTHON3_TESTS", "false") == "true")
+  if os.getenv("IMPALA_SYSTEM_PYTHON2") and not python3_pytest:
     dimensions.append('dev')
   if os.getenv("IMPALA_SYSTEM_PYTHON3"):
     dimensions.append('dev3')
   if include_pypi and not dev_only:
-    if os.getenv("IMPALA_SYSTEM_PYTHON2"):
+    if os.getenv("IMPALA_SYSTEM_PYTHON2") and not python3_pytest:
       dimensions.append('python2')
     if os.getenv("IMPALA_SYSTEM_PYTHON3"):
       dimensions.append('python3')
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index e6417721a..3661d09d1 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -20,11 +20,14 @@ from collections import defaultdict
 import json
 import logging
 import socket
+import sys
 import threading
 import time
 import traceback
 import uuid
 
+import pytest
+
 from builtins import range
 from thrift.protocol import TBinaryProtocol
 from thrift.server.TServer import TServer
@@ -115,6 +118,7 @@ class KillableThreadedServer(TServer):
     self.port = self.serverTransport.port
 
   def shutdown(self):
+    LOG.info('Server localhost:{} is shutting down'.format(self.port))
     self.is_shutdown = True
     self.serverTransport.close()
     self.wait_until_down()
@@ -127,20 +131,22 @@ class KillableThreadedServer(TServer):
       cnxn = TSocket.TSocket('localhost', self.port)
       try:
         cnxn.open()
+        LOG.info('Server localhost:{} is up'.format(cnxn.port))
         return
       except Exception:
         if i == num_tries - 1: raise
-      time.sleep(0.1)
+      time.sleep(0.5)
 
   def wait_until_down(self, num_tries=10):
     for i in range(num_tries):
       cnxn = TSocket.TSocket('localhost', self.port)
       try:
         cnxn.open()
-        time.sleep(0.1)
       except Exception:
+        LOG.info('Server localhost:{} is down'.format(cnxn.port))
         return
-    raise Exception("Server did not stop")
+      time.sleep(0.5)
+    raise Exception("Server localhost:{} did not stop".format(cnxn.port))
 
   def serve(self):
     self.serverTransport.listen()
@@ -149,8 +155,12 @@ class KillableThreadedServer(TServer):
       # Since accept() can take a while, check again if the server is shutdown to avoid
       # starting an unnecessary thread.
       if self.is_shutdown: return
-      t = threading.Thread(target=self.handle, args=(client,))
-      t.setDaemon(self.daemon)
+      t = None
+      if sys.version_info.major < 3:
+        t = threading.Thread(target=self.handle, args=(client,))
+        t.setDaemon(True)
+      else:
+        t = threading.Thread(target=self.handle, args=(client,), daemon=self.daemon)
       t.start()
 
   def handle(self, client):
@@ -196,6 +206,9 @@ class StatestoreSubscriber(object):
     self.heartbeat_cb, self.update_cb = heartbeat_cb, update_cb
     self.subscriber_id = "python-test-client-%s" % uuid.uuid1()
     self.exception = None
+    self.server = None
+    self.server_thread = None
+    self.client_transport = None
 
   def __enter__(self):
     return self
@@ -239,19 +252,24 @@ class StatestoreSubscriber(object):
     return response
 
   def __init_server(self):
+    LOG.info('Initializing server')
     processor = Subscriber.Processor(self)
     transport = WildcardServerSocket()
     tfactory = TTransport.TBufferedTransportFactory()
     pfactory = TBinaryProtocol.TBinaryProtocolFactory()
     self.server = KillableThreadedServer(processor, transport, tfactory, pfactory,
                                          daemon=True)
-    self.server_thread = threading.Thread(target=self.server.serve)
-    self.server_thread.setDaemon(True)
+    if sys.version_info.major < 3:
+      self.server_thread = threading.Thread(target=self.server.serve)
+      self.server_thread.setDaemon(True)
+    else:
+      self.server_thread = threading.Thread(target=self.server.serve, daemon=True)
     self.server_thread.start()
     self.server.wait_until_up()
     self.port = self.server.port
 
   def __init_client(self):
+    LOG.info('Initializing client')
     self.client_transport = \
         TTransport.TBufferedTransport(TSocket.TSocket('localhost', 24000))
     self.protocol = TBinaryProtocol.TBinaryProtocol(self.client_transport)
@@ -352,6 +370,7 @@ class StatestoreSubscriber(object):
       time.sleep(0.2)
 
 
+@pytest.mark.execute_serially
 @SkipIfDockerizedCluster.statestore_not_exposed
 class TestStatestore(BaseTestSuite):
   def make_topic_update(self, topic_name, key_template="foo", value_template="bar",
diff --git a/tests/stress/test_update_stress.py b/tests/stress/test_update_stress.py
index 5d76451af..013cefc02 100644
--- a/tests/stress/test_update_stress.py
+++ b/tests/stress/test_update_stress.py
@@ -29,6 +29,7 @@ from tests.common.skip import SkipIfFS
 from tests.common.test_dimensions import create_exec_option_dimension
 from tests.stress.stress_util import run_tasks, Task
 from tests.util.filesystem_utils import FILESYSTEM_PREFIX, IS_HDFS
+from tests.util.parse_util import bytes_to_str
 from tests.conftest import DEFAULT_HIVE_SERVER2
 
 
@@ -339,7 +340,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
 
     table_location = "{0}/test-warehouse/{1}.db/{2}/data".format(
         FILESYSTEM_PREFIX, unique_database, table_name)
-    data_files_on_fs_result = check_output(["hdfs", "dfs", "-ls", table_location])
+    data_files_on_fs_result = bytes_to_str(
+        check_output(["hdfs", "dfs", "-ls", table_location]))
     # The first row of the HDFS result is a summary, the following lines contain
     # 1 file each.
     data_files_on_fs_rows = data_files_on_fs_result.strip().split('\n')[1:]
diff --git a/tests/util/hdfs_util.py b/tests/util/hdfs_util.py
index 125f57dcd..1dd2ce4e2 100644
--- a/tests/util/hdfs_util.py
+++ b/tests/util/hdfs_util.py
@@ -31,6 +31,7 @@ from xml.etree.ElementTree import parse
 
 from tests.util.filesystem_base import BaseFilesystem
 from tests.util.filesystem_utils import FILESYSTEM_PREFIX
+from tests.util.parse_util import bytes_to_str
 
 
 class HdfsConfig(object):
@@ -220,13 +221,13 @@ class HadoopFsCommandLineClient(BaseFilesystem):
           stdout=subprocess.PIPE, stderr=subprocess.PIPE)
     stdout, stderr = process.communicate()
     status = process.returncode
-    return (status, stdout, stderr)
+    return (status, bytes_to_str(stdout), bytes_to_str(stderr))
 
   def create_file(self, path, file_data, overwrite=True):
     """Creates a temporary file with the specified file_data on the local 
filesystem,
     then puts it into the specified path."""
     if not overwrite and self.exists(path): return False
-    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
+    with tempfile.NamedTemporaryFile(mode='w+t', delete=False) as tmp_file:
       tmp_file.write(file_data)
     put_cmd_params = ['-put', '-d']
     if overwrite: put_cmd_params.append('-f')
diff --git a/tests/util/parse_util.py b/tests/util/parse_util.py
index 05289ebaa..d96beecc0 100644
--- a/tests/util/parse_util.py
+++ b/tests/util/parse_util.py
@@ -17,6 +17,7 @@
 
 from __future__ import absolute_import, division, print_function
 import re
+import sys
 from datetime import datetime
 
 # IMPALA-6715: Every so often the stress test or the TPC workload directories get
@@ -285,3 +286,11 @@ def get_time_summary_stats_counter(counter_name, runtime_profile):
           max_value=parse_duration_string_ns(summary_stat['max'])))
 
   return summary_stats
+
+
+def bytes_to_str(bytes):
+    """Return 'bytes' decoded as UTF-8 (undecodable input is replaced). Values that are
+    already str, including every Python 2 str, are passed through str() unchanged."""
+    if sys.version_info.major < 3 or isinstance(bytes, str):
+        return str(bytes)
+    return bytes.decode('utf-8', errors='replace')
diff --git a/tests/util/shell_util.py b/tests/util/shell_util.py
index 2e63e3ff3..130d805b4 100644
--- a/tests/util/shell_util.py
+++ b/tests/util/shell_util.py
@@ -21,6 +21,8 @@ from __future__ import absolute_import, division, print_function
 import logging
 import os
 import shlex
+import sys
+
 from select import select
 from subprocess import PIPE, Popen, STDOUT, call
 from textwrap import dedent
@@ -35,6 +37,7 @@ def dump_server_stacktraces():
   LOG.debug('Dumping stacktraces of running servers')
   call([os.path.join(os.environ['IMPALA_HOME'], "bin/dump-stacktraces.sh")])
 
+
 def exec_process(cmd):
   """Executes a subprocess, waiting for completion. The process exit code, 
stdout and
   stderr are returned as a tuple."""
@@ -46,6 +49,7 @@ def exec_process(cmd):
   rc = p.returncode
   return rc, stdout, stderr
 
+
 def exec_process_async(cmd):
   """Executes a subprocess, returning immediately. The process object is 
returned for
   later retrieval of the exit code etc. """
@@ -55,6 +59,7 @@ def exec_process_async(cmd):
   return Popen(shlex.split(cmd), shell=False, stdout=PIPE, stderr=PIPE,
       universal_newlines=True)
 
+
 def shell(cmd, cmd_prepend="set -euo pipefail\n", stdout=PIPE, stderr=STDOUT,
     timeout_secs=None, **popen_kwargs):
   """Executes a command and returns its output. If the command's return code 
is non-zero
@@ -77,6 +82,7 @@ def shell(cmd, cmd_prepend="set -euo pipefail\n", stdout=PIPE, stderr=STDOUT,
     remaining_fds.append(stderr_fileno)
   stdout = list()
   stderr = list()
+
   def _read_available_output():
     while True:
       available_fds, _, _ = select(remaining_fds, [], [], 0)
@@ -88,7 +94,7 @@ def shell(cmd, cmd_prepend="set -euo pipefail\n", stdout=PIPE, stderr=STDOUT,
           if not data:
             del remaining_fds[0]
           else:
-            stdout.append(data)
+            stdout.append(data if sys.version_info.major < 3 else data.decode())
         elif fd == stderr_fileno:
           if not data:
             del remaining_fds[-1]

Reply via email to