IMPALA-3788: Add flag for Kudu read-your-writes

The previous attempt to support Kudu 'read-your-writes' consistency
captured the latest observed timestamp from the Kudu client after a
write and propagated it to future Kudu clients within the same session.
That alone made writes within a session linearizable, but it did not
fully provide 'read-your-writes' semantics because the Kudu client in
the KuduScanner needed further configuration.

The Kudu client exposes an option to set the 'ReadMode', which can be
either READ_LATEST or READ_AT_SNAPSHOT. The former is the default and
lets the client read the latest known value of every row, with no
consistency guarantee among the versions of the rows read within a
scan. When READ_AT_SNAPSHOT is enabled, the client picks a timestamp
that is after the latest observed session timestamp (propagated and set
with SetLatestObservedTimestamp() by the previous commit for
IMPALA-3788) and performs a snapshot read at that time.

This timestamp is still determined per client, so the entire query does
not necessarily perform a snapshot read at a single timestamp; doing
that requires further work in Kudu and another change in Impala.

That said, this behavior is sufficient to satisfy 'read-your-writes'
consistency in all cases _except_ when a DML statement reads and writes
the same table, e.g.
  INSERT INTO foo SELECT ... from foo
This case may result in reading rows that were inserted by a different
node of the same query. It will be handled once a global snapshot
timestamp is supported and configured by Impala.

Because this is a snapshot read, some rows may be read from lagging
replicas, and those replicas will have to wait before returning rows.
This affects query execution behavior (e.g. queries may be more likely
to time out, and the number of queries that can run may be affected), so
the behavior is not yet enabled by default. It can be enabled with the
flag
  --kudu_read_mode READ_AT_SNAPSHOT
The goal is to make this the default behavior after sufficient testing.

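To make the difference between the two read modes concrete, here is a toy
model of a single-row replica. It is only a sketch under simplifying
assumptions: ToyReplica, Apply, ReadLatest, ReadAtSnapshot and PickSnapshotTs
are hypothetical names invented for illustration and are not part of the Kudu
or Impala APIs, and a real replica would block rather than return false.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-in for a single-row tablet replica; not a Kudu type.
struct ToyReplica {
  std::map<uint64_t, std::string> versions;  // commit timestamp -> row value
  uint64_t applied_ts = 0;                   // how far this replica has caught up

  // Records a write at commit timestamp ts and advances the applied watermark.
  void Apply(uint64_t ts, const std::string& value) {
    versions[ts] = value;
    if (ts > applied_ts) applied_ts = ts;
  }

  // Newest value with commit timestamp <= ts, or "" if there is none.
  std::string ValueAt(uint64_t ts) const {
    std::string value;
    for (std::map<uint64_t, std::string>::const_iterator it = versions.begin();
         it != versions.end() && it->first <= ts; ++it) {
      value = it->second;
    }
    return value;
  }

  // READ_LATEST-like read: returns whatever this replica has applied so far,
  // which may be stale on a lagging replica.
  std::string ReadLatest() const { return ValueAt(applied_ts); }

  // READ_AT_SNAPSHOT-like read: answers only once the replica has caught up to
  // snapshot_ts. Returns false where a real replica would have to wait.
  bool ReadAtSnapshot(uint64_t snapshot_ts, std::string* out) const {
    if (applied_ts < snapshot_ts) return false;
    *out = ValueAt(snapshot_ts);
    return true;
  }
};

// Assumption for this sketch: "after the latest observed timestamp" is modeled
// as simply the next integer timestamp.
uint64_t PickSnapshotTs(uint64_t latest_observed_ts) { return latest_observed_ts + 1; }
```

In this model, a session that wrote at timestamp 10 and then reads from a
replica that has only applied timestamp 5 gets the stale value from
ReadLatest(), while ReadAtSnapshot(PickSnapshotTs(10), ...) declines to answer
until the replica has caught up, which is the waiting behavior described above.
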
Change-Id: I003aba410548bc9158d1e11abbdcf710c31a82ff
Reviewed-on: http://gerrit.cloudera.org:8080/5288
Reviewed-by: Matthew Jacobs <[email protected]>
Tested-by: Internal Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0d4bdc1b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0d4bdc1b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0d4bdc1b

Branch: refs/heads/hadoop-next
Commit: 0d4bdc1b70464e71cd3dc44f6fbaf0aa619932e0
Parents: a65864a
Author: Matthew Jacobs <[email protected]>
Authored: Tue Nov 29 15:25:40 2016 -0800
Committer: Internal Jenkins <[email protected]>
Committed: Wed Dec 7 05:01:01 2016 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-scanner.cc | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d4bdc1b/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index c13b6a8..ff9ca27 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -20,6 +20,7 @@
 #include <kudu/client/row_result.h>
 #include <thrift/protocol/TDebugProtocol.h>
 #include <vector>
+#include <string>
 
 #include "exprs/expr.h"
 #include "exprs/expr-context.h"
@@ -43,6 +44,9 @@ using kudu::client::KuduScanBatch;
 using kudu::client::KuduSchema;
 using kudu::client::KuduTable;
 
+DEFINE_string(kudu_read_mode, "READ_LATEST", "(Advanced) Sets the Kudu scan ReadMode. "
+    "Supported Kudu read modes are READ_LATEST and READ_AT_SNAPSHOT. Invalid values "
+    "result in using READ_LATEST.");
 DEFINE_bool(pick_only_leaders_for_tests, false,
     "Whether to pick only leader replicas, for tests purposes only.");
 DEFINE_int32(kudu_scanner_keep_alive_period_sec, 15,
@@ -53,6 +57,8 @@ DECLARE_int32(kudu_operation_timeout_ms);
 
 namespace impala {
 
+const string MODE_READ_AT_SNAPSHOT = "READ_AT_SNAPSHOT";
+
 KuduScanner::KuduScanner(KuduScanNode* scan_node, RuntimeState* state)
   : scan_node_(scan_node),
     state_(state),
@@ -132,6 +138,11 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token) {
     KUDU_RETURN_IF_ERROR(scanner_->SetSelection(kudu::client::KuduClient::LEADER_ONLY),
                          "Could not set replica selection.");
   }
+  kudu::client::KuduScanner::ReadMode mode =
+      MODE_READ_AT_SNAPSHOT == FLAGS_kudu_read_mode ?
+          kudu::client::KuduScanner::READ_AT_SNAPSHOT :
+          kudu::client::KuduScanner::READ_LATEST;
+  KUDU_RETURN_IF_ERROR(scanner_->SetReadMode(mode), "Could not set scanner ReadMode");
   KUDU_RETURN_IF_ERROR(scanner_->SetTimeoutMillis(FLAGS_kudu_operation_timeout_ms),
       "Could not set scanner timeout");
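
The flag help text states that invalid values result in READ_LATEST. A
minimal, standalone sketch of that flag-to-mode mapping might look like the
following. The ReadMode enum here is a hypothetical stand-in for
kudu::client::KuduScanner::ReadMode so that the example compiles without the
Kudu client headers, and ParseReadMode is an illustrative helper name, not a
function in Impala.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for kudu::client::KuduScanner::ReadMode.
enum ReadMode { READ_LATEST, READ_AT_SNAPSHOT };

// Maps the flag value to a read mode. Any value other than the exact string
// "READ_AT_SNAPSHOT" (including invalid or empty values) falls back to
// READ_LATEST, matching the behavior described in the flag help text.
ReadMode ParseReadMode(const std::string& flag_value) {
  static const std::string kSnapshot = "READ_AT_SNAPSHOT";
  // operator== is true on a match. std::string::compare() returns 0 on a
  // match, so its raw result cannot serve as the "matches" condition.
  return flag_value == kSnapshot ? READ_AT_SNAPSHOT : READ_LATEST;
}
```

With this mapping, the default flag value "READ_LATEST" and any unrecognized
string both select READ_LATEST, and only "READ_AT_SNAPSHOT" opts in to
snapshot reads.
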
