Repository: impala
Updated Branches:
  refs/heads/master a90b42911 -> 13b82624b


IMPALA-6812: Fix flaky Kudu scan tests

Many of our Kudu-related tests have been flaky, with the symptom that
scans appear not to return rows that were just inserted. This occurs
because our default Kudu scan level of READ_LATEST doesn't make any
consistency guarantees.

This patch adds a query option 'kudu_read_mode', which overrides the
startup flag of the same name, and then sets that option to
READ_AT_SNAPSHOT for all tests with Kudu inserts and scans, which
should give us more consistent test results.

Testing:
- Passed a full exhaustive run. Does not appear to increase time to
  run by any significant amount.

Change-Id: I70df84f2cbc663107f2ad029565d3c15bdfbd47c
Reviewed-on: http://gerrit.cloudera.org:8080/10503
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/13b82624
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/13b82624
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/13b82624

Branch: refs/heads/master
Commit: 13b82624b58275469988147c33981446c86e4174
Parents: a90b429
Author: Thomas Tauber-Marshall <[email protected]>
Authored: Wed May 23 23:11:01 2018 +0000
Committer: Impala Public Jenkins <[email protected]>
Committed: Mon Jun 18 20:19:53 2018 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-scanner.cc                | 14 +++++----
 be/src/exec/kudu-util.cc                   | 18 +++++++++++
 be/src/exec/kudu-util.h                    |  6 ++++
 be/src/service/query-options.cc            | 13 ++++++++
 be/src/service/query-options.h             |  3 +-
 be/src/util/debug-util.cc                  |  1 +
 be/src/util/debug-util.h                   |  1 +
 common/thrift/ImpalaInternalService.thrift | 10 +++++++
 common/thrift/ImpalaService.thrift         |  4 +++
 tests/common/test_dimensions.py            |  8 +++++
 tests/custom_cluster/test_kudu.py          |  8 +++++
 tests/metadata/test_ddl.py                 |  1 +
 tests/query_test/test_kudu.py              | 40 ++++++++++++++++++++++++-
 13 files changed, 119 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index e63a6fe..7353bf9 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -53,7 +53,8 @@ using kudu::client::KuduTable;
 using kudu::client::KuduValue;
 
 DEFINE_string(kudu_read_mode, "READ_LATEST", "(Advanced) Sets the Kudu scan 
ReadMode. "
-    "Supported Kudu read modes are READ_LATEST and READ_AT_SNAPSHOT.");
+    "Supported Kudu read modes are READ_LATEST and READ_AT_SNAPSHOT. Can be 
overridden "
+    "with the query option of the same name.");
 DEFINE_bool(pick_only_leaders_for_tests, false,
             "Whether to pick only leader replicas, for tests purposes only.");
 DEFINE_int32(kudu_scanner_keep_alive_period_sec, 15,
@@ -64,7 +65,6 @@ DECLARE_int32(kudu_operation_timeout_ms);
 
 namespace impala {
 
-const string MODE_READ_AT_SNAPSHOT = "READ_AT_SNAPSHOT";
 
 KuduScanner::KuduScanner(KuduScanNodeBase* scan_node, RuntimeState* state)
   : scan_node_(scan_node),
@@ -154,10 +154,12 @@ Status KuduScanner::OpenNextScanToken(const string& 
scan_token, bool* eos) {
     
KUDU_RETURN_IF_ERROR(scanner_->SetSelection(kudu::client::KuduClient::LEADER_ONLY),
         BuildErrorString("Could not set replica selection"));
   }
-  kudu::client::KuduScanner::ReadMode mode =
-      MODE_READ_AT_SNAPSHOT == FLAGS_kudu_read_mode ?
-          kudu::client::KuduScanner::READ_AT_SNAPSHOT :
-          kudu::client::KuduScanner::READ_LATEST;
+  kudu::client::KuduScanner::ReadMode mode;
+  RETURN_IF_ERROR(StringToKuduReadMode(FLAGS_kudu_read_mode, &mode));
+  if (state_->query_options().kudu_read_mode != TKuduReadMode::DEFAULT) {
+    RETURN_IF_ERROR(StringToKuduReadMode(
+        PrintThriftEnum(state_->query_options().kudu_read_mode), &mode));
+  }
   KUDU_RETURN_IF_ERROR(
       scanner_->SetReadMode(mode), BuildErrorString("Could not set scanner 
ReadMode"));
   
KUDU_RETURN_IF_ERROR(scanner_->SetTimeoutMillis(FLAGS_kudu_operation_timeout_ms),

http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/be/src/exec/kudu-util.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.cc b/be/src/exec/kudu-util.cc
index dd959e6..1751322 100644
--- a/be/src/exec/kudu-util.cc
+++ b/be/src/exec/kudu-util.cc
@@ -21,6 +21,7 @@
 #include <string>
 #include <sstream>
 
+#include <boost/algorithm/string.hpp>
 #include <kudu/client/callbacks.h>
 #include <kudu/client/schema.h>
 #include <kudu/common/partial_row.h>
@@ -33,6 +34,7 @@
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
 
+using boost::algorithm::iequals;
 using kudu::client::KuduSchema;
 using kudu::client::KuduClient;
 using kudu::client::KuduClientBuilder;
@@ -46,6 +48,9 @@ DECLARE_int32(kudu_client_rpc_timeout_ms);
 
 namespace impala {
 
+const string MODE_READ_LATEST = "READ_LATEST";
+const string MODE_READ_AT_SNAPSHOT = "READ_AT_SNAPSHOT";
+
 bool KuduClientIsSupported() {
   // The value below means the client is actually a stubbed client. This 
should mean
   // that no official client exists for the underlying OS. The value below 
should match
@@ -287,4 +292,17 @@ Status CreateKuduValue(const ColumnType& col_type, void* 
value, KuduValue** out)
   return Status::OK();
 }
 
+Status StringToKuduReadMode(
+    const std::string& mode, kudu::client::KuduScanner::ReadMode* out) {
+  if (iequals(mode, MODE_READ_LATEST)) {
+    *out = kudu::client::KuduScanner::READ_LATEST;
+  } else if (iequals(mode, MODE_READ_AT_SNAPSHOT)) {
+    *out = kudu::client::KuduScanner::READ_AT_SNAPSHOT;
+  } else {
+    return Status(Substitute("Invalid kudu_read_mode '$0'. Valid values are 
READ_LATEST "
+        "and READ_AT_SNAPSHOT.", mode));
+  }
+  return Status::OK();
+}
+
 }  // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/be/src/exec/kudu-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h
index 8fe4d01..1d9a9bd 100644
--- a/be/src/exec/kudu-util.h
+++ b/be/src/exec/kudu-util.h
@@ -108,5 +108,11 @@ inline Status FromKuduStatus(
   return Status::Expected(err_msg);
 }
 
+/// Converts 'mode' to its equivalent ReadMode, stored in 'out'. Possible 
values for
+/// 'mode' are 'READ_LATEST' and 'READ_AT_SNAPSHOT'. If 'mode' is invalid, an 
error is
+/// returned.
+Status StringToKuduReadMode(
+    const std::string& mode, kudu::client::KuduScanner::ReadMode* out) 
WARN_UNUSED_RESULT;
+
 } /// namespace impala
 #endif

http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 0e71e95..218e4e6 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -679,6 +679,19 @@ Status impala::SetQueryOption(const string& key, const 
string& value,
         }
         break;
       }
+      case TImpalaQueryOptions::KUDU_READ_MODE: {
+        if (iequals(value, "DEFAULT") || iequals(value, "0")) {
+          query_options->__set_kudu_read_mode(TKuduReadMode::DEFAULT);
+        } else if (iequals(value, "READ_LATEST") || iequals(value, "1")) {
+          query_options->__set_kudu_read_mode(TKuduReadMode::READ_LATEST);
+        } else if (iequals(value, "READ_AT_SNAPSHOT") || iequals(value, "2")) {
+          query_options->__set_kudu_read_mode(TKuduReadMode::READ_AT_SNAPSHOT);
+        } else {
+          return Status(Substitute("Invalid kudu_read_mode '$0'. Valid values 
are "
+              "DEFAULT, READ_LATEST, and READ_AT_SNAPSHOT.", value));
+        }
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << 
key << "'";

http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 1b775c2..5c4f51d 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -41,7 +41,7 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::THREAD_RESERVATION_AGGREGATE_LIMIT + 1);\
+      TImpalaQueryOptions::KUDU_READ_MODE + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, 
ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS,\
@@ -137,6 +137,7 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
       TQueryOptionLevel::REGULAR)\
   QUERY_OPT_FN(thread_reservation_aggregate_limit, 
THREAD_RESERVATION_AGGREGATE_LIMIT,\
       TQueryOptionLevel::REGULAR)\
+  QUERY_OPT_FN(kudu_read_mode, KUDU_READ_MODE, TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query 
state.

http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/be/src/util/debug-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index eb55a7c..7a31748 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -68,6 +68,7 @@ PRINT_THRIFT_ENUM_IMPL(THdfsFileFormat)
 PRINT_THRIFT_ENUM_IMPL(THdfsSeqCompressionMode)
 PRINT_THRIFT_ENUM_IMPL(TImpalaQueryOptions)
 PRINT_THRIFT_ENUM_IMPL(TJoinDistributionMode)
+PRINT_THRIFT_ENUM_IMPL(TKuduReadMode)
 PRINT_THRIFT_ENUM_IMPL(TMetricKind)
 PRINT_THRIFT_ENUM_IMPL(TParquetArrayResolution)
 PRINT_THRIFT_ENUM_IMPL(TParquetFallbackSchemaResolution)

http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/be/src/util/debug-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index 813f460..0978e50 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -57,6 +57,7 @@ std::string PrintThriftEnum(const THdfsFileFormat::type& 
value);
 std::string PrintThriftEnum(const THdfsSeqCompressionMode::type& value);
 std::string PrintThriftEnum(const TImpalaQueryOptions::type& value);
 std::string PrintThriftEnum(const TJoinDistributionMode::type& value);
+std::string PrintThriftEnum(const TKuduReadMode::type& value);
 std::string PrintThriftEnum(const TMetricKind::type& value);
 std::string PrintThriftEnum(const TParquetArrayResolution::type& value);
 std::string PrintThriftEnum(const TParquetFallbackSchemaResolution::type& 
value);

http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift 
b/common/thrift/ImpalaInternalService.thrift
index cb33800..96314d5 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -63,6 +63,13 @@ enum TJoinDistributionMode {
   SHUFFLE
 }
 
+// Consistency level options for Kudu scans.
+enum TKuduReadMode {
+  DEFAULT,
+  READ_LATEST,
+  READ_AT_SNAPSHOT
+}
+
 // Query options that correspond to ImpalaService.ImpalaQueryOptions, with 
their
 // respective defaults. Query options can be set in the following ways:
 //
@@ -288,6 +295,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift.
   67: optional i32 thread_reservation_aggregate_limit = 0;
+
+  // See comment in ImpalaService.thrift.
+  68: optional TKuduReadMode kudu_read_mode = TKuduReadMode.DEFAULT;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift 
b/common/thrift/ImpalaService.thrift
index 5b01f61..91b4de8 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -313,6 +313,10 @@ enum TImpalaQueryOptions {
   // across all backends for the query exceeds this number. 0 or -1 means this 
has no
   // effect.
   THREAD_RESERVATION_AGGREGATE_LIMIT,
+
+  // Overrides the -kudu_read_mode flag to set the consistency level for Kudu 
scans.
+  // Possible values are DEFAULT, READ_LATEST, and READ_AT_SNAPSHOT.
+  KUDU_READ_MODE,
 }
 
 // The summary of a DML statement.

http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/tests/common/test_dimensions.py
----------------------------------------------------------------------
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index a9ba7a8..785cfa9 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -182,6 +182,14 @@ def 
create_exec_option_dimension_from_dict(exec_option_dimensions):
   # Build a test vector out of it
   return ImpalaTestDimension('exec_option', *exec_option_dimension_values)
 
+def add_exec_option_dimension(test_suite, key, value):
+  """
+  Takes an ImpalaTestSuite object 'test_suite' and adds 'key=value' to every 
exec option
+  test dimension, leaving the number of tests that will be run unchanged.
+  """
+  for v in test_suite.ImpalaTestMatrix.dimensions["exec_option"]:
+    v.value[key] = value
+
 def extend_exec_option_dimension(test_suite, key, value):
   """
   Takes an ImpalaTestSuite object 'test_suite' and extends the exec option 
test dimension

http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/tests/custom_cluster/test_kudu.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_kudu.py 
b/tests/custom_cluster/test_kudu.py
index 7563236..d79703d 100644
--- a/tests/custom_cluster/test_kudu.py
+++ b/tests/custom_cluster/test_kudu.py
@@ -21,6 +21,7 @@ from kudu.schema import INT32
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.kudu_test_suite import KuduTestSuite
+from tests.common.test_dimensions import add_exec_option_dimension
 
 KUDU_MASTER_HOSTS = pytest.config.option.kudu_master_hosts
 LOG = logging.getLogger(__name__)
@@ -31,6 +32,13 @@ class TestKuduOperations(CustomClusterTestSuite, 
KuduTestSuite):
   def get_workload(cls):
     return 'functional-query'
 
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestKuduOperations, cls).add_test_dimensions()
+    # The default read mode of READ_LATEST does not provide high enough 
consistency for
+    # these tests.
+    add_exec_option_dimension(cls, "kudu_read_mode", "READ_AT_SNAPSHOT")
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args=\
       "--use_local_tz_for_unix_timestamp_conversions=true")

http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/tests/metadata/test_ddl.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index bcafd8e..39f7f3c 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -259,6 +259,7 @@ class TestDdlStatements(TestDdlBase):
   @UniqueDatabase.parametrize(sync_ddl=True)
   def test_create_kudu(self, vector, unique_database):
     vector.get_value('exec_option')['abort_on_error'] = False
+    vector.get_value('exec_option')['kudu_read_mode'] = "READ_AT_SNAPSHOT"
     self.run_test_case('QueryTest/kudu_create', vector, use_db=unique_database,
         multiple_impalad=self._use_multiple_impalad(vector))
 

http://git-wip-us.apache.org/repos/asf/impala/blob/13b82624/tests/query_test/test_kudu.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index d6a0d2a..3efea37 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -40,6 +40,7 @@ from pytz import utc
 from tests.common.kudu_test_suite import KuduTestSuite
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.skip import SkipIfNotHdfsMinicluster
+from tests.common.test_dimensions import add_exec_option_dimension
 from tests.verifiers.metric_verifier import MetricVerifier
 
 KUDU_MASTER_HOSTS = pytest.config.option.kudu_master_hosts
@@ -51,6 +52,13 @@ class TestKuduOperations(KuduTestSuite):
   This suite tests the different modification operations when using a kudu 
table.
   """
 
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestKuduOperations, cls).add_test_dimensions()
+    # The default read mode of READ_LATEST does not provide high enough 
consistency for
+    # these tests.
+    add_exec_option_dimension(cls, "kudu_read_mode", "READ_AT_SNAPSHOT")
+
   def test_out_of_range_timestamps(self, vector, cursor, kudu_client, 
unique_database):
     """Test timestamp values that are outside of Impala's supported date 
range."""
     cursor.execute("""CREATE TABLE %s.times (a INT PRIMARY KEY, ts TIMESTAMP)
@@ -299,6 +307,7 @@ class TestKuduOperations(KuduTestSuite):
 
   def test_kudu_col_removed(self, cursor, kudu_client, unique_database):
     """Test removing a Kudu column outside of Impala."""
+    cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
     cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING)
         PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
     assert kudu_client.table_exists(
@@ -366,7 +375,7 @@ class TestKuduOperations(KuduTestSuite):
     table_name = "%s.storage_attrs" % unique_database
     types = ['boolean', 'tinyint', 'smallint', 'int', 'bigint', 'float', 
'double', \
         'string', 'timestamp', 'decimal']
-
+    cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
     create_query = "create table %s (id int primary key" % table_name
     for t in types:
       create_query += ", %s_col %s" % (t, t)
@@ -436,6 +445,33 @@ class TestKuduOperations(KuduTestSuite):
         or "Column col1 has unexpected type." in msg \
         or "Client provided column col1[int64 NULLABLE] not present in tablet" 
in msg
 
+  def _retry_query(self, cursor, query, expected):
+    retries = 0
+    while retries < 3:
+      cursor.execute(query)
+      result = cursor.fetchall()
+      if result == expected:
+        break
+      retries += 1
+      time.sleep(1)
+    assert retries < 3, \
+        "Did not get a correct result for %s after 3 retries: %s" % (query, 
result)
+
+  def test_read_modes(self, cursor, unique_database):
+    """Other Kudu tests are run with a scan level of READ_AT_SNAPSHOT to have 
predictable
+    scan results. This test verifies that scans work as expected at the scan 
level of
+    READ_LATEST by retrying the scan if the results are incorrect."""
+    table_name = "%s.test_read_latest" % unique_database
+    cursor.execute("set kudu_read_mode=READ_LATEST")
+    cursor.execute("""create table %s (a int primary key, b string) partition 
by hash(a)
+    partitions 8 stored as kudu""" % table_name)
+    cursor.execute("insert into %s values (0, 'a'), (1, 'b'), (2, 'c')" % 
table_name)
+    self._retry_query(cursor, "select * from %s order by a" % table_name,
+        [(0, 'a'), (1, 'b'), (2, 'c')])
+    cursor.execute("""insert into %s select id, string_col from 
functional.alltypes
+    where id > 2 limit 100""" % table_name)
+    self._retry_query(cursor, "select count(*) from %s" % table_name, [(103,)])
+
 class TestCreateExternalTable(KuduTestSuite):
 
   def test_external_timestamp_default_value(self, cursor, kudu_client, 
unique_database):
@@ -606,6 +642,7 @@ class TestCreateExternalTable(KuduTestSuite):
        unbounded partition). It is not possible to create such a table in 
Impala, but
        it can be created directly in Kudu and then loaded as an external table.
        Regression test for IMPALA-5154."""
+    cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
     schema_builder = SchemaBuilder()
     column_spec = schema_builder.add_column("id", INT64)
     column_spec.nullable(False)
@@ -637,6 +674,7 @@ class TestCreateExternalTable(KuduTestSuite):
   def test_column_name_case(self, cursor, kudu_client, unique_database):
     """IMPALA-5286: Tests that an external Kudu table that was created with a 
column name
        containing upper case letters is handled correctly."""
+    cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
     table_name = '%s.kudu_external_test' % unique_database
     if kudu_client.table_exists(table_name):
       kudu_client.delete_table(table_name)

Reply via email to