This is an automated email from the ASF dual-hosted git repository. jasonmfehr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 28cff4022dac5224532b9f4afb2a69799d47ba24 Author: Riza Suminto <riza.sumi...@cloudera.com> AuthorDate: Sun Aug 17 10:11:59 2025 -0700 IMPALA-14333: Run impala-py.test using Python3 Running exhaustive tests with env var IMPALA_USE_PYTHON3_TESTS=true reveals some tests that require adjustment. This patch makes those adjustments, which mostly revolve around encoding differences and the string vs bytes type distinction in Python3. This patch also switches the default to run pytest with Python3 by setting IMPALA_USE_PYTHON3_TESTS=true. The following are the details: Change the hash() function in conftest.py to crc32() to produce a deterministic hash. Hash randomization is enabled by default since Python 3.3 (see https://docs.python.org/3/reference/datamodel.html#object.__hash__). This causes test sharding (like --shard_tests=1/2) to produce an inconsistent set of tests per shard. Always restart the minicluster during custom cluster tests if the --shard_tests argument is set, because test order may change and affect test correctness, depending on whether the test runs on a fresh minicluster or not. Move one test case from delimited-latin-text.test to test_delimited_text.py for easier binary comparison. Add bytes_to_str() as a utility function to decode bytes in Python3. This is often needed when inspecting the return value of subprocess.check_output() as a string. Implement DataTypeMetaclass.__lt__ as a substitute for DataTypeMetaclass.__cmp__, which is ignored in Python3 (see https://peps.python.org/pep-0207/). Fix the WEB_CERT_ERR difference in test_ipv6.py. Fix trivial integer parsing in test_restart_services.py. Fix various encoding issues in test_saml2_sso.py, test_shell_commandline.py, and test_shell_interactive.py. Change the timeout in Impala.for_each_impalad() from sys.maxsize to 2^31-1. Switch to binary comparison in test_iceberg.py where needed. Specify text mode when calling tempfile.NamedTemporaryFile(). 
Simplify create_impala_shell_executable_dimension to skip testing the dev and python2 impala-shell when IMPALA_USE_PYTHON3_TESTS=true. The reason is that several UTF-8 related tests in test_shell_commandline.py break in the Python3 pytest + Python2 impala-shell combo. This skipping already happens automatically on build OSes without a system Python2 available, such as RHEL9 (the IMPALA_SYSTEM_PYTHON2 env var is empty). Remove the unused vector argument and fix some trivial flake8 issues. Some test logic requires modification due to intermittent issues in Python3 pytest. These changes include: Add _run_query_with_client() in test_ranger.py to allow reusing a single Impala client for running several queries. Ensure clients are closed when the test is done. Mark several tests in test_ranger.py with SkipIfFS.hive because they run queries through beeline + HiveServer2, but the Ozone and S3 build environments do not start HiveServer2 by default. Increase the sleep period from 0.1 to 0.5 seconds per iteration in test_statestore.py and mark TestStatestore to execute serially. This is because TServer appears to shut down more slowly when run concurrently with other tests. Handle the deprecation of Thread.setDaemon() as well. Always set force_restart=True for each test method in TestLoggingCore, TestShellInteractiveReconnect, and TestQueryRetries to prevent them from reusing the minicluster from a previous test method. Some of these tests disrupt the minicluster (kill impalad) and will produce a minidump if the metrics verifier for the next test fails to detect a healthy minicluster state. Testing: Pass exhaustive tests with IMPALA_USE_PYTHON3_TESTS=true. 
Change-Id: I401a93b6cc7bcd17f41d24e7a310e0c882a550d4 Reviewed-on: http://gerrit.cloudera.org:8080/23319 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- bin/impala-config.sh | 2 +- shell/impala_shell/impala_shell.py | 3 +- .../queries/QueryTest/delimited-latin-text.test | 15 --- tests/authorization/test_ranger.py | 132 +++++++++++++-------- tests/common/custom_cluster_test_suite.py | 10 +- tests/common/impala_test_suite.py | 3 +- tests/common/skip.py | 9 +- tests/comparison/cluster.py | 16 +-- tests/comparison/db_types.py | 28 ++++- tests/conftest.py | 4 +- tests/custom_cluster/test_ipv6.py | 4 +- tests/custom_cluster/test_logging.py | 9 +- tests/custom_cluster/test_query_retries.py | 50 +++++--- tests/custom_cluster/test_restart_services.py | 2 +- tests/custom_cluster/test_s3a_access.py | 2 +- tests/custom_cluster/test_saml2_sso.py | 38 ++++-- tests/custom_cluster/test_scratch_disk.py | 14 ++- .../test_shell_interactive_reconnect.py | 3 +- tests/query_test/test_delimited_text.py | 18 ++- tests/query_test/test_iceberg.py | 31 ++--- tests/query_test/test_queries.py | 13 +- tests/shell/test_shell_commandline.py | 80 +++++++------ tests/shell/test_shell_interactive.py | 76 ++++++------ tests/shell/util.py | 46 +++---- tests/statestore/test_statestore.py | 33 ++++-- tests/stress/test_update_stress.py | 4 +- tests/util/hdfs_util.py | 5 +- tests/util/parse_util.py | 9 ++ tests/util/shell_util.py | 8 +- 29 files changed, 403 insertions(+), 264 deletions(-) diff --git a/bin/impala-config.sh b/bin/impala-config.sh index 8a4ec56e6..2bce6db8d 100755 --- a/bin/impala-config.sh +++ b/bin/impala-config.sh @@ -347,7 +347,7 @@ export IMPALA_KERBERIZE=false unset IMPALA_TOOLCHAIN_KUDU_MAVEN_REPOSITORY unset IMPALA_TOOLCHAIN_KUDU_MAVEN_REPOSITORY_ENABLED -export IMPALA_USE_PYTHON3_TESTS=${IMPALA_USE_PYTHON3_TESTS:-false} +export IMPALA_USE_PYTHON3_TESTS=${IMPALA_USE_PYTHON3_TESTS:-true} 
# Source the branch and local config override files here to override any # variables above or any variables below that allow overriding via environment diff --git a/shell/impala_shell/impala_shell.py b/shell/impala_shell/impala_shell.py index 2445e8629..14b329e3d 100755 --- a/shell/impala_shell/impala_shell.py +++ b/shell/impala_shell/impala_shell.py @@ -1553,7 +1553,7 @@ class ImpalaShell(cmd.Cmd, object): # undecodable elements. if self.last_query_handle is not None: self.imp_client.close_query(self.last_query_handle) - log_exception_with_timestamp(e, "UnicodeDecodeError", "Please check for" + log_exception_with_timestamp(e, "UnicodeDecodeError", "Please check for " "columns containing binary data to find the possible source of the error") except QueryStateException as e: # an exception occurred while executing the query @@ -1965,6 +1965,7 @@ class ImpalaShell(cmd.Cmd, object): print("Error: OAuth access token not found in json payload") sys.exit(1) + TIPS = [ "Press TAB twice to see a list of available commands.", "After running a query, type SUMMARY to see a summary of where time was spent.", diff --git a/testdata/workloads/functional-query/queries/QueryTest/delimited-latin-text.test b/testdata/workloads/functional-query/queries/QueryTest/delimited-latin-text.test index 004feff57..9df2b2cd5 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/delimited-latin-text.test +++ b/testdata/workloads/functional-query/queries/QueryTest/delimited-latin-text.test @@ -1,20 +1,5 @@ ==== ---- QUERY -# test querying text table "extended" ASCII (latin) delimiters: -# fields terminated by '-2' -- thorn character -# escaped by '-22' -- lowercase e with circumflex -# lines terminated by '\n' -select * from functional.text_thorn_ecirc_newline ----- RESULTS -'one','two',3,4 -'one\xfeone','two',3,4 -'one\xea','two',3,4 -'one\xea\xfeone','two',3,4 -'one\xea\xea','two',3,4 ----- TYPES -STRING,STRING,INT,INT -==== ----- QUERY # create new tables like the ones above to 
test inserting create table tecn like functional.text_thorn_ecirc_newline; ---- RESULTS diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py index aef153fd1..59896f5f2 100644 --- a/tests/authorization/test_ranger.py +++ b/tests/authorization/test_ranger.py @@ -45,6 +45,7 @@ from tests.util.calculation_util import get_random_id from tests.util.filesystem_utils import WAREHOUSE, WAREHOUSE_PREFIX from tests.util.hdfs_util import NAMENODE from tests.util.iceberg_util import get_snapshots +from tests.util.parse_util import bytes_to_str ADMIN = "admin" OWNER_USER = getuser() @@ -791,7 +792,7 @@ class TestRanger(CustomClusterTestSuite): "{0}/service/public/v2/api/policy?servicename=test_impala&policyname={1}".format( RANGER_HOST, policy_name), auth=RANGER_AUTH, headers=REST_HEADERS) - assert 300 > r.status_code >= 200, r.content + assert 300 > r.status_code >= 200, bytes_to_str(r.content) @staticmethod def _check_privileges(result, expected): @@ -807,7 +808,14 @@ class TestRanger(CustomClusterTestSuite): def _run_query_as_user(self, query, username, expect_success): """Helper to run an input query as a given user.""" - impala_client = self.create_impala_client(user=username) + with self.create_impala_client(user=username) as impala_client: + if expect_success: + return self.execute_query_expect_success( + impala_client, query, query_options={'sync_ddl': 1}) + return self.execute_query_expect_failure(impala_client, query) + + def _run_query_with_client(self, query, impala_client, expect_success): + """Helper to run an input query using a given impala_client.""" if expect_success: return self.execute_query_expect_success( impala_client, query, query_options={'sync_ddl': 1}) @@ -882,6 +890,7 @@ class TestRanger(CustomClusterTestSuite): grantee_role = "grantee_role" resource_owner_role = OWNER_USER admin_client = self.create_impala_client(user=ADMIN) + user_client = self.create_impala_client(user=OWNER_USER) unique_database = unique_name + 
"_db" table_name = "tbl" column_names = ["a", "b"] @@ -899,15 +908,15 @@ class TestRanger(CustomClusterTestSuite): # able to create a UDF. admin_client.execute("grant all on uri '{0}{1}' to user {2}" .format(os.getenv("FILESYSTEM_PREFIX"), udf_uri, OWNER_USER)) - self._run_query_as_user("create database {0}".format(unique_database), OWNER_USER, - True) - self._run_query_as_user("create table {0}.{1} ({2} int, {3} string)" + self._run_query_with_client("create database {0}".format(unique_database), + user_client, True) + self._run_query_with_client("create table {0}.{1} ({2} int, {3} string)" .format(unique_database, table_name, column_names[0], column_names[1]), - OWNER_USER, True) - self._run_query_as_user("create function {0}.{1} " + user_client, True) + self._run_query_with_client("create function {0}.{1} " "location '{2}{3}' symbol='org.apache.impala.TestUdf'" .format(unique_database, udf_name, os.getenv("FILESYSTEM_PREFIX"), udf_uri), - OWNER_USER, True) + user_client, True) for data in test_data: grantee_type = data[0] @@ -935,6 +944,8 @@ class TestRanger(CustomClusterTestSuite): admin_client.execute("revoke create on server from user {0}".format(OWNER_USER)) admin_client.execute("revoke all on uri '{0}{1}' from user {2}" .format(os.getenv("FILESYSTEM_PREFIX"), udf_uri, OWNER_USER)) + admin_client.close() + user_client.close() def _test_grant_revoke_by_owner_on_database(self, privilege, unique_database, grantee_type, grantee, resource_owner_role): @@ -946,10 +957,11 @@ class TestRanger(CustomClusterTestSuite): set_database_owner_role_stmt = "alter database {0} set owner role {1}" resource_owner_group = OWNER_USER admin_client = self.create_impala_client(user=ADMIN) + user_client = self.create_impala_client(user=OWNER_USER) try: - self._run_query_as_user(grant_database_stmt - .format(privilege, unique_database, grantee_type, grantee), OWNER_USER, + self._run_query_with_client(grant_database_stmt + .format(privilege, unique_database, grantee_type, grantee), 
user_client, True) result = admin_client.execute(show_grant_database_stmt .format(grantee_type, grantee, unique_database)) @@ -959,8 +971,8 @@ class TestRanger(CustomClusterTestSuite): [grantee_type, grantee, unique_database, "*", "*", "", "", "", "", privilege, "false"]]) - self._run_query_as_user(revoke_database_stmt - .format(privilege, unique_database, grantee_type, grantee), OWNER_USER, + self._run_query_with_client(revoke_database_stmt + .format(privilege, unique_database, grantee_type, grantee), user_client, True) result = admin_client.execute(show_grant_database_stmt .format(grantee_type, grantee, unique_database)) @@ -977,12 +989,12 @@ class TestRanger(CustomClusterTestSuite): .format(unique_database, resource_owner_group)) admin_client.execute("invalidate metadata") - result = self._run_query_as_user(grant_database_stmt - .format(privilege, unique_database, grantee_type, grantee), OWNER_USER, + result = self._run_query_with_client(grant_database_stmt + .format(privilege, unique_database, grantee_type, grantee), user_client, False) assert ERROR_GRANT in str(result) - result = self._run_query_as_user(revoke_database_stmt - .format(privilege, unique_database, grantee_type, grantee), OWNER_USER, + result = self._run_query_with_client(revoke_database_stmt + .format(privilege, unique_database, grantee_type, grantee), user_client, False) assert ERROR_REVOKE in str(result) @@ -992,12 +1004,12 @@ class TestRanger(CustomClusterTestSuite): admin_client.execute(set_database_owner_role_stmt .format(unique_database, resource_owner_role)) - result = self._run_query_as_user(grant_database_stmt - .format(privilege, unique_database, grantee_type, grantee), OWNER_USER, + result = self._run_query_with_client(grant_database_stmt + .format(privilege, unique_database, grantee_type, grantee), user_client, False) assert ERROR_GRANT in str(result) - result = self._run_query_as_user(revoke_database_stmt - .format(privilege, unique_database, grantee_type, grantee), OWNER_USER, + 
result = self._run_query_with_client(revoke_database_stmt + .format(privilege, unique_database, grantee_type, grantee), user_client, False) assert ERROR_REVOKE in str(result) # Change the database owner back to the user 'OWNER_USER'. @@ -1009,6 +1021,8 @@ class TestRanger(CustomClusterTestSuite): # from interfering with other tests. admin_client.execute(revoke_database_stmt .format(privilege, unique_database, grantee_type, grantee)) + admin_client.close() + user_client.close() def _test_grant_revoke_by_owner_on_table(self, privilege, unique_database, table_name, grantee_type, grantee, resource_owner_role): @@ -1020,23 +1034,24 @@ class TestRanger(CustomClusterTestSuite): show_grant_table_stmt = "show grant {0} {1} on table {2}.{3}" resource_owner_group = OWNER_USER admin_client = self.create_impala_client(user=ADMIN) + user_client = self.create_impala_client(user=OWNER_USER) set_table_owner_user_stmt = "alter table {0}.{1} set owner user {2}" set_table_owner_group_stmt = "alter table {0}.{1} set owner group {2}" set_table_owner_role_stmt = "alter table {0}.{1} set owner role {2}" try: - self._run_query_as_user(grant_table_stmt + self._run_query_with_client(grant_table_stmt .format(privilege, unique_database, table_name, grantee_type, grantee), - OWNER_USER, True) + user_client, True) result = admin_client.execute(show_grant_table_stmt .format(grantee_type, grantee, unique_database, table_name)) TestRanger._check_privileges(result, [ [grantee_type, grantee, unique_database, table_name, "*", "", "", "", "", privilege, "false"]]) - self._run_query_as_user(revoke_table_stmt + self._run_query_with_client(revoke_table_stmt .format(privilege, unique_database, table_name, grantee_type, grantee), - OWNER_USER, True) + user_client, True) result = admin_client.execute(show_grant_table_stmt .format(grantee_type, grantee, unique_database, table_name)) TestRanger._check_privileges(result, []) @@ -1052,13 +1067,13 @@ class TestRanger(CustomClusterTestSuite): 
.format(unique_database, table_name, resource_owner_group)) admin_client.execute("refresh {0}.{1}".format(unique_database, table_name)) - result = self._run_query_as_user(grant_table_stmt + result = self._run_query_with_client(grant_table_stmt .format(privilege, unique_database, table_name, grantee_type, grantee), - OWNER_USER, False) + user_client, False) assert ERROR_GRANT in str(result) - result = self._run_query_as_user(revoke_table_stmt + result = self._run_query_with_client(revoke_table_stmt .format(privilege, unique_database, table_name, grantee_type, grantee), - OWNER_USER, False) + user_client, False) assert ERROR_REVOKE in str(result) # Set the owner of the table to a role that has the same name as @@ -1067,13 +1082,13 @@ class TestRanger(CustomClusterTestSuite): admin_client.execute(set_table_owner_role_stmt .format(unique_database, table_name, resource_owner_role)) - result = self._run_query_as_user(grant_table_stmt + result = self._run_query_with_client(grant_table_stmt .format(privilege, unique_database, table_name, grantee_type, grantee), - OWNER_USER, False) + user_client, False) assert ERROR_GRANT in str(result) - result = self._run_query_as_user(revoke_table_stmt + result = self._run_query_with_client(revoke_table_stmt .format(privilege, unique_database, table_name, grantee_type, grantee), - OWNER_USER, False) + user_client, False) assert ERROR_REVOKE in str(result) # Change the table owner back to the user 'OWNER_USER'. admin_client.execute(set_table_owner_user_stmt @@ -1084,6 +1099,8 @@ class TestRanger(CustomClusterTestSuite): # from interfering with other tests. 
admin_client.execute(revoke_table_stmt .format(privilege, unique_database, table_name, grantee_type, grantee)) + admin_client.close() + user_client.close() def _test_grant_revoke_by_owner_on_column(self, privilege, column_names, unique_database, table_name, grantee_type, grantee, resource_owner_role): @@ -1095,14 +1112,15 @@ class TestRanger(CustomClusterTestSuite): show_grant_column_stmt = "show grant {0} {1} on column {2}.{3}.{4}" resource_owner_group = OWNER_USER admin_client = self.create_impala_client(user=ADMIN) + user_client = self.create_impala_client(user=OWNER_USER) set_table_owner_user_stmt = "alter table {0}.{1} set owner user {2}" set_table_owner_group_stmt = "alter table {0}.{1} set owner group {2}" set_table_owner_role_stmt = "alter table {0}.{1} set owner role {2}" try: - self._run_query_as_user(grant_column_stmt + self._run_query_with_client(grant_column_stmt .format(privilege, column_names[0], unique_database, table_name, - grantee_type, grantee), OWNER_USER, True) + grantee_type, grantee), user_client, True) result = admin_client.execute(show_grant_column_stmt .format(grantee_type, grantee, unique_database, table_name, column_names[0])) @@ -1110,9 +1128,9 @@ class TestRanger(CustomClusterTestSuite): [grantee_type, grantee, unique_database, table_name, column_names[0], "", "", "", "", privilege, "false"]]) - self._run_query_as_user(revoke_column_stmt + self._run_query_with_client(revoke_column_stmt .format(privilege, column_names[0], unique_database, table_name, - grantee_type, grantee), OWNER_USER, True) + grantee_type, grantee), user_client, True) result = admin_client.execute(show_grant_column_stmt .format(grantee_type, grantee, unique_database, table_name, column_names[0])) @@ -1125,13 +1143,13 @@ class TestRanger(CustomClusterTestSuite): .format(unique_database, table_name, resource_owner_group)) admin_client.execute("refresh {0}.{1}".format(unique_database, table_name)) - result = self._run_query_as_user(grant_column_stmt + result = 
self._run_query_with_client(grant_column_stmt .format(privilege, column_names[0], unique_database, table_name, - grantee_type, grantee), OWNER_USER, False) + grantee_type, grantee), user_client, False) assert ERROR_GRANT in str(result) - result = self._run_query_as_user(revoke_column_stmt + result = self._run_query_with_client(revoke_column_stmt .format(privilege, column_names[0], unique_database, table_name, - grantee_type, grantee), OWNER_USER, False) + grantee_type, grantee), user_client, False) assert ERROR_REVOKE in str(result) # Set the owner of the table to a role that has the same name as 'OWNER_USER' and @@ -1140,13 +1158,13 @@ class TestRanger(CustomClusterTestSuite): admin_client.execute(set_table_owner_role_stmt .format(unique_database, table_name, resource_owner_role)) - result = self._run_query_as_user(grant_column_stmt + result = self._run_query_with_client(grant_column_stmt .format(privilege, column_names[0], unique_database, table_name, - grantee_type, grantee), OWNER_USER, False) + grantee_type, grantee), user_client, False) assert ERROR_GRANT in str(result) - result = self._run_query_as_user(revoke_column_stmt + result = self._run_query_with_client(revoke_column_stmt .format(privilege, column_names[0], unique_database, table_name, - grantee_type, grantee), OWNER_USER, False) + grantee_type, grantee), user_client, False) assert ERROR_REVOKE in str(result) # Change the table owner back to the user 'owner_user'. admin_client.execute(set_table_owner_user_stmt @@ -1158,19 +1176,22 @@ class TestRanger(CustomClusterTestSuite): admin_client.execute(revoke_column_stmt .format(privilege, column_names[0], unique_database, table_name, grantee_type, grantee)) + admin_client.close() + user_client.close() def _test_grant_revoke_by_owner_on_udf(self, privilege, unique_database, udf_name, grantee_type, grantee): # Due to IMPALA-11743 and IMPALA-12685, the owner of a UDF could not grant # or revoke the SELECT privilege. 
- result = self._run_query_as_user("grant {0} on user_defined_fn " - "{1}.{2} to {3} {4}".format(privilege, unique_database, udf_name, - grantee_type, grantee), OWNER_USER, False) - assert ERROR_GRANT in str(result) - result = self._run_query_as_user("revoke {0} on user_defined_fn " - "{1}.{2} from {3} {4}".format(privilege, unique_database, udf_name, - grantee_type, grantee), OWNER_USER, False) - assert ERROR_REVOKE in str(result) + with self.create_impala_client(user=OWNER_USER) as user_client: + result = self._run_query_with_client("grant {0} on user_defined_fn " + "{1}.{2} to {3} {4}".format(privilege, unique_database, udf_name, + grantee_type, grantee), user_client, False) + assert ERROR_GRANT in str(result) + result = self._run_query_with_client("revoke {0} on user_defined_fn " + "{1}.{2} from {3} {4}".format(privilege, unique_database, udf_name, + grantee_type, grantee), user_client, False) + assert ERROR_REVOKE in str(result) def _test_allow_catalog_cache_op_from_masked_users(self, unique_name): """Verify that catalog cache operations are allowed for masked users @@ -1820,6 +1841,7 @@ class TestRangerIndependent(TestRanger): def test_grant_multiple_columns_consolidate_grant_revoke_requests(self): self._test_grant_multiple_columns(1) + @SkipIfFS.hive @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args=LEGACY_CATALOG_IMPALAD_ARGS, @@ -2199,10 +2221,12 @@ class TestRangerLegacyCatalog(TestRanger): def test_legacy_catalog_ownership(self): self._test_ownership() + @SkipIfFS.hive @pytest.mark.execute_serially def test_grant_revoke_by_owner_legacy_catalog(self, unique_name): self._test_grant_revoke_by_owner(unique_name) + @SkipIfFS.hive @pytest.mark.execute_serially def test_select_view_created_by_non_superuser_with_catalog_v1(self, unique_name): self._test_select_view_created_by_non_superuser(unique_name) @@ -2231,10 +2255,12 @@ class TestRangerLocalCatalog(TestRanger): pytest.xfail("getTableIfCached() faulty behavior, known issue") 
self._test_ownership() + @SkipIfFS.hive @pytest.mark.execute_serially def test_grant_revoke_by_owner_local_catalog(self, unique_name): self._test_grant_revoke_by_owner(unique_name) + @SkipIfFS.hive @pytest.mark.execute_serially def test_select_view_created_by_non_superuser_with_local_catalog(self, unique_name): self._test_select_view_created_by_non_superuser(unique_name) @@ -2453,6 +2479,8 @@ class TestRangerLocalCatalog(TestRanger): else: assert "Error revoking a privilege in Ranger. Ranger error message: " \ "HTTP 403 Error: Grantee group invalid_group doesn't exist" in str(result) + invalid_impala_client.close() + valid_impala_client.close() @pytest.mark.execute_serially def test_show_functions(self, unique_name): diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py index ef57e8da7..04b34cfe0 100644 --- a/tests/common/custom_cluster_test_suite.py +++ b/tests/common/custom_cluster_test_suite.py @@ -204,7 +204,9 @@ class CustomClusterTestSuite(ImpalaTestSuite): args[LOG_SYMLINKS] = True if workload_mgmt: args[WORKLOAD_MGMT] = True - if force_restart: + if force_restart or pytest.config.option.shard_tests: + # When sharding tests, always restart the cluster to avoid issues with tests + # that depend on a specific test order within a shard. args[FORCE_RESTART] = True def merge_args(args_first, args_last): @@ -349,7 +351,7 @@ class CustomClusterTestSuite(ImpalaTestSuite): kwargs[IMPALAD_TIMEOUT_S] = args[IMPALAD_TIMEOUT_S] if FORCE_RESTART in args: kwargs[FORCE_RESTART] = args[FORCE_RESTART] - if args[FORCE_RESTART] is True: + if args[FORCE_RESTART] is True and not pytest.config.option.shard_tests: LOG.warning("Test uses force_restart=True to avoid restarting the cluster. 
" "Test reorganization/assertion rewrite is needed") else: @@ -398,6 +400,7 @@ class CustomClusterTestSuite(ImpalaTestSuite): @classmethod def cluster_teardown(cls, name, args): if args.get(WORKLOAD_MGMT, False): + cls.close_impala_clients() cls.cluster.graceful_shutdown_impalads() cls.clear_tmp_dirs() @@ -618,6 +621,9 @@ class CustomClusterTestSuite(ImpalaTestSuite): except Exception as e: LOG.info("Failed to reuse running cluster: %s" % e) pass + finally: + cls.cluster = ImpalaCluster.get_e2e_test_cluster() + cls.impalad_test_service = cls.create_impala_service() LOG.info("Starting cluster with command: %s" % cmd_str) try: diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index 9e7414716..c2d7744dd 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -35,6 +35,7 @@ import socket import subprocess import time import string + from functools import wraps from getpass import getuser from impala.hiveserver2 import HiveServer2Cursor @@ -455,7 +456,7 @@ class ImpalaTestSuite(BaseTestSuite): @classmethod def close_impala_clients(cls): """Closes Impala clients created by create_impala_clients().""" - # cls.client should be equal to one of belove, unless test method implicitly override. + # cls.client should be equal to one of below, unless test method implicitly override. # Closing twice would lead to error in some clients (impyla+SSL). if cls.client not in (cls.beeswax_client, cls.hs2_client, cls.hs2_http_client): cls.client.close() diff --git a/tests/common/skip.py b/tests/common/skip.py index 6d3a8b182..69637dcf4 100644 --- a/tests/common/skip.py +++ b/tests/common/skip.py @@ -81,7 +81,8 @@ class SkipIfFS: incorrent_reported_ec = pytest.mark.skipif(IS_OZONE and IS_EC, reason="HDDS-8543") # These need test infra work to re-enable. 
- hive = pytest.mark.skipif(not IS_HDFS, reason="Hive doesn't work") + hive = pytest.mark.skipif( + not IS_HDFS, reason="HiveServer2 doesn't work or not started") hbase = pytest.mark.skipif(not IS_HDFS, reason="HBase not started") qualified_path = pytest.mark.skipif(not IS_HDFS, reason="Tests rely on HDFS qualified paths, IMPALA-1872") @@ -135,6 +136,7 @@ class SkipIf: not_tuple_cache = pytest.mark.skipif(not IS_TUPLE_CACHE, reason="Tuple Cache needed") + class SkipIfLocal: # These are skipped due to product limitations. hdfs_blocks = pytest.mark.skipif(IS_LOCAL, @@ -152,6 +154,7 @@ class SkipIfLocal: root_path = pytest.mark.skipif(IS_LOCAL, reason="Tests rely on the root directory") + class SkipIfNotHdfsMinicluster: # These are skipped when not running against a local HDFS mini-cluster. plans = pytest.mark.skipif( @@ -165,6 +168,7 @@ class SkipIfNotHdfsMinicluster: reason="Test is tuned for scheduling decisions made on a 3-node HDFS minicluster " "with no EC") + class SkipIfBuildType: dev_build = pytest.mark.skipif(IMPALA_TEST_CLUSTER_PROPERTIES.is_dev(), reason="Test takes too much time on debug build.") @@ -173,6 +177,7 @@ class SkipIfBuildType: remote = pytest.mark.skipif(IMPALA_TEST_CLUSTER_PROPERTIES.is_remote_cluster(), reason="Test depends on running against a local Impala cluster") + class SkipIfEC: contain_full_explain = pytest.mark.skipif(IS_EC, reason="Contain full explain output " "for hdfs tables.") @@ -225,6 +230,7 @@ class SkipIfHive2: ranger_auth = pytest.mark.skipif(HIVE_MAJOR_VERSION <= 2, reason="Hive 2 doesn't support Ranger authorization.") + class SkipIfCatalogV2: """Expose decorators as methods so that is_catalog_v2_cluster() can be evaluated lazily when needed, instead of whenever this module is imported.""" @@ -261,6 +267,7 @@ class SkipIfCatalogV2: IMPALA_TEST_CLUSTER_PROPERTIES.is_catalog_v2_cluster(), reason="Table isn't invalidated with Local catalog and enabled hms_event_polling.") + class SkipIfApacheHive(): feature_not_supported = 
pytest.mark.skipif(IS_APACHE_HIVE, reason="Apache Hive does not support this feature") diff --git a/tests/comparison/cluster.py b/tests/comparison/cluster.py index 8a7c623cb..15949c167 100644 --- a/tests/comparison/cluster.py +++ b/tests/comparison/cluster.py @@ -30,7 +30,6 @@ import requests import shutil import subprocess from abc import ABCMeta, abstractproperty -from cm_api.api_client import ApiResource as CmApiResource from collections import defaultdict from collections import OrderedDict from contextlib import contextmanager @@ -38,7 +37,6 @@ from getpass import getuser from io import BytesIO from multiprocessing.pool import ThreadPool from random import choice -from sys import maxsize from tempfile import mkdtemp from threading import Lock from time import mktime, strptime @@ -50,6 +48,13 @@ try: except ImportError: from urlparse import urlparse +try: + from cm_api.api_client import ApiResource as CmApiResource +except ImportError: + # If the cm_api module is not available, we will not be able to use Cloudera Manager. + # This is fine for local testing. + pass + from tests.comparison.db_connection import HiveConnection, ImpalaConnection from tests.common.environ import HIVE_MAJOR_VERSION from tests.common.errors import Timeout @@ -179,12 +184,9 @@ class Cluster(with_metaclass(ABCMeta, object)): """ Print the cluster impalad version info to the console sorted by hostname. """ - def _sorter(i1, i2): - return cmp(i1.host_name, i2.host_name) - version_info = self.impala.get_version_info() print("Cluster Impalad Version Info:") - for impalad in sorted(version_info.keys(), cmp=_sorter): + for impalad in sorted(version_info.keys(), key=lambda x: x.host_name): print("{0}: {1}".format(impalad.host_name, version_info[impalad])) @@ -635,7 +637,7 @@ class Impala(Service): impalads = self.impalads promise = self._thread_pool.map_async(func, impalads) # Python doesn't handle ctrl-c well unless a timeout is provided. 
- results = promise.get(maxsize) + results = promise.get(timeout=(2 ** 31 - 1)) if as_dict: results = dict(zip(impalads, results)) return results diff --git a/tests/comparison/db_types.py b/tests/comparison/db_types.py index 27e2a6025..0039b445c 100644 --- a/tests/comparison/db_types.py +++ b/tests/comparison/db_types.py @@ -26,6 +26,7 @@ from tests.comparison.common import ValExpr, ValExprList module_contents = dict() + class DataTypeMetaclass(type): '''Provides sorting of classes used to determine upcasting.''' @@ -39,9 +40,23 @@ class DataTypeMetaclass(type): def __cmp__(cls, other): if not isinstance(other, DataTypeMetaclass): return -1 - return cmp( - getattr(cls, 'CMP_VALUE', cls.__name__), - getattr(other, 'CMP_VALUE', other.__name__)) + val_this = getattr(cls, 'CMP_VALUE', cls.__name__) + val_other = getattr(other, 'CMP_VALUE', other.__name__) + if (val_this < val_other): + return -1 + elif (val_this > val_other): + return 1 + else: + return 0 + + def __lt__(cls, other): + # This __lt__ method replace __cmp__ that is removed in Python3. + # See https://peps.python.org/pep-0207/. + # It is mainly to serve max() inside update_return_type_and_append() of funcs.py. 
+ if not isinstance(other, DataTypeMetaclass): + return False + return (getattr(cls, 'CMP_VALUE', cls.__name__) + < getattr(other, 'CMP_VALUE', other.__name__)) class DataType(with_metaclass(DataTypeMetaclass, ValExpr)): @@ -92,7 +107,6 @@ class DataType(with_metaclass(DataTypeMetaclass, ValExpr)): return type(self) - class Boolean(DataType): pass @@ -213,6 +227,10 @@ JOINABLE_TYPES = (Char, Decimal, Int, Timestamp) TYPES = tuple(set(type_.type for type_ in EXACT_TYPES)) __DECIMAL_TYPE_CACHE = dict() +__CHAR_TYPE_CACHE = dict() +__VARCHAR_TYPE_CACHE = dict() + + def get_decimal_class(total_digits, fractional_digits): cache_key = (total_digits, fractional_digits) if cache_key not in __DECIMAL_TYPE_CACHE: @@ -223,7 +241,6 @@ def get_decimal_class(total_digits, fractional_digits): return __DECIMAL_TYPE_CACHE[cache_key] -__CHAR_TYPE_CACHE = dict() def get_char_class(length): if length not in __CHAR_TYPE_CACHE: __CHAR_TYPE_CACHE[length] = type( @@ -233,7 +250,6 @@ def get_char_class(length): return __CHAR_TYPE_CACHE[length] -__VARCHAR_TYPE_CACHE = dict() def get_varchar_class(length): if length not in __VARCHAR_TYPE_CACHE: __VARCHAR_TYPE_CACHE[length] = type( diff --git a/tests/conftest.py b/tests/conftest.py index 0dc702337..4d395b658 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -696,7 +696,7 @@ def validate_python_version(): def pytest_collection_modifyitems(items, config, session): """Hook to handle --shard_tests command line option. - If set, this "deselects" a subset of tests, by hashing + If set, this "deselects" a subset of tests, by hashing (using crc32()) their id into buckets. 
""" if not config.option.shard_tests: @@ -710,7 +710,7 @@ def pytest_collection_modifyitems(items, config, session): items_selected, items_deselected = [], [] for i in items: - if hash(i.nodeid) % num_shards == this_shard: + if crc32(i.nodeid.encode('utf-8')) % num_shards == this_shard: items_selected.append(i) else: items_deselected.append(i) diff --git a/tests/custom_cluster/test_ipv6.py b/tests/custom_cluster/test_ipv6.py index f4a0a9de5..415695573 100644 --- a/tests/custom_cluster/test_ipv6.py +++ b/tests/custom_cluster/test_ipv6.py @@ -22,6 +22,7 @@ import logging import os import pytest import requests +import sys from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.network import SKIP_SSL_MSG @@ -66,7 +67,8 @@ WEBUI_PORTS = [25000, 25010, 25020] # Error text can depend on both protocol and python version. CONN_ERR = ["Could not connect", "Connection refused"] CERT_ERR = ["doesn't match", "certificate verify failed"] -WEB_CERT_ERR = "CertificateError" +WEB_CERT_ERR = ("CertificateError" if sys.version_info.major < 3 + else "SSLCertVerificationError") class TestIPv6Base(CustomClusterTestSuite): diff --git a/tests/custom_cluster/test_logging.py b/tests/custom_cluster/test_logging.py index ddae8b165..bb10651b9 100644 --- a/tests/custom_cluster/test_logging.py +++ b/tests/custom_cluster/test_logging.py @@ -46,21 +46,24 @@ class TestLoggingCore(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(cluster_size=1, impalad_args="--max_error_logs_per_instance=2", - disable_log_buffering=True) + disable_log_buffering=True, + force_restart=True) def test_max_errors(self): self._test_max_errors(2, 4, True) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(cluster_size=1, impalad_args="--max_error_logs_per_instance=3", - disable_log_buffering=True) + disable_log_buffering=True, + force_restart=True) def test_max_errors_0(self): self._test_max_errors(3, 0, True) 
@pytest.mark.execute_serially @CustomClusterTestSuite.with_args(cluster_size=1, impalad_args="--max_error_logs_per_instance=2", - disable_log_buffering=True) + disable_log_buffering=True, + force_restart=True) def test_max_errors_no_downgrade(self): self._test_max_errors(2, -1, False) diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py index a1789ed52..9880affd1 100644 --- a/tests/custom_cluster/test_query_retries.py +++ b/tests/custom_cluster/test_query_retries.py @@ -93,6 +93,7 @@ class TestQueryRetries(CustomClusterTestSuite): _count_query_result = "55" @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(force_restart=True) def test_retries_from_cancellation_pool(self): """Tests that queries are retried instead of cancelled if one of the nodes leaves the cluster. The retries are triggered by the cancellation pool in the ImpalaServer. The @@ -138,7 +139,8 @@ class TestQueryRetries(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( - statestored_args="-statestore_heartbeat_frequency_ms=1000") + statestored_args="-statestore_heartbeat_frequency_ms=1000", + force_restart=True) def test_kill_impalad_expect_retry(self): """Launch a query, wait for it to start running, kill a random impalad and then validate that the query has successfully been retried. Increase the statestore @@ -251,7 +253,8 @@ class TestQueryRetries(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( - statestored_args="-statestore_heartbeat_frequency_ms=60000") + statestored_args="-statestore_heartbeat_frequency_ms=60000", + force_restart=True) def test_retry_exec_rpc_failure(self): """Test ExecFInstance RPC failures. Set a really high statestort heartbeat frequency so that killed impalads are not removed from the cluster membership. 
This will cause @@ -306,8 +309,9 @@ class TestQueryRetries(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--debug_actions=" + _get_rpc_fail_action(FAILED_KRPC_PORT), - statestored_args="--statestore_heartbeat_frequency_ms=1000 \ - --statestore_max_missed_heartbeats=2") + statestored_args=("--statestore_heartbeat_frequency_ms=1000 " + "--statestore_max_missed_heartbeats=2"), + force_restart=True) def test_retry_exec_rpc_failure_before_admin_delay(self): """Test retried query triggered by RPC failures by simulating RPC errors at the port of the 2nd node in the cluster. Simulate admission delay for query with debug_action @@ -375,7 +379,7 @@ class TestQueryRetries(CustomClusterTestSuite): impalad_args="--debug_actions=" + _get_rpc_fail_action(FAILED_KRPC_PORT), statestored_args="--statestore_heartbeat_frequency_ms=1000 \ --statestore_max_missed_heartbeats=2", - cluster_size=2, num_exclusive_coordinators=1) + cluster_size=2, num_exclusive_coordinators=1, force_restart=True) def test_retry_query_failure_all_executors_blacklisted(self): """Test retried query triggered by RPC failures by simulating RPC errors at the port of the 2nd node, which is the only executor in the cluster. Simulate admission delay @@ -438,7 +442,7 @@ class TestQueryRetries(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( - statestored_args="-statestore_heartbeat_frequency_ms=1000") + statestored_args="-statestore_heartbeat_frequency_ms=1000", force_restart=True) def test_multiple_retries(self): """Test that a query can only be retried once, and that if the retry attempt fails, it fails correctly and with the right error message. 
Multiple retry attempts are @@ -498,6 +502,7 @@ class TestQueryRetries(CustomClusterTestSuite): self.__validate_memz() @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(force_restart=True) def test_retry_fetched_rows(self): """Test that query retries are not triggered if some rows have already been fetched. Run a query, fetch some rows from it, kill one of the impalads that is @@ -612,7 +617,7 @@ class TestQueryRetries(CustomClusterTestSuite): self.client.close_query(handle) @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args(disable_log_buffering=True) + @CustomClusterTestSuite.with_args(disable_log_buffering=True, force_restart=True) def test_query_retry_reaches_spool_limit(self): """Test retryable queries with results spooling enabled and spool_all_results_for_retries=true that reach spooling mem limit will return rows and @@ -661,6 +666,7 @@ class TestQueryRetries(CustomClusterTestSuite): "fetched some rows" % self.client.handle_id(handle) in str(e) @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(force_restart=True) def test_original_query_cancel(self): """Test canceling a retryable query with spool_all_results_for_retries=true. Make sure Coordinator::Wait() won't block in cancellation.""" @@ -682,6 +688,7 @@ class TestQueryRetries(CustomClusterTestSuite): assert "Cancelled" in str(e) @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(force_restart=True) def test_retry_finished_query(self): """Test that queries in FINISHED state can still be retried before the client fetch any rows. Sets batch_size to 1 so results will be available as soon as possible. 
@@ -708,7 +715,7 @@ class TestQueryRetries(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( - statestored_args="-statestore_heartbeat_frequency_ms=60000") + statestored_args="-statestore_heartbeat_frequency_ms=60000", force_restart=True) def test_retry_query_cancel(self): """Trigger a query retry, and then cancel the retried query. Validate that the cancelled query fails with the correct error message. Set a really high statestore @@ -750,7 +757,8 @@ class TestQueryRetries(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--debug_actions=RETRY_DELAY_CHECKING_ORIGINAL_DRIVER:SLEEP@1000", - statestored_args="--statestore_heartbeat_frequency_ms=60000") + statestored_args="--statestore_heartbeat_frequency_ms=60000", + force_restart=True) def test_retrying_query_cancel(self): """Trigger a query retry, and then cancel and close the retried query in RETRYING state. Validate that it doesn't crash the impalad. Set a really high statestore @@ -780,7 +788,8 @@ class TestQueryRetries(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( - statestored_args="--statestore_heartbeat_frequency_ms=60000") + statestored_args="--statestore_heartbeat_frequency_ms=60000", + force_restart=True) def test_retrying_query_before_inflight(self): """Trigger a query retry, and delay setting the original query inflight as that may happen after the query is retried. Validate that the query succeeds. 
Set a really @@ -820,7 +829,8 @@ class TestQueryRetries(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--debug_actions=RETRY_DELAY_GET_QUERY_DRIVER:SLEEP@2000", - statestored_args="--statestore_heartbeat_frequency_ms=60000") + statestored_args="--statestore_heartbeat_frequency_ms=60000", + force_restart=True) def test_retry_query_close_before_getting_query_driver(self): """Trigger a query retry, and then close the retried query before getting the query driver. Validate that it doesn't crash the impalad. @@ -847,7 +857,8 @@ class TestQueryRetries(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--debug_actions=QUERY_RETRY_SET_RESULT_CACHE:FAIL", - statestored_args="--statestore_heartbeat_frequency_ms=60000") + statestored_args="--statestore_heartbeat_frequency_ms=60000", + force_restart=True) def test_retry_query_result_cacheing_failed(self): """Test setting up results cacheing failed.""" @@ -874,7 +885,8 @@ class TestQueryRetries(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--debug_actions=QUERY_RETRY_SET_QUERY_IN_FLIGHT:FAIL", - statestored_args="--statestore_heartbeat_frequency_ms=60000") + statestored_args="--statestore_heartbeat_frequency_ms=60000", + force_restart=True) def test_retry_query_set_query_in_flight_failed(self): """Test setting query in flight failed.""" @@ -898,7 +910,8 @@ class TestQueryRetries(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( - statestored_args="-statestore_heartbeat_frequency_ms=60000") + statestored_args="-statestore_heartbeat_frequency_ms=60000", + force_restart=True) def test_retry_query_timeout(self): """Trigger a query retry, and then leave the query handle open until the 'query_timeout_s' causes the handle to be closed. 
Assert that the runtime profile of @@ -941,8 +954,10 @@ class TestQueryRetries(CustomClusterTestSuite): assert impalad_service.get_metric_value('impala-server.num-queries-expired') == 1 @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args(impalad_args="--idle_session_timeout=1", - statestored_args="--statestore_heartbeat_frequency_ms=60000") + @CustomClusterTestSuite.with_args( + impalad_args="--idle_session_timeout=1", + statestored_args="--statestore_heartbeat_frequency_ms=60000", + force_restart=True) def test_retry_query_session_timeout(self): """Similar to 'test_retry_query_timeout' except with an idle session timeout.""" self.close_impala_clients() @@ -977,7 +992,8 @@ class TestQueryRetries(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( - statestored_args="-statestore_heartbeat_frequency_ms=60000") + statestored_args="-statestore_heartbeat_frequency_ms=60000", + force_restart=True) def test_retry_query_hs2(self): """Test query retries with the HS2 protocol. Enable the results set cache as well and test that query retries work with the results cache.""" diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py index 08995cd94..3548bb200 100644 --- a/tests/custom_cluster/test_restart_services.py +++ b/tests/custom_cluster/test_restart_services.py @@ -1010,7 +1010,7 @@ class TestGracefulShutdown(CustomClusterTestSuite, HS2TestSuite): assert cancel == "{0}s000ms".format(get_remain_shutdown_query_cancel( self.COORD_SHUTDOWN_FAST_DEADLINE_S, self.COORD_SHUTDOWN_FAST_DEADLINE_S)) assert registered == "0" - assert running > 0 + assert int(running) > 0 self.cluster.impalads[1].wait_for_exit() # The slow query should be cancelled. 
self.__check_deadline_expired(SLOW_QUERY, slow_query_handle, True) diff --git a/tests/custom_cluster/test_s3a_access.py b/tests/custom_cluster/test_s3a_access.py index 8364ef856..845e617ad 100644 --- a/tests/custom_cluster/test_s3a_access.py +++ b/tests/custom_cluster/test_s3a_access.py @@ -25,7 +25,7 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.skip import SkipIf from tests.util.filesystem_utils import WAREHOUSE -tmp = tempfile.NamedTemporaryFile(delete=False) +tmp = tempfile.NamedTemporaryFile(mode='w+t', delete=False) BAD_KEY_FILE = tmp.name diff --git a/tests/custom_cluster/test_saml2_sso.py b/tests/custom_cluster/test_saml2_sso.py index 3df48b183..8b0dc7fad 100644 --- a/tests/custom_cluster/test_saml2_sso.py +++ b/tests/custom_cluster/test_saml2_sso.py @@ -21,6 +21,7 @@ import base64 import datetime import os import pytest +import sys import uuid import xml.etree.ElementTree as ET import zlib @@ -40,7 +41,7 @@ from tests.shell.util import run_impala_shell_cmd class NoRedirection(HTTPErrorProcessor): """Allows inspecting http redirection responses. """ - def http_response(self, request, response): + def http_response(self, request, response): # noqa: U100 return response @@ -49,6 +50,13 @@ def format_time(time): return time.strftime("%Y-%m-%dT%H:%M:%SZ") +def encode_if_needed(value): + """ Encodes the value to bytes if needed, depending on the python version. """ + if sys.version_info.major < 3: + return value.encode('utf-8') if isinstance(value, str) else value + return value if isinstance(value, bytes) else value.encode('utf-8') + + class TestClientSaml(CustomClusterTestSuite): """ Tests for a client using SAML2 browser profile. 
@@ -100,9 +108,9 @@ class TestClientSaml(CustomClusterTestSuite): "--saml2_ee_test_mode=true" % (CERT_DIR, CERT_DIR, SP_CALLBACK_URL)) - SSO_ARGS_WITH_GROUP_FILTER = (SSO_ARGS + " " + - "--saml2_group_filter=group1,group2 " - "--saml2_group_attribute_name=eduPersonAffiliation") + SSO_ARGS_WITH_GROUP_FILTER = ( + "{} --saml2_group_filter=group1,group2 " + "--saml2_group_attribute_name=eduPersonAffiliation").format(SSO_ARGS) @CustomClusterTestSuite.with_args(impalad_args=SSO_ARGS, cluster_size=1) def test_saml2_browser_profile_no_group_filter(self, vector): @@ -131,7 +139,7 @@ class TestClientSaml(CustomClusterTestSuite): @CustomClusterTestSuite.with_args( impalad_args=SSO_ARGS_WITH_GROUP_FILTER, cluster_size=1) - def test_saml2_browser_profile_with_group_filter(self, vector): + def test_saml2_browser_profile_with_group_filter(self): # test the SAML worflow with different attributes self._test_saml2_browser_workflow("", False) @@ -157,7 +165,8 @@ class TestClientSaml(CustomClusterTestSuite): """ Initial POST request to hs2-http port, response should be redirected to IDP and contain the authnrequest. 
""" opener = build_opener(NoRedirection) - req = Request("http://localhost:%s" % TestClientSaml.HOST_PORT, " ") + payload = encode_if_needed(" ") + req = Request("http://localhost:%s" % TestClientSaml.HOST_PORT, payload) req.add_header('X-Hive-Token-Response-Port', TestClientSaml.CLIENT_PORT) response = opener.open(req) relay_state, client_id, saml_req_xml = \ @@ -171,7 +180,10 @@ class TestClientSaml(CustomClusterTestSuite): assert client_id is not None new_url = response.info()["location"] assert new_url.startswith(TestClientSaml.IDP_URL) - query = parse_qs(urlparse(new_url).query.encode('ASCII')) + query_part = urlparse(new_url).query + query = parse_qs(query_part.encode('ASCII') if sys.version_info.major < 3 + else query_part) + assert "RelayState" in query, query relay_state = query["RelayState"][0] assert relay_state is not None saml_req = query["SAMLRequest"][0] @@ -187,7 +199,8 @@ class TestClientSaml(CustomClusterTestSuite): def _request_resource_with_bearer(self, client_id, bearer_token): """ Send POST request to hs2-http port again, this time with bearer tokan. The response should contain a security cookie if the validation succeeded """ - req = Request("http://localhost:%s" % TestClientSaml.HOST_PORT, " ") + payload = encode_if_needed(" ") + req = Request("http://localhost:%s" % TestClientSaml.HOST_PORT, payload) req.add_header('X-Hive-Client-Identifier', client_id) req.add_header('Authorization', "Bearer " + bearer_token) opener = build_opener(NoRedirection) @@ -205,10 +218,11 @@ class TestClientSaml(CustomClusterTestSuite): Impala should answer with a form that submits to CLIENT_PORT and contains the bearer token as a hidden state. 
""" authn_resp = self._generate_authn_response(request_id, attributes_xml) - encoded_authn_resp = base64.urlsafe_b64encode(authn_resp) - body = "SAMLResponse=%s&RelayState=%s" % (encoded_authn_resp, relay_state) + encoded_authn_resp = base64.urlsafe_b64encode(authn_resp.encode('utf-8')) + body = (b"SAMLResponse=" + encoded_authn_resp + b"&RelayState=" + + encode_if_needed(relay_state)) opener = build_opener(NoRedirection) - req = Request(TestClientSaml.SP_CALLBACK_URL, body) + req = Request(TestClientSaml.SP_CALLBACK_URL, encode_if_needed(body)) response = opener.open(req) bearer_token = self._parse_xhtml_form(response, expect_success) return bearer_token @@ -244,7 +258,7 @@ class TestClientSaml(CustomClusterTestSuite): message = input.attrib["value"] if expect_success: - assert token.startswith("u=user1") + assert token.startswith("u=user1"), str(content) else: assert message == TestClientSaml.ASSERTATION_ERROR_MESSAGE return token diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py index 3a57f5a1e..38d454227 100644 --- a/tests/custom_cluster/test_scratch_disk.py +++ b/tests/custom_cluster/test_scratch_disk.py @@ -30,6 +30,7 @@ import time from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.skip import SkipIf from tests.util.hdfs_util import NAMENODE +from tests.util.parse_util import bytes_to_str class TestScratchDir(CustomClusterTestSuite): @@ -537,16 +538,18 @@ class TestScratchDir(CustomClusterTestSuite): hostname = socket.gethostname() # Verify that there are leftover files in the remote scratch dirs after being killed. 
full_dfs_tmp_path = "{}/impala-scratch".format(self.dfs_tmp_path()) - files_result = subprocess.check_output(["hdfs", "dfs", "-ls", full_dfs_tmp_path]) + files_result = bytes_to_str( + subprocess.check_output(["hdfs", "dfs", "-ls", full_dfs_tmp_path])) assert "Found 1 items" in files_result assert hostname in files_result full_dfs_tmp_path_with_hostname = "{}/{}".format(full_dfs_tmp_path, hostname) - files_result = subprocess.check_output(["hdfs", "dfs", "-ls", - full_dfs_tmp_path_with_hostname]) + files_result = bytes_to_str( + subprocess.check_output(["hdfs", "dfs", "-ls", full_dfs_tmp_path_with_hostname])) assert "Found 1 items" in files_result impalad.start() # Verify that the leftover files being removed after the impala daemon restarted. - files_result = subprocess.check_output(["hdfs", "dfs", "-ls", full_dfs_tmp_path]) + files_result = bytes_to_str( + subprocess.check_output(["hdfs", "dfs", "-ls", full_dfs_tmp_path])) assert files_result == "" @pytest.mark.execute_serially @@ -578,6 +581,7 @@ class TestScratchDir(CustomClusterTestSuite): client.close() # Verify that no host-level dir in the remote scratch dirs after shutdown. full_dfs_tmp_path = "{}/impala-scratch".format(self.dfs_tmp_path()) - files_result = subprocess.check_output(["hdfs", "dfs", "-ls", full_dfs_tmp_path]) + files_result = bytes_to_str( + subprocess.check_output(["hdfs", "dfs", "-ls", full_dfs_tmp_path])) assert files_result == "" impalad.start() diff --git a/tests/custom_cluster/test_shell_interactive_reconnect.py b/tests/custom_cluster/test_shell_interactive_reconnect.py index 7bf391b40..e54fe2ca0 100644 --- a/tests/custom_cluster/test_shell_interactive_reconnect.py +++ b/tests/custom_cluster/test_shell_interactive_reconnect.py @@ -18,7 +18,6 @@ from __future__ import absolute_import, division, print_function import socket -import pexpect import pytest # Follow tests/shell/test_shell_interactive.py naming. 
@@ -32,6 +31,7 @@ from tests.verifiers.metric_verifier import MetricVerifier NUM_QUERIES = 'impala-server.num-queries' + class TestShellInteractiveReconnect(CustomClusterTestSuite): """ Check if interactive shell is using the current DB after reconnecting """ @pytest.mark.execute_serially @@ -75,6 +75,7 @@ class TestShellInteractiveReconnect(CustomClusterTestSuite): assert "alltypesaggmultifilesnopart" in result.stdout, result.stdout @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(force_restart=True) def test_auto_reconnect_after_impalad_died(self): """Test reconnect after restarting the remote impalad without using connect;""" # Use pexpect instead of ImpalaShell() since after using get_result() in ImpalaShell() diff --git a/tests/query_test/test_delimited_text.py b/tests/query_test/test_delimited_text.py index 70e2ae0c6..cc03f382a 100644 --- a/tests/query_test/test_delimited_text.py +++ b/tests/query_test/test_delimited_text.py @@ -26,6 +26,7 @@ from tests.common.test_dimensions import ( create_single_exec_option_dimension, create_uncompressed_text_dimension) + class TestDelimitedText(ImpalaTestSuite): """ Tests delimited text files with different tuple delimiters, field delimiters @@ -43,7 +44,7 @@ class TestDelimitedText(ImpalaTestSuite): def test_delimited_text(self, vector, unique_database): self.run_test_case('QueryTest/delimited-text', vector, unique_database) - def test_delimited_text_newlines(self, vector, unique_database): + def test_delimited_text_newlines(self, unique_database): """ Test text with newlines in strings - IMPALA-1943. Execute queries from Python to avoid issues with newline handling in test file format. 
""" self.execute_query_expect_success(self.client, """ @@ -62,7 +63,7 @@ class TestDelimitedText(ImpalaTestSuite): result = self.execute_query("select * from %s.nl_queries" % unique_database) assert len(result.data) == 2 assert result.data[0].split("\t") == ["the\n", "\nquick\nbrown", "fox\n"] - assert result.data[1].split("\t") == ["\njumped","over the lazy\n","\ndog"] + assert result.data[1].split("\t") == ["\njumped", "over the lazy\n", "\ndog"] # The row count may be computed without parsing each row, so could be inconsistent. result = self.execute_query("select count(*) from %s.nl_queries" % unique_database) assert len(result.data) == 1 @@ -72,8 +73,15 @@ class TestDelimitedText(ImpalaTestSuite): """Verifies Impala is able to properly handle delimited text that contains extended ASCII/latin characters. Marked as running serial because of shared cleanup/setup""" - self.run_test_case('QueryTest/delimited-latin-text', vector, unique_database, - encoding="latin-1") + result = self.execute_query_expect_success( + self.hs2_client, + "select * from functional.text_thorn_ecirc_newline order by col1") + assert result.tuples() == [('one', 'two', 3, 4), + (b'one\xea', 'two', 3, 4), + (b'one\xea\xea', 'two', 3, 4), + (b'one\xea\xfeone', 'two', 3, 4), + (b'one\xfeone', 'two', 3, 4)] + self.run_test_case('QueryTest/delimited-latin-text', vector, unique_database) def test_large_file_of_field_delimiters(self, vector, unique_database): """IMPALA-13161: Verifies reading a large file which has full of field delimiters @@ -83,7 +91,7 @@ class TestDelimitedText(ImpalaTestSuite): table_loc = self._get_table_location(tbl, vector) # Generate a 3GB data file that has full of '\x00' (the default field delimiter) with open("data.txt", "wb") as f: - long_str = "\x00" * 1024 * 1024 * 3 + long_str = b"\x00" * 1024 * 1024 * 3 [f.write(long_str) for i in range(1024)] check_call(["hdfs", "dfs", "-put", "data.txt", table_loc]) self.execute_query("refresh " + tbl) diff --git 
a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index f3f5bcf63..7d4e7e1bc 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ -47,6 +47,7 @@ from tests.shell.util import run_impala_shell_cmd from tests.util.filesystem_utils import FILESYSTEM_PREFIX, get_fs_path, IS_HDFS, WAREHOUSE from tests.util.get_parquet_metadata import get_parquet_metadata from tests.util.iceberg_util import cast_ts, get_snapshots, IcebergCatalogs, quote +from tests.util.parse_util import bytes_to_str LOG = logging.getLogger(__name__) @@ -1130,35 +1131,35 @@ class TestIcebergTable(IcebergTestSuite): {'key': 6, 'value': 1}, {'key': 7, 'value': 2}] assert datafiles[1]['lower_bounds'] == \ - [{'key': 1, 'value': 'abc'}, + [{'key': 1, 'value': b'abc'}, # INT is serialized as 4-byte little endian - {'key': 2, 'value': '\x00\x00\x00\x00'}, + {'key': 2, 'value': b'\x00\x00\x00\x00'}, # BOOLEAN is serialized as 0x00 for FALSE - {'key': 3, 'value': '\x00'}, + {'key': 3, 'value': b'\x00'}, # BIGINT is serialized as 8-byte little endian - {'key': 4, 'value': '\x40\xaf\x0d\x86\x48\x70\x00\x00'}, + {'key': 4, 'value': b'\x40\xaf\x0d\x86\x48\x70\x00\x00'}, # TIMESTAMP is serialized as 8-byte little endian (number of microseconds since # 1970-01-01 00:00:00) - {'key': 5, 'value': '\xc0\xd7\xff\x06\xd0\xff\xff\xff'}, + {'key': 5, 'value': b'\xc0\xd7\xff\x06\xd0\xff\xff\xff'}, # DATE is serialized as 4-byte little endian (number of days since 1970-01-01) - {'key': 6, 'value': '\x93\xfe\xff\xff'}, + {'key': 6, 'value': b'\x93\xfe\xff\xff'}, # Unlike other numerical values, DECIMAL is serialized as big-endian. 
- {'key': 7, 'value': '\xd8\xf0'}] + {'key': 7, 'value': b'\xd8\xf0'}] assert datafiles[1]['upper_bounds'] == \ - [{'key': 1, 'value': 'ghij'}, + [{'key': 1, 'value': b'ghij'}, # INT is serialized as 4-byte little endian - {'key': 2, 'value': '\x03\x00\x00\x00'}, + {'key': 2, 'value': b'\x03\x00\x00\x00'}, # BOOLEAN is serialized as 0x01 for TRUE - {'key': 3, 'value': '\x01'}, + {'key': 3, 'value': b'\x01'}, # BIGINT is serialized as 8-byte little endian - {'key': 4, 'value': '\x81\x58\xc2\x97\x56\xd5\x00\x00'}, + {'key': 4, 'value': b'\x81\x58\xc2\x97\x56\xd5\x00\x00'}, # TIMESTAMP is serialized as 8-byte little endian (number of microseconds since # 1970-01-01 00:00:00) - {'key': 5, 'value': '\x80\x02\x86\xef\x2f\x00\x00\x00'}, + {'key': 5, 'value': b'\x80\x02\x86\xef\x2f\x00\x00\x00'}, # DATE is serialized as 4-byte little endian (number of days since 1970-01-01) - {'key': 6, 'value': '\x6d\x01\x00\x00'}, + {'key': 6, 'value': b'\x6d\x01\x00\x00'}, # Unlike other numerical values, DECIMAL is serialized as big-endian. 
- {'key': 7, 'value': '\x00\xdc\x14'}] + {'key': 7, 'value': b'\x00\xdc\x14'}] def test_using_upper_lower_bound_metrics(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-upper-lower-bound-metrics', vector, @@ -2224,7 +2225,7 @@ class TestIcebergV2Table(IcebergTestSuite): table_location = "{0}/test-warehouse/{1}.db/{2}/data".format( FILESYSTEM_PREFIX, unique_database, table_name) files_result = check_output(["hdfs", "dfs", "-ls", table_location]) - assert "Found 1 items" in files_result + assert "Found 1 items" in bytes_to_str(files_result) def test_predicate_push_down_hint(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-predicate-push-down-hint', vector, diff --git a/tests/query_test/test_queries.py b/tests/query_test/test_queries.py index a3c86e771..3b1496bf8 100644 --- a/tests/query_test/test_queries.py +++ b/tests/query_test/test_queries.py @@ -161,21 +161,20 @@ class TestQueries(ImpalaTestSuite): file_format = vector.get_value('table_format').file_format if file_format == 'hbase': pytest.xfail(reason="IMPALA-283 - select count(*) produces inconsistent results") - vector.get_value('exec_option')['disable_outermost_topn'] = 1 - vector.get_value('exec_option')['analytic_rank_pushdown_threshold'] = 0 - self.run_test_case('QueryTest/sort', vector) + new_vector = deepcopy(vector) + options = new_vector.get_value('exec_option') + options['disable_outermost_topn'] = 1 + options['analytic_rank_pushdown_threshold'] = 0 + self.run_test_case('QueryTest/sort', new_vector) # We can get the sort tests for free from the top-n file - self.run_test_case('QueryTest/top-n', vector) + self.run_test_case('QueryTest/top-n', new_vector) if file_format in ['parquet', 'orc']: # set timestamp options to get consistent results for both format. 
- new_vector = deepcopy(vector) - options = new_vector.get_value('exec_option') options['convert_legacy_hive_parquet_utc_timestamps'] = 1 options['timezone'] = '"Europe/Budapest"' self.run_test_case('QueryTest/sort-complex', new_vector) - def test_partitioned_top_n(self, vector): """Test partitioned Top-N operator.""" if vector.get_value('table_format').file_format == "hbase": diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py index 369a1fdc5..0658c0776 100644 --- a/tests/shell/test_shell_commandline.py +++ b/tests/shell/test_shell_commandline.py @@ -33,11 +33,11 @@ from time import sleep, time from builtins import range import pytest +from impala_shell.impala_client import utf8_encode_if_needed from impala_shell.impala_shell import ImpalaShell as ImpalaShellClass from tests.common.environ import ImpalaTestClusterProperties from tests.common.impala_service import ImpaladService from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT, ImpalaTestSuite -from tests.common.skip import SkipIf from tests.common.test_dimensions import ( create_client_protocol_dimension, create_client_protocol_strict_dimension, @@ -60,8 +60,9 @@ from tests.shell.util import ( DEFAULT_QUERY = 'select 1' QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell') -RUSSIAN_CHARS = (u"А, Б, В, Г, Д, Е, Ё, Ж, З, И, Й, К, Л, М, Н, О, П, Р," - u"С, Т, У, Ф, Х, Ц,Ч, Ш, Щ, Ъ, Ы, Ь, Э, Ю, Я") +RUSSIAN_CHARS = utf8_encode_if_needed( + u"А, Б, В, Г, Д, Е, Ё, Ж, З, И, Й, К, Л, М, Н, О, П, Р," + u"С, Т, У, Ф, Х, Ц,Ч, Ш, Щ, Ъ, Ы, Ь, Э, Ю, Я") """IMPALA-12216 implemented timestamp to be printed in case of any error/warning during query execution, below is an example : @@ -90,6 +91,7 @@ def find_query_option(key, string, strip_brackets=True): assert len(values) == 1 return values[0].strip("[]") if strip_brackets else values[0] + @pytest.fixture def empty_table(unique_database, request): """Create an empty table within the test database before 
executing test. @@ -299,7 +301,6 @@ class TestImpalaShell(ImpalaTestSuite): "Column metadata states there are 11 values, but read 10 values from column id." ) - def test_completed_query_errors_2(self, vector): if vector.get_value('strict_hs2_protocol'): pytest.skip("Impala-10827: Multiple queries not supported in strict hs2 mode.") @@ -412,7 +413,7 @@ class TestImpalaShell(ImpalaTestSuite): args = ['-p', '-q', 'select 1; profile;'] result_set = run_impala_shell_cmd(vector, args) # This regex helps us uniquely identify a profile. - regex = re.compile("Operator\s+#Hosts\s+#Inst\s+Avg\s+Time") + regex = re.compile(r"Operator\s+#Hosts\s+#Inst\s+Avg\s+Time") # We expect two query profiles. assert len(re.findall(regex, result_set.stdout)) == 2, \ "Could not detect two profiles, stdout: %s" % result_set.stdout @@ -626,24 +627,24 @@ class TestImpalaShell(ImpalaTestSuite): def test_international_characters(self, vector): """Sanity test to ensure that the shell can read international characters.""" - args = ['-B', '-q', "select '{0}'".format(RUSSIAN_CHARS.encode('utf-8'))] + args = ['-B', '-q', "select '{0}'".format(RUSSIAN_CHARS)] result = run_impala_shell_cmd(vector, args) assert 'UnicodeDecodeError' not in result.stderr - assert RUSSIAN_CHARS.encode('utf-8') in result.stdout + assert RUSSIAN_CHARS in result.stdout def test_international_characters_prettyprint(self, vector): """IMPALA-2717: ensure we can handle international characters in pretty-printed output""" - args = ['-q', "select '{0}'".format(RUSSIAN_CHARS.encode('utf-8'))] + args = ['-q', "select '{0}'".format(RUSSIAN_CHARS)] result = run_impala_shell_cmd(vector, args) assert 'UnicodeDecodeError' not in result.stderr - assert RUSSIAN_CHARS.encode('utf-8') in result.stdout + assert RUSSIAN_CHARS in result.stdout def test_international_characters_prettyprint_tabs(self, vector): """IMPALA-2717: ensure we can handle international characters in pretty-printed output when pretty-printing falls back to delimited 
output.""" - args = ['-q', "select '{0}\\t'".format(RUSSIAN_CHARS.encode('utf-8'))] + args = ['-q', "select '{0}\\t'".format(RUSSIAN_CHARS)] result = run_impala_shell_cmd(vector, args) protocol = vector.get_value('protocol') @@ -654,13 +655,13 @@ class TestImpalaShell(ImpalaTestSuite): assert protocol in ('hs2', 'hs2-http'), protocol assert 'Reverting to tab delimited text' not in result.stderr assert 'UnicodeDecodeError' not in result.stderr - assert RUSSIAN_CHARS.encode('utf-8') in result.stdout + assert RUSSIAN_CHARS in result.stdout def test_international_characters_profile(self, vector): """IMPALA-12145: ensure we can handle international characters in the profile. """ if vector.get_value('strict_hs2_protocol'): pytest.skip("Profile not supported in strict hs2 mode.") - text = RUSSIAN_CHARS.encode('utf-8') + text = RUSSIAN_CHARS args = ['-o', '/dev/null', '-p', '-q', "select '{0}'".format(text)] result = run_impala_shell_cmd(vector, args) assert 'UnicodeDecodeError' not in result.stderr @@ -687,7 +688,7 @@ class TestImpalaShell(ImpalaTestSuite): # is running against Hive which is another variable. On thrift 0.14 and higher, # talking to Hive, the result is b'\\xaa', so allow this as another possibility. assert '\xef\xbf\xbd' in result.stdout or '\xaa' in result.stdout or \ - '\\xaa' in result.stdout + '\\xaa' in result.stdout or '�' in result.stdout def test_global_config_file(self, vector): """Test global and user configuration files.""" @@ -776,13 +777,13 @@ class TestImpalaShell(ImpalaTestSuite): assert "WARNING: Option 'config_file' can be only set from shell." in result.stderr err_msg = ("WARNING: Unable to read configuration file correctly. " "Ignoring unrecognized config option: 'invalid_option'\n") - assert err_msg in result.stderr + assert err_msg in result.stderr args = ['--config_file=%s/impalarc_with_error' % QUERY_FILE_PATH] result = run_impala_shell_cmd(vector, args, expect_success=False) err_msg = ("Unexpected value in configuration file. 
" "'maybe' is not a valid value for a boolean option.") - assert err_msg in result.stderr + assert err_msg in result.stderr # live_progress and live_summary are not supported with strict_hs2_protocol if not vector.get_value('strict_hs2_protocol'): @@ -886,7 +887,7 @@ class TestImpalaShell(ImpalaTestSuite): # Test with an escaped variable. result = run_impala_shell_cmd(vector, ['--var=msg1=1', '--var=msg2=${var:msg1}2', - '--var=msg3=\${var:msg1}${var:msg2}', + '--var=msg3=\\${var:msg1}${var:msg2}', "--query=select '${var:msg3}'"]) self._validate_shell_messages(result.stderr, ['${var:msg1}12', 'Fetched 1 row(s)'], should_exist=True) @@ -894,7 +895,7 @@ class TestImpalaShell(ImpalaTestSuite): # Referencing a non-existent variable will result in an error. result = run_impala_shell_cmd(vector, [ '--var=msg1=1', '--var=msg2=${var:doesnotexist}2', - '--var=msg3=\${var:msg1}${var:msg2}', "--query=select '${var:msg3}'"], + '--var=msg3=\\${var:msg1}${var:msg2}', "--query=select '${var:msg3}'"], expect_success=False) self._validate_shell_messages(result.stderr, ['Error: Unknown variable DOESNOTEXIST', @@ -1045,7 +1046,7 @@ class TestImpalaShell(ImpalaTestSuite): if e.errno != errno.EINTR: raise data = connection.recv(1024) - assert expected_output in data + assert expected_output in str(data) finally: if impala_shell.poll() is None: impala_shell.kill() @@ -1063,7 +1064,7 @@ class TestImpalaShell(ImpalaTestSuite): try: socket.setdefaulttimeout(10) s = socket.socket() - s.bind(("",0)) + s.bind(("", 0)) s.listen(1) test_impalad_port = s.getsockname()[1] load_balancer_fqdn = "my-load-balancer.local" @@ -1087,7 +1088,7 @@ class TestImpalaShell(ImpalaTestSuite): assert "Could not execute command:" in result.stderr else: assert "Encountered: EOF" in result.stderr - args = ['-q', 'with v as (select 1) \;"'] + args = ['-q', 'with v as (select 1) \\;"'] result = run_impala_shell_cmd(vector, args, expect_success=False) if vector.get_value('strict_hs2_protocol'): assert "cannot 
recognize input near" @@ -1103,15 +1104,16 @@ class TestImpalaShell(ImpalaTestSuite): sql_file, sql_path = tempfile.mkstemp() # This generates a sql file size of ~50K. num_cols = 1000 - os.write(sql_file, "select \n") - for i in range(num_cols): - if i < num_cols: - os.write(sql_file, "col_{0} as a{1},\n".format(i, i)) - os.write(sql_file, "col_{0} as b{1},\n".format(i, i)) - os.write(sql_file, "col_{0} as c{1}{2}\n".format( - i, i, "," if i < num_cols - 1 else "")) - os.write(sql_file, "from non_existence_large_table;") - os.close(sql_file) + with open(sql_path, 'w') as f: + f.write("select \n") + for i in range(num_cols): + if i < num_cols: + f.write("col_{0} as a{1},\n".format(i, i)) + f.write("col_{0} as b{1},\n".format(i, i)) + f.write("col_{0} as c{1}{2}\n".format( + i, i, "," if i < num_cols - 1 else "")) + f.write("from non_existence_large_table;") + f.close() try: args = ['-f', sql_path, '-d', unique_database] @@ -1148,7 +1150,7 @@ class TestImpalaShell(ImpalaTestSuite): tzname = find_query_option("TIMEZONE", result_set.stdout) assert os.path.isfile("/usr/share/zoneinfo/" + tzname) - def test_find_query_option(self, vector): + def test_find_query_option(self): """Test utility function find_query_option().""" test_input = """ not_an_option @@ -1177,8 +1179,8 @@ class TestImpalaShell(ImpalaTestSuite): # --connect_timeout_ms not supported with HTTP transport. Refer to the comment # in ImpalaClient::_get_http_transport() for details. # --http_socket_timeout_s not supported for strict_hs2_protocol. 
- if (vector.get_value('protocol') == 'hs2-http' and - vector.get_value('strict_hs2_protocol')): + if (vector.get_value('protocol') == 'hs2-http' + and vector.get_value('strict_hs2_protocol')): pytest.skip("THRIFT-4600") with closing(socket.socket()) as s: @@ -1259,9 +1261,9 @@ class TestImpalaShell(ImpalaTestSuite): assert "| |" in result.stdout, result.stdout assert "| árvíztűrőtükörfúró |" in result.stdout, result.stdout assert "| 你好hello |" in result.stdout, result.stdout - assert "| \x00\xef\xbf\xbd\x00\xef\xbf\xbd |" in result.stdout, \ - result.stdout - assert '| \xef\xbf\xbdD3"\x11\x00 |' in result.stdout, result.stdout + # The last two output lines are malformed UTF-8 strings. + assert "| " + utf8_encode_if_needed("\0�\0�") in result.stdout, result.stdout + assert "| " + utf8_encode_if_needed("�D3\"") in result.stdout, result.stdout def test_binary_as_string(self, vector): query = """select cast(binary_col as string) from functional.binary_tbl @@ -1292,8 +1294,8 @@ class TestImpalaShell(ImpalaTestSuite): # way that python2 and python3 represent floating point values, the output # from the shell will differ with regard to which version of python the # shell is running under. - assert("3\t3\t30.299999999999997" in result.stdout or - "3\t3\t30.3" in result.stdout), result.stdout + assert ("3\t3\t30.299999999999997" in result.stdout + or "3\t3\t30.3" in result.stdout), result.stdout assert "4\t4\t40.4" in result.stdout, result.stdout @@ -1410,8 +1412,8 @@ class TestImpalaShell(ImpalaTestSuite): def test_http_socket_timeout(self, vector): """Test setting different http_socket_timeout_s values.""" - if (vector.get_value('strict_hs2_protocol') or - vector.get_value('protocol') != 'hs2-http'): + if (vector.get_value('strict_hs2_protocol') + or vector.get_value('protocol') != 'hs2-http'): pytest.skip("http socket timeout not supported in strict hs2 mode." 
" Only supported with hs2-http protocol.") # Test http_socket_timeout_s=0, expect errors diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py index bedc10216..3ce3d6cfe 100755 --- a/tests/shell/test_shell_interactive.py +++ b/tests/shell/test_shell_interactive.py @@ -35,6 +35,7 @@ from time import sleep import pexpect import pytest +from impala_shell.impala_client import utf8_encode_if_needed # This import is the actual ImpalaShell class from impala_shell.py. # We rename it to ImpalaShellClass here because we later import another # class called ImpalaShell from tests/shell/util.py, and we don't want @@ -63,6 +64,7 @@ from tests.shell.util import ( spawn_shell, stderr_get_first_error_msg, ) +from tests.util.parse_util import bytes_to_str QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell') @@ -117,7 +119,7 @@ class RequestHandler503(http.server.SimpleHTTPRequestHandler): self.end_headers() if self.should_send_body_text(): # Optionally send body text with 503 message. 
- self.wfile.write("EXTRA") + self.wfile.write(b"EXTRA") class RequestHandler503Extra(RequestHandler503): @@ -195,7 +197,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite): proc.expect(":{0}] {1}>".format(get_impalad_port(vector), db)) if not expectations: return for e in expectations: - assert e in proc.before + assert e in bytes_to_str(proc.before) def _wait_for_num_open_sessions(self, vector, impala_service, expected, err): """Helper method to wait for the number of open sessions to reach 'expected'.""" @@ -403,7 +405,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite): def test_cancellation_mid_command(self, vector): """Test that keyboard interrupt cancels multiline query strings""" if vector.get_value('strict_hs2_protocol'): - pytest.skip("IMPALA-10827: Cancellation infrastructure does not " + + pytest.skip("IMPALA-10827: Cancellation infrastructure does not " "work in strict hs2 mode.") shell_cmd = get_shell_cmd(vector) multiline_query = ["select column_1\n", "from table_1\n", "where ..."] @@ -413,7 +415,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite): for query_line in multiline_query: child_proc.send(query_line) child_proc.sendintr() - child_proc.expect("\^C") + child_proc.expect(r"\^C") child_proc.expect(PROMPT_REGEX) child_proc.sendline('quit;') child_proc.wait() @@ -425,7 +427,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite): child_proc.send("\n") child_proc.expect(">") child_proc.sendintr() - child_proc.expect("> \^C") + child_proc.expect(r"> \^C") child_proc.expect(PROMPT_REGEX) child_proc.sendline('quit;') child_proc.wait() @@ -435,7 +437,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite): if vector.get_value('strict_hs2_protocol'): pytest.skip("IMPALA-10827: Failed, need to investigate.") # test a unicode query spanning multiple lines - unicode_bytes = u'\ufffd'.encode('utf-8') + unicode_bytes = utf8_encode_if_needed(u'\ufffd') args = "select '{0}'\n;".format(unicode_bytes) result = run_impala_shell_interactive(vector, args) 
assert "Fetched 1 row(s)" in result.stderr, result.stderr @@ -449,7 +451,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite): child_proc = spawn_shell(shell_cmd) child_proc.expect(PROMPT_REGEX) child_proc.sendline("select '{0}'\n;".format(unicode_bytes)) - child_proc.expect("Fetched 1 row\(s\) in [0-9]+\.?[0-9]*s") + child_proc.expect(r"Fetched 1 row\(s\) in [0-9]+\.?[0-9]*s") child_proc.expect(PROMPT_REGEX) child_proc.sendline('quit;') child_proc.wait() @@ -477,7 +479,6 @@ class TestImpalaShellInteractive(ImpalaTestSuite): assert "Fetched 1 row(s)" in result.stderr, result.stderr assert "세율중분류구분코드" in result.stdout - def test_welcome_string(self, vector): """Test that the shell's welcome message is only printed once when the shell is started. Ensure it is not reprinted on errors. @@ -616,7 +617,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite): run_impala_shell_interactive(vector, "drop database if exists foo;") self.create_impala_clients() - def test_multiline_queries_in_history(self, vector, tmp_history_file): + def test_multiline_queries_in_history(self, vector, tmp_history_file): # noqa: U100 """Test to ensure that multiline queries with comments are preserved in history Ensure that multiline queries are preserved when they're read back from history. 
@@ -638,7 +639,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite): for query, _ in queries: child_proc.expect(PROMPT_REGEX) child_proc.sendline(query) - child_proc.expect("Fetched 1 row\(s\) in [0-9]+\.?[0-9]*s") + child_proc.expect(r"Fetched 1 row\(s\) in [0-9]+\.?[0-9]*s") child_proc.expect(PROMPT_REGEX) child_proc.sendline('quit;') child_proc.wait() @@ -649,7 +650,8 @@ class TestImpalaShellInteractive(ImpalaTestSuite): assert history_entry in result.stderr, "'%s' not in '%s'" % (history_entry, result.stderr) - def test_history_does_not_duplicate_on_interrupt(self, vector, tmp_history_file): + def test_history_does_not_duplicate_on_interrupt( + self, vector, tmp_history_file): # noqa: U100 """This test verifies that once the cmdloop is broken the history file will not be re-read. The cmdloop can be broken when the user sends a SIGINT or exceptions occur.""" @@ -662,7 +664,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite): # initialize history child_proc.expect(PROMPT_REGEX) child_proc.sendline("select 1;") - child_proc.expect("Fetched 1 row\(s\) in [0-9]+\.?[0-9]*s") + child_proc.expect(r"Fetched 1 row\(s\) in [0-9]+\.?[0-9]*s") child_proc.expect(PROMPT_REGEX) child_proc.sendline("quit;") child_proc.wait() @@ -671,9 +673,9 @@ class TestImpalaShellInteractive(ImpalaTestSuite): child_proc = spawn_shell(shell_cmd) child_proc.expect(PROMPT_REGEX) child_proc.sendintr() - child_proc.expect("\^C") + child_proc.expect(r"\^C") child_proc.sendline("select 2;") - child_proc.expect("Fetched 1 row\(s\) in [0-9]+\.?[0-9]*s") + child_proc.expect(r"Fetched 1 row\(s\) in [0-9]+\.?[0-9]*s") child_proc.expect(PROMPT_REGEX) child_proc.sendline("quit;") child_proc.wait() @@ -682,12 +684,14 @@ class TestImpalaShellInteractive(ImpalaTestSuite): p = ImpalaShell(vector) p.send_cmd('history') result = p.get_result().stderr.splitlines() - assert "[1]: select 1;" == result[1] - assert "[2]: quit;" == result[2] - assert "[3]: select 2;" == result[3] - assert "[4]: quit;" == 
result[4] - - def test_history_file_option(self, vector, tmp_history_file): + # Python 2 and Python 3 shell have different first lines in the history output. + start_idx = 1 if "Server version: " in result[0] else 0 + assert "[1]: select 1;" == result[start_idx] + assert "[2]: quit;" == result[start_idx + 1] + assert "[3]: select 2;" == result[start_idx + 2] + assert "[4]: quit;" == result[start_idx + 3] + + def test_history_file_option(self, vector, tmp_history_file): # noqa: U100 """ Setting the 'tmp_history_file' fixture above means that the IMPALA_HISTFILE environment will be overridden. Here we override that environment by passing @@ -704,7 +708,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite): history_contents = open(new_hist.name).read() assert "select 'hi'" in history_contents - def test_rerun(self, vector, tmp_history_file): + def test_rerun(self, vector, tmp_history_file): # noqa: U100 """Smoke test for the 'rerun' command""" if vector.get_value('strict_hs2_protocol'): pytest.skip("Rerun not supported in strict hs2 mode.") @@ -721,11 +725,12 @@ class TestImpalaShellInteractive(ImpalaTestSuite): ("second_command")) child_proc.sendline('history;') child_proc.expect(":{0}] default>".format(get_impalad_port(vector))) - assert '[1]: select \'first_command\';' in child_proc.before - assert '[2]: select \'second_command\';' in child_proc.before - assert '[3]: history;' in child_proc.before + before_line = child_proc.before.decode('UTF-8', 'replace') + assert '[1]: select \'first_command\';' in before_line + assert '[2]: select \'second_command\';' in before_line + assert '[3]: history;' in before_line # Rerunning command should not add an entry into history. 
- assert '[4]' not in child_proc.before + assert '[4]' not in before_line self._expect_with_cmd(child_proc, "@0", vector, ("Command index out of range")) self._expect_with_cmd(child_proc, "rerun 4", vector, ("Command index out of range")) self._expect_with_cmd(child_proc, "@-4", vector, ("Command index out of range")) @@ -861,7 +866,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite): # IMPALA-5416: Test that two source commands on a line won't crash the shell. result = run_impala_shell_interactive( vector, "source shell.cmds;source shell.cmds;") - assert len(re.findall("version\(\)", result.stdout)) == 2 + assert len(re.findall(r"version\(\)", result.stdout)) == 2 finally: os.chdir(cwd) @@ -888,11 +893,11 @@ class TestImpalaShellInteractive(ImpalaTestSuite): # the client does not fetch. For statements returning 0 rows we do not # want an empty line in stdout. result = run_impala_shell_interactive(vector, "-- foo \n use default;") - assert re.search('> \[', result.stdout) + assert re.search(r'> \[', result.stdout) result = run_impala_shell_interactive(vector, "select * from functional.alltypes limit 0;") assert "Fetched 0 row(s)" in result.stderr - assert re.search('> \[', result.stdout) + assert re.search(r'> \[', result.stdout) def test_set_and_set_all(self, vector): """IMPALA-2181. Tests the outputs of SET and SET ALL commands. 
SET should contain the @@ -1050,15 +1055,15 @@ class TestImpalaShellInteractive(ImpalaTestSuite): assert '| \'--\' |' in result.stdout assert '| -- |' in result.stdout - query = ('select * from (\n' + - 'select count(*) from functional.alltypes\n' + + query = ('select * from (\n' + 'select count(*) from functional.alltypes\n' ') v; -- Incomplete SQL statement in this line') result = run_impala_shell_interactive(vector, query) assert '| count(*) |' in result.stdout - query = ('select id from functional.alltypes\n' + - 'order by id; /*\n' + - '* Multi-line comment\n' + + query = ('select id from functional.alltypes\n' + 'order by id; /*\n' + '* Multi-line comment\n' '*/') result = run_impala_shell_interactive(vector, query) assert '| id |' in result.stdout @@ -1154,7 +1159,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite): proc.sendeof() proc.wait() - def test_strip_leading_comment(self, vector): + def test_strip_leading_comment(self): """Test stripping leading comments from SQL statements""" assert ('--delete\n', 'select 1') == \ ImpalaShellClass.strip_leading_comment('--delete\nselect 1') @@ -1230,7 +1235,6 @@ class TestImpalaShellInteractive(ImpalaTestSuite): "Bad date/time conversion format: yyyy年MM月dd日" ) - def test_timezone_validation(self, vector): """Test that query option TIMEZONE is validated when executing a query. 
@@ -1300,7 +1304,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite): "-i{0}:{1}".format(http_503_server_extra.HOST, http_503_server_extra.PORT)] shell_proc = spawn_shell(impala_shell_executable + shell_args) - shell_proc.expect("HTTP code 503: Service Unavailable \[EXTRA\]", timeout=10) + shell_proc.expect(r"HTTP code 503: Service Unavailable \[EXTRA\]", timeout=10) def run_impala_shell_interactive(vector, input_lines, shell_args=None, diff --git a/tests/shell/util.py b/tests/shell/util.py index 191f24849..30f0afb3d 100755 --- a/tests/shell/util.py +++ b/tests/shell/util.py @@ -23,10 +23,8 @@ from contextlib import closing import logging import os import re -import shlex import socket from subprocess import PIPE, Popen -import sys import time import pexpect @@ -54,6 +52,7 @@ LOG.addHandler(logging.StreamHandler()) SHELL_HISTORY_FILE = os.path.expanduser("~/.impalahistory") IMPALA_HOME = os.environ['IMPALA_HOME'] + def build_shell_env(env=None): """ Construct the environment for the shell to run in based on 'env', or the current process's environment if env is None.""" @@ -69,24 +68,24 @@ def build_shell_env(env=None): def assert_var_substitution(result): - assert_pattern(r'\bfoo_number=.*$', 'foo_number= 123123', result.stdout, \ + assert_pattern(r'\bfoo_number=.*$', 'foo_number= 123123', result.stdout, 'Numeric values not replaced correctly') - assert_pattern(r'\bfoo_string=.*$', 'foo_string=123', result.stdout, \ + assert_pattern(r'\bfoo_string=.*$', 'foo_string=123', result.stdout, 'String values not replaced correctly') - assert_pattern(r'\bVariables:[\s\n]*BAR:\s*[0-9]*\n\s*FOO:\s*[0-9]*', \ - 'Variables:\n\tBAR: 456\n\tFOO: 123', result.stdout, \ + assert_pattern(r'\bVariables:[\s\n]*BAR:\s*[0-9]*\n\s*FOO:\s*[0-9]*', + 'Variables:\n\tBAR: 456\n\tFOO: 123', result.stdout, "Set variable not listed correctly by the first SET command") - assert_pattern(r'\bError: Unknown variable FOO1$', \ - 'Error: Unknown variable FOO1', result.stderr, \ + 
assert_pattern(r'\bError: Unknown variable FOO1$', + 'Error: Unknown variable FOO1', result.stderr, 'Missing variable FOO1 not reported correctly') - assert_pattern(r'\bmulti_test=.*$', 'multi_test=456_123_456_123', \ + assert_pattern(r'\bmulti_test=.*$', 'multi_test=456_123_456_123', result.stdout, 'Multiple replaces not working correctly') - assert_pattern(r'\bError:\s*Unknown\s*substitution\s*syntax\s*' + - r'\(RANDOM_NAME\). Use \${VAR:var_name}', \ - 'Error: Unknown substitution syntax (RANDOM_NAME). Use ${VAR:var_name}', \ + assert_pattern(r'\bError:\s*Unknown\s*substitution\s*syntax\s*' + r'\(RANDOM_NAME\). Use \${VAR:var_name}', + 'Error: Unknown substitution syntax (RANDOM_NAME). Use ${VAR:var_name}', result.stderr, "Invalid variable reference") assert_pattern(r'"This should be not replaced: \${VAR:foo} \${HIVEVAR:bar}"', - '"This should be not replaced: ${VAR:foo} ${HIVEVAR:bar}"', \ + '"This should be not replaced: ${VAR:foo} ${HIVEVAR:bar}"', result.stdout, "Variable escaping not working") assert_pattern(r'\bVariable MYVAR set to.*$', 'Variable MYVAR set to foo123', result.stderr, 'No evidence of MYVAR variable being set.') @@ -97,11 +96,11 @@ def assert_var_substitution(result): result.stdout, 'No evidence of variable FOO being unset') assert_pattern(r'\bUnsetting variable BAR$', 'Unsetting variable BAR', result.stdout, 'No evidence of variable BAR being unset') - assert_pattern(r'\bVariables:[\s\n]*No variables defined\.$', \ - 'Variables:\n\tNo variables defined.', result.stdout, \ + assert_pattern(r'\bVariables:[\s\n]*No variables defined\.$', + 'Variables:\n\tNo variables defined.', result.stdout, 'Unset variables incorrectly listed by third SET command.') - assert_pattern(r'\bNo variable called NONEXISTENT is set', \ - 'No variable called NONEXISTENT is set', result.stdout, \ + assert_pattern(r'\bNo variable called NONEXISTENT is set', + 'No variable called NONEXISTENT is set', result.stdout, 'Problem unsetting non-existent variable.') 
assert_pattern(r'\bVariable COMMENT_TYPE1 set to.*$', 'Variable COMMENT_TYPE1 set to ok', result.stderr, @@ -112,11 +111,12 @@ def assert_var_substitution(result): assert_pattern(r'\bVariable COMMENT_TYPE3 set to.*$', 'Variable COMMENT_TYPE3 set to ok', result.stderr, 'No evidence of COMMENT_TYPE3 variable being set.') - assert_pattern(r'\bVariables:[\s\n]*COMMENT_TYPE1:.*[\s\n]*' + \ - 'COMMENT_TYPE2:.*[\s\n]*COMMENT_TYPE3:.*$', - 'Variables:\n\tCOMMENT_TYPE1: ok\n\tCOMMENT_TYPE2: ok\n\tCOMMENT_TYPE3: ok', \ + assert_pattern(r'\bVariables:[\s\n]*COMMENT_TYPE1:.*[\s\n]*' + r'COMMENT_TYPE2:.*[\s\n]*COMMENT_TYPE3:.*$', + 'Variables:\n\tCOMMENT_TYPE1: ok\n\tCOMMENT_TYPE2: ok\n\tCOMMENT_TYPE3: ok', result.stdout, 'Set variables not listed correctly by the SET command') + def assert_pattern(pattern, result, text, message): """Asserts that the pattern, when applied to text, returns the expected result""" m = re.search(pattern, text, re.MULTILINE) @@ -217,6 +217,7 @@ def get_open_sessions_metric(vector): assert protocol == 'beeswax', protocol return 'impala-server.num-open-beeswax-sessions' + class ImpalaShellResult(object): def __init__(self): self.rc = 0 @@ -380,12 +381,13 @@ def get_dev_impala_shell_executable(): def create_impala_shell_executable_dimension(dev_only=False): _, include_pypi = get_dev_impala_shell_executable() dimensions = [] - if os.getenv("IMPALA_SYSTEM_PYTHON2"): + python3_pytest = (os.getenv("IMPALA_USE_PYTHON3_TESTS", "false") == "true") + if os.getenv("IMPALA_SYSTEM_PYTHON2") and not python3_pytest: dimensions.append('dev') if os.getenv("IMPALA_SYSTEM_PYTHON3"): dimensions.append('dev3') if include_pypi and not dev_only: - if os.getenv("IMPALA_SYSTEM_PYTHON2"): + if os.getenv("IMPALA_SYSTEM_PYTHON2") and not python3_pytest: dimensions.append('python2') if os.getenv("IMPALA_SYSTEM_PYTHON3"): dimensions.append('python3') diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py index e6417721a..3661d09d1 100644 --- 
a/tests/statestore/test_statestore.py +++ b/tests/statestore/test_statestore.py @@ -20,11 +20,14 @@ from collections import defaultdict import json import logging import socket +import sys import threading import time import traceback import uuid +import pytest + from builtins import range from thrift.protocol import TBinaryProtocol from thrift.server.TServer import TServer @@ -115,6 +118,7 @@ class KillableThreadedServer(TServer): self.port = self.serverTransport.port def shutdown(self): + LOG.info('Server localhost:{} is shutting down'.format(self.port)) self.is_shutdown = True self.serverTransport.close() self.wait_until_down() @@ -127,20 +131,22 @@ class KillableThreadedServer(TServer): cnxn = TSocket.TSocket('localhost', self.port) try: cnxn.open() + LOG.info('Server localhost:{} is up'.format(cnxn.port)) return except Exception: if i == num_tries - 1: raise - time.sleep(0.1) + time.sleep(0.5) def wait_until_down(self, num_tries=10): for i in range(num_tries): cnxn = TSocket.TSocket('localhost', self.port) try: cnxn.open() - time.sleep(0.1) except Exception: + LOG.info('Server localhost:{} is down'.format(cnxn.port)) return - raise Exception("Server did not stop") + time.sleep(0.5) + raise Exception("Server localhost:{} did not stop".format(cnxn.port)) def serve(self): self.serverTransport.listen() @@ -149,8 +155,12 @@ class KillableThreadedServer(TServer): # Since accept() can take a while, check again if the server is shutdown to avoid # starting an unnecessary thread. 
if self.is_shutdown: return - t = threading.Thread(target=self.handle, args=(client,)) - t.setDaemon(self.daemon) + t = None + if sys.version_info.major < 3: + t = threading.Thread(target=self.handle, args=(client,)) + t.setDaemon(True) + else: + t = threading.Thread(target=self.handle, args=(client,), daemon=self.daemon) t.start() def handle(self, client): @@ -196,6 +206,9 @@ class StatestoreSubscriber(object): self.heartbeat_cb, self.update_cb = heartbeat_cb, update_cb self.subscriber_id = "python-test-client-%s" % uuid.uuid1() self.exception = None + self.server = None + self.server_thread = None + self.client_transport = None def __enter__(self): return self @@ -239,19 +252,24 @@ class StatestoreSubscriber(object): return response def __init_server(self): + LOG.info('Initializing server') processor = Subscriber.Processor(self) transport = WildcardServerSocket() tfactory = TTransport.TBufferedTransportFactory() pfactory = TBinaryProtocol.TBinaryProtocolFactory() self.server = KillableThreadedServer(processor, transport, tfactory, pfactory, daemon=True) - self.server_thread = threading.Thread(target=self.server.serve) - self.server_thread.setDaemon(True) + if sys.version_info.major < 3: + self.server_thread = threading.Thread(target=self.server.serve) + self.server_thread.setDaemon(True) + else: + self.server_thread = threading.Thread(target=self.server.serve, daemon=True) self.server_thread.start() self.server.wait_until_up() self.port = self.server.port def __init_client(self): + LOG.info('Initializing client') self.client_transport = \ TTransport.TBufferedTransport(TSocket.TSocket('localhost', 24000)) self.protocol = TBinaryProtocol.TBinaryProtocol(self.client_transport) @@ -352,6 +370,7 @@ class StatestoreSubscriber(object): time.sleep(0.2) +@pytest.mark.execute_serially @SkipIfDockerizedCluster.statestore_not_exposed class TestStatestore(BaseTestSuite): def make_topic_update(self, topic_name, key_template="foo", value_template="bar", diff --git 
a/tests/stress/test_update_stress.py b/tests/stress/test_update_stress.py index 5d76451af..013cefc02 100644 --- a/tests/stress/test_update_stress.py +++ b/tests/stress/test_update_stress.py @@ -29,6 +29,7 @@ from tests.common.skip import SkipIfFS from tests.common.test_dimensions import create_exec_option_dimension from tests.stress.stress_util import run_tasks, Task from tests.util.filesystem_utils import FILESYSTEM_PREFIX, IS_HDFS +from tests.util.parse_util import bytes_to_str from tests.conftest import DEFAULT_HIVE_SERVER2 @@ -339,7 +340,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite): table_location = "{0}/test-warehouse/{1}.db/{2}/data".format( FILESYSTEM_PREFIX, unique_database, table_name) - data_files_on_fs_result = check_output(["hdfs", "dfs", "-ls", table_location]) + data_files_on_fs_result = bytes_to_str( + check_output(["hdfs", "dfs", "-ls", table_location])) # The first row of the HDFS result is a summary, the following lines contain # 1 file each. data_files_on_fs_rows = data_files_on_fs_result.strip().split('\n')[1:] diff --git a/tests/util/hdfs_util.py b/tests/util/hdfs_util.py index 125f57dcd..1dd2ce4e2 100644 --- a/tests/util/hdfs_util.py +++ b/tests/util/hdfs_util.py @@ -31,6 +31,7 @@ from xml.etree.ElementTree import parse from tests.util.filesystem_base import BaseFilesystem from tests.util.filesystem_utils import FILESYSTEM_PREFIX +from tests.util.parse_util import bytes_to_str class HdfsConfig(object): @@ -220,13 +221,13 @@ class HadoopFsCommandLineClient(BaseFilesystem): stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = process.communicate() status = process.returncode - return (status, stdout, stderr) + return (status, bytes_to_str(stdout), bytes_to_str(stderr)) def create_file(self, path, file_data, overwrite=True): """Creates a temporary file with the specified file_data on the local filesystem, then puts it into the specified path.""" if not overwrite and self.exists(path): return False - with 
tempfile.NamedTemporaryFile(delete=False) as tmp_file: + with tempfile.NamedTemporaryFile(mode='w+t', delete=False) as tmp_file: tmp_file.write(file_data) put_cmd_params = ['-put', '-d'] if overwrite: put_cmd_params.append('-f') diff --git a/tests/util/parse_util.py b/tests/util/parse_util.py index 05289ebaa..d96beecc0 100644 --- a/tests/util/parse_util.py +++ b/tests/util/parse_util.py @@ -17,6 +17,7 @@ from __future__ import absolute_import, division, print_function import re +import sys from datetime import datetime # IMPALA-6715: Every so often the stress test or the TPC workload directories get @@ -285,3 +286,11 @@ def get_time_summary_stats_counter(counter_name, runtime_profile): max_value=parse_duration_string_ns(summary_stat['max']))) return summary_stats + + +def bytes_to_str(bytes): + """Utility function to convert bytes to string. + This is needed to handle the differences between Python 2 and 3.""" + if sys.version_info.major < 3: + return str(bytes) + return bytes.decode('utf-8', errors='replace') diff --git a/tests/util/shell_util.py b/tests/util/shell_util.py index 2e63e3ff3..130d805b4 100644 --- a/tests/util/shell_util.py +++ b/tests/util/shell_util.py @@ -21,6 +21,8 @@ from __future__ import absolute_import, division, print_function import logging import os import shlex +import sys + from select import select from subprocess import PIPE, Popen, STDOUT, call from textwrap import dedent @@ -35,6 +37,7 @@ def dump_server_stacktraces(): LOG.debug('Dumping stacktraces of running servers') call([os.path.join(os.environ['IMPALA_HOME'], "bin/dump-stacktraces.sh")]) + def exec_process(cmd): """Executes a subprocess, waiting for completion. The process exit code, stdout and stderr are returned as a tuple.""" @@ -46,6 +49,7 @@ def exec_process(cmd): rc = p.returncode return rc, stdout, stderr + def exec_process_async(cmd): """Executes a subprocess, returning immediately. The process object is returned for later retrieval of the exit code etc. 
""" @@ -55,6 +59,7 @@ def exec_process_async(cmd): return Popen(shlex.split(cmd), shell=False, stdout=PIPE, stderr=PIPE, universal_newlines=True) + def shell(cmd, cmd_prepend="set -euo pipefail\n", stdout=PIPE, stderr=STDOUT, timeout_secs=None, **popen_kwargs): """Executes a command and returns its output. If the command's return code is non-zero @@ -77,6 +82,7 @@ def shell(cmd, cmd_prepend="set -euo pipefail\n", stdout=PIPE, stderr=STDOUT, remaining_fds.append(stderr_fileno) stdout = list() stderr = list() + def _read_available_output(): while True: available_fds, _, _ = select(remaining_fds, [], [], 0) @@ -88,7 +94,7 @@ def shell(cmd, cmd_prepend="set -euo pipefail\n", stdout=PIPE, stderr=STDOUT, if not data: del remaining_fds[0] else: - stdout.append(data) + stdout.append(data if sys.version_info.major < 3 else data.decode()) elif fd == stderr_fileno: if not data: del remaining_fds[-1]