Repository: impala Updated Branches: refs/heads/master a90b42911 -> 13b82624b
IMPALA-6812: Fix flaky Kudu scan tests Many of our Kudu-related tests have been flaky with the symptom that scans appear to not return rows that were just inserted. This occurs because our default Kudu scan level of READ_LATEST doesn't make any consistency guarantees. This patch adds a query option 'kudu_read_mode', which overrides the startup flag of the same name, and then sets that option to READ_AT_SNAPSHOT for all tests with Kudu inserts and scans, which should give us more consistent test results. Testing: - Passed a full exhaustive run. Does not appear to increase run time by any significant amount. Change-Id: I70df84f2cbc663107f2ad029565d3c15bdfbd47c Reviewed-on: http://gerrit.cloudera.org:8080/10503 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/13b82624 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/13b82624 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/13b82624 Branch: refs/heads/master Commit: 13b82624b58275469988147c33981446c86e4174 Parents: a90b429 Author: Thomas Tauber-Marshall <[email protected]> Authored: Wed May 23 23:11:01 2018 +0000 Committer: Impala Public Jenkins <[email protected]> Committed: Mon Jun 18 20:19:53 2018 +0000 ---------------------------------------------------------------------- be/src/exec/kudu-scanner.cc | 14 +++++---- be/src/exec/kudu-util.cc | 18 +++++++++++ be/src/exec/kudu-util.h | 6 ++++ be/src/service/query-options.cc | 13 ++++++++ be/src/service/query-options.h | 3 +- be/src/util/debug-util.cc | 1 + be/src/util/debug-util.h | 1 + common/thrift/ImpalaInternalService.thrift | 10 +++++++ common/thrift/ImpalaService.thrift | 4 +++ tests/common/test_dimensions.py | 8 +++++ tests/custom_cluster/test_kudu.py | 8 +++++ tests/metadata/test_ddl.py | 1 + tests/query_test/test_kudu.py | 40 ++++++++++++++++++++++++- 13 files 
changed, 119 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/be/src/exec/kudu-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc index e63a6fe..7353bf9 100644 --- a/be/src/exec/kudu-scanner.cc +++ b/be/src/exec/kudu-scanner.cc @@ -53,7 +53,8 @@ using kudu::client::KuduTable; using kudu::client::KuduValue; DEFINE_string(kudu_read_mode, "READ_LATEST", "(Advanced) Sets the Kudu scan ReadMode. " - "Supported Kudu read modes are READ_LATEST and READ_AT_SNAPSHOT."); + "Supported Kudu read modes are READ_LATEST and READ_AT_SNAPSHOT. Can be overridden " + "with the query option of the same name."); DEFINE_bool(pick_only_leaders_for_tests, false, "Whether to pick only leader replicas, for tests purposes only."); DEFINE_int32(kudu_scanner_keep_alive_period_sec, 15, @@ -64,7 +65,6 @@ DECLARE_int32(kudu_operation_timeout_ms); namespace impala { -const string MODE_READ_AT_SNAPSHOT = "READ_AT_SNAPSHOT"; KuduScanner::KuduScanner(KuduScanNodeBase* scan_node, RuntimeState* state) : scan_node_(scan_node), @@ -154,10 +154,12 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token, bool* eos) { KUDU_RETURN_IF_ERROR(scanner_->SetSelection(kudu::client::KuduClient::LEADER_ONLY), BuildErrorString("Could not set replica selection")); } - kudu::client::KuduScanner::ReadMode mode = - MODE_READ_AT_SNAPSHOT == FLAGS_kudu_read_mode ? 
- kudu::client::KuduScanner::READ_AT_SNAPSHOT : - kudu::client::KuduScanner::READ_LATEST; + kudu::client::KuduScanner::ReadMode mode; + RETURN_IF_ERROR(StringToKuduReadMode(FLAGS_kudu_read_mode, &mode)); + if (state_->query_options().kudu_read_mode != TKuduReadMode::DEFAULT) { + RETURN_IF_ERROR(StringToKuduReadMode( + PrintThriftEnum(state_->query_options().kudu_read_mode), &mode)); + } KUDU_RETURN_IF_ERROR( scanner_->SetReadMode(mode), BuildErrorString("Could not set scanner ReadMode")); KUDU_RETURN_IF_ERROR(scanner_->SetTimeoutMillis(FLAGS_kudu_operation_timeout_ms), http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/be/src/exec/kudu-util.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-util.cc b/be/src/exec/kudu-util.cc index dd959e6..1751322 100644 --- a/be/src/exec/kudu-util.cc +++ b/be/src/exec/kudu-util.cc @@ -21,6 +21,7 @@ #include <string> #include <sstream> +#include <boost/algorithm/string.hpp> #include <kudu/client/callbacks.h> #include <kudu/client/schema.h> #include <kudu/common/partial_row.h> @@ -33,6 +34,7 @@ #include "runtime/timestamp-value.h" #include "runtime/timestamp-value.inline.h" +using boost::algorithm::iequals; using kudu::client::KuduSchema; using kudu::client::KuduClient; using kudu::client::KuduClientBuilder; @@ -46,6 +48,9 @@ DECLARE_int32(kudu_client_rpc_timeout_ms); namespace impala { +const string MODE_READ_LATEST = "READ_LATEST"; +const string MODE_READ_AT_SNAPSHOT = "READ_AT_SNAPSHOT"; + bool KuduClientIsSupported() { // The value below means the client is actually a stubbed client. This should mean // that no official client exists for the underlying OS. 
The value below should match @@ -287,4 +292,17 @@ Status CreateKuduValue(const ColumnType& col_type, void* value, KuduValue** out) return Status::OK(); } +Status StringToKuduReadMode( + const std::string& mode, kudu::client::KuduScanner::ReadMode* out) { + if (iequals(mode, MODE_READ_LATEST)) { + *out = kudu::client::KuduScanner::READ_LATEST; + } else if (iequals(mode, MODE_READ_AT_SNAPSHOT)) { + *out = kudu::client::KuduScanner::READ_AT_SNAPSHOT; + } else { + return Status(Substitute("Invalid kudu_read_mode '$0'. Valid values are READ_LATEST " + "and READ_AT_SNAPSHOT.", mode)); + } + return Status::OK(); +} + } // namespace impala http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/be/src/exec/kudu-util.h ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h index 8fe4d01..1d9a9bd 100644 --- a/be/src/exec/kudu-util.h +++ b/be/src/exec/kudu-util.h @@ -108,5 +108,11 @@ inline Status FromKuduStatus( return Status::Expected(err_msg); } +/// Converts 'mode' to its equivalent ReadMode, stored in 'out'. Possible values for +/// 'mode' are 'READ_LATEST' and 'READ_AT_SNAPSHOT'. If 'mode' is invalid, an error is +/// returned. 
+Status StringToKuduReadMode( + const std::string& mode, kudu::client::KuduScanner::ReadMode* out) WARN_UNUSED_RESULT; + } /// namespace impala #endif http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/be/src/service/query-options.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index 0e71e95..218e4e6 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -679,6 +679,19 @@ Status impala::SetQueryOption(const string& key, const string& value, } break; } + case TImpalaQueryOptions::KUDU_READ_MODE: { + if (iequals(value, "DEFAULT") || iequals(value, "0")) { + query_options->__set_kudu_read_mode(TKuduReadMode::DEFAULT); + } else if (iequals(value, "READ_LATEST") || iequals(value, "1")) { + query_options->__set_kudu_read_mode(TKuduReadMode::READ_LATEST); + } else if (iequals(value, "READ_AT_SNAPSHOT") || iequals(value, "2")) { + query_options->__set_kudu_read_mode(TKuduReadMode::READ_AT_SNAPSHOT); + } else { + return Status(Substitute("Invalid kudu_read_mode '$0'. Valid values are " + "DEFAULT, READ_LATEST, and READ_AT_SNAPSHOT.", value)); + } + break; + } default: if (IsRemovedQueryOption(key)) { LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'"; http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/be/src/service/query-options.h ---------------------------------------------------------------------- diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index 1b775c2..5c4f51d 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> // the DCHECK. 
#define QUERY_OPTS_TABLE\ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\ - TImpalaQueryOptions::THREAD_RESERVATION_AGGREGATE_LIMIT + 1);\ + TImpalaQueryOptions::KUDU_READ_MODE + 1);\ REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\ QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS,\ @@ -137,6 +137,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> TQueryOptionLevel::REGULAR)\ QUERY_OPT_FN(thread_reservation_aggregate_limit, THREAD_RESERVATION_AGGREGATE_LIMIT,\ TQueryOptionLevel::REGULAR)\ + QUERY_OPT_FN(kudu_read_mode, KUDU_READ_MODE, TQueryOptionLevel::ADVANCED)\ ; /// Enforce practical limits on some query options to avoid undesired query state. http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/be/src/util/debug-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc index eb55a7c..7a31748 100644 --- a/be/src/util/debug-util.cc +++ b/be/src/util/debug-util.cc @@ -68,6 +68,7 @@ PRINT_THRIFT_ENUM_IMPL(THdfsFileFormat) PRINT_THRIFT_ENUM_IMPL(THdfsSeqCompressionMode) PRINT_THRIFT_ENUM_IMPL(TImpalaQueryOptions) PRINT_THRIFT_ENUM_IMPL(TJoinDistributionMode) +PRINT_THRIFT_ENUM_IMPL(TKuduReadMode) PRINT_THRIFT_ENUM_IMPL(TMetricKind) PRINT_THRIFT_ENUM_IMPL(TParquetArrayResolution) PRINT_THRIFT_ENUM_IMPL(TParquetFallbackSchemaResolution) http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/be/src/util/debug-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h index 813f460..0978e50 100644 --- a/be/src/util/debug-util.h +++ b/be/src/util/debug-util.h @@ -57,6 +57,7 @@ std::string PrintThriftEnum(const THdfsFileFormat::type& value); std::string PrintThriftEnum(const THdfsSeqCompressionMode::type& value); std::string 
PrintThriftEnum(const TImpalaQueryOptions::type& value); std::string PrintThriftEnum(const TJoinDistributionMode::type& value); +std::string PrintThriftEnum(const TKuduReadMode::type& value); std::string PrintThriftEnum(const TMetricKind::type& value); std::string PrintThriftEnum(const TParquetArrayResolution::type& value); std::string PrintThriftEnum(const TParquetFallbackSchemaResolution::type& value); http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index cb33800..96314d5 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -63,6 +63,13 @@ enum TJoinDistributionMode { SHUFFLE } +// Consistency level options for Kudu scans. +enum TKuduReadMode { + DEFAULT, + READ_LATEST, + READ_AT_SNAPSHOT +} + // Query options that correspond to ImpalaService.ImpalaQueryOptions, with their // respective defaults. Query options can be set in the following ways: // @@ -288,6 +295,9 @@ struct TQueryOptions { // See comment in ImpalaService.thrift. 67: optional i32 thread_reservation_aggregate_limit = 0; + + // See comment in ImpalaService.thrift. + 68: optional TKuduReadMode kudu_read_mode = TKuduReadMode.DEFAULT; } // Impala currently has two types of sessions: Beeswax and HiveServer2 http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/common/thrift/ImpalaService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index 5b01f61..91b4de8 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -313,6 +313,10 @@ enum TImpalaQueryOptions { // across all backends for the query exceeds this number. 0 or -1 means this has no // effect. 
THREAD_RESERVATION_AGGREGATE_LIMIT, + + // Overrides the -kudu_read_mode flag to set the consistency level for Kudu scans. + // Possible values are DEFAULT, READ_LATEST, and READ_AT_SNAPSHOT. + KUDU_READ_MODE, } // The summary of a DML statement. http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/tests/common/test_dimensions.py ---------------------------------------------------------------------- diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py index a9ba7a8..785cfa9 100644 --- a/tests/common/test_dimensions.py +++ b/tests/common/test_dimensions.py @@ -182,6 +182,14 @@ def create_exec_option_dimension_from_dict(exec_option_dimensions): # Build a test vector out of it return ImpalaTestDimension('exec_option', *exec_option_dimension_values) +def add_exec_option_dimension(test_suite, key, value): + """ + Takes an ImpalaTestSuite object 'test_suite' and adds 'key=value' to every exec option + test dimension, leaving the number of tests that will be run unchanged. 
+ """ + for v in test_suite.ImpalaTestMatrix.dimensions["exec_option"]: + v.value[key] = value + def extend_exec_option_dimension(test_suite, key, value): """ Takes an ImpalaTestSuite object 'test_suite' and extends the exec option test dimension http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/tests/custom_cluster/test_kudu.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_kudu.py b/tests/custom_cluster/test_kudu.py index 7563236..d79703d 100644 --- a/tests/custom_cluster/test_kudu.py +++ b/tests/custom_cluster/test_kudu.py @@ -21,6 +21,7 @@ from kudu.schema import INT32 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.kudu_test_suite import KuduTestSuite +from tests.common.test_dimensions import add_exec_option_dimension KUDU_MASTER_HOSTS = pytest.config.option.kudu_master_hosts LOG = logging.getLogger(__name__) @@ -31,6 +32,13 @@ class TestKuduOperations(CustomClusterTestSuite, KuduTestSuite): def get_workload(cls): return 'functional-query' + @classmethod + def add_test_dimensions(cls): + super(TestKuduOperations, cls).add_test_dimensions() + # The default read mode of READ_LATEST does not provide high enough consistency for + # these tests. 
+ add_exec_option_dimension(cls, "kudu_read_mode", "READ_AT_SNAPSHOT") + @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(impalad_args=\ "--use_local_tz_for_unix_timestamp_conversions=true") http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/tests/metadata/test_ddl.py ---------------------------------------------------------------------- diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py index bcafd8e..39f7f3c 100644 --- a/tests/metadata/test_ddl.py +++ b/tests/metadata/test_ddl.py @@ -259,6 +259,7 @@ class TestDdlStatements(TestDdlBase): @UniqueDatabase.parametrize(sync_ddl=True) def test_create_kudu(self, vector, unique_database): vector.get_value('exec_option')['abort_on_error'] = False + vector.get_value('exec_option')['kudu_read_mode'] = "READ_AT_SNAPSHOT" self.run_test_case('QueryTest/kudu_create', vector, use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector)) http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/tests/query_test/test_kudu.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py index d6a0d2a..3efea37 100644 --- a/tests/query_test/test_kudu.py +++ b/tests/query_test/test_kudu.py @@ -40,6 +40,7 @@ from pytz import utc from tests.common.kudu_test_suite import KuduTestSuite from tests.common.impala_cluster import ImpalaCluster from tests.common.skip import SkipIfNotHdfsMinicluster +from tests.common.test_dimensions import add_exec_option_dimension from tests.verifiers.metric_verifier import MetricVerifier KUDU_MASTER_HOSTS = pytest.config.option.kudu_master_hosts @@ -51,6 +52,13 @@ class TestKuduOperations(KuduTestSuite): This suite tests the different modification operations when using a kudu table. 
""" + @classmethod + def add_test_dimensions(cls): + super(TestKuduOperations, cls).add_test_dimensions() + # The default read mode of READ_LATEST does not provide high enough consistency for + # these tests. + add_exec_option_dimension(cls, "kudu_read_mode", "READ_AT_SNAPSHOT") + def test_out_of_range_timestamps(self, vector, cursor, kudu_client, unique_database): """Test timestamp values that are outside of Impala's supported date range.""" cursor.execute("""CREATE TABLE %s.times (a INT PRIMARY KEY, ts TIMESTAMP) @@ -299,6 +307,7 @@ class TestKuduOperations(KuduTestSuite): def test_kudu_col_removed(self, cursor, kudu_client, unique_database): """Test removing a Kudu column outside of Impala.""" + cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT") cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING) PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database) assert kudu_client.table_exists( @@ -366,7 +375,7 @@ class TestKuduOperations(KuduTestSuite): table_name = "%s.storage_attrs" % unique_database types = ['boolean', 'tinyint', 'smallint', 'int', 'bigint', 'float', 'double', \ 'string', 'timestamp', 'decimal'] - + cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT") create_query = "create table %s (id int primary key" % table_name for t in types: create_query += ", %s_col %s" % (t, t) @@ -436,6 +445,33 @@ class TestKuduOperations(KuduTestSuite): or "Column col1 has unexpected type." in msg \ or "Client provided column col1[int64 NULLABLE] not present in tablet" in msg + def _retry_query(self, cursor, query, expected): + retries = 0 + while retries < 3: + cursor.execute(query) + result = cursor.fetchall() + if result == expected: + break + retries += 1 + time.sleep(1) + assert retries < 3, \ + "Did not get a correct result for %s after 3 retries: %s" % (query, result) + + def test_read_modes(self, cursor, unique_database): + """Other Kudu tests are run with a scan level of READ_AT_SNAPSHOT to have predicable + scan results. 
This test verifies that scans work as expected at the scan level of + READ_LATEST by retrying the scan if the results are incorrect.""" + table_name = "%s.test_read_latest" % unique_database + cursor.execute("set kudu_read_mode=READ_LATEST") + cursor.execute("""create table %s (a int primary key, b string) partition by hash(a) + partitions 8 stored as kudu""" % table_name) + cursor.execute("insert into %s values (0, 'a'), (1, 'b'), (2, 'c')" % table_name) + self._retry_query(cursor, "select * from %s order by a" % table_name, + [(0, 'a'), (1, 'b'), (2, 'c')]) + cursor.execute("""insert into %s select id, string_col from functional.alltypes + where id > 2 limit 100""" % table_name) + self._retry_query(cursor, "select count(*) from %s" % table_name, [(103,)]) + class TestCreateExternalTable(KuduTestSuite): def test_external_timestamp_default_value(self, cursor, kudu_client, unique_database): @@ -606,6 +642,7 @@ class TestCreateExternalTable(KuduTestSuite): unbounded partition). It is not possible to create such a table in Impala, but it can be created directly in Kudu and then loaded as an external table. Regression test for IMPALA-5154.""" + cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT") schema_builder = SchemaBuilder() column_spec = schema_builder.add_column("id", INT64) column_spec.nullable(False) @@ -637,6 +674,7 @@ class TestCreateExternalTable(KuduTestSuite): def test_column_name_case(self, cursor, kudu_client, unique_database): """IMPALA-5286: Tests that an external Kudu table that was created with a column name containing upper case letters is handled correctly.""" + cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT") table_name = '%s.kudu_external_test' % unique_database if kudu_client.table_exists(table_name): kudu_client.delete_table(table_name)
