Repository: kudu Updated Branches: refs/heads/master 726514abf -> 5047f091d
KUDU-1704: add c++ client support for READ_YOUR_WRITES mode Change-Id: I34214245a78aed172a28fbdb395ff5bccd0fc0e1 Reviewed-on: http://gerrit.cloudera.org:8080/8823 Reviewed-by: David Ribeiro Alves <[email protected]> Tested-by: Hao Hao <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5047f091 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5047f091 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5047f091 Branch: refs/heads/master Commit: 5047f091d312698b5e1d2d9db75fbccfde24f687 Parents: 726514a Author: hahao <[email protected]> Authored: Tue Dec 12 16:35:49 2017 -0800 Committer: Hao Hao <[email protected]> Committed: Sun Mar 4 20:24:43 2018 +0000 ---------------------------------------------------------------------- src/kudu/client/client-test.cc | 161 ++++++++++++------- src/kudu/client/client.cc | 16 +- src/kudu/client/client.h | 14 +- src/kudu/client/scan_configuration.cc | 5 + src/kudu/client/scan_configuration.h | 19 +++ src/kudu/client/scan_token-internal.cc | 25 ++- src/kudu/client/scan_token-test.cc | 78 ++++++--- src/kudu/client/scanner-internal.cc | 36 ++++- src/kudu/integration-tests/consistency-itest.cc | 143 +++++++++++++++- .../integration-tests/delete_table-itest.cc | 5 +- 10 files changed, 406 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/client/client-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc index 4a44761..531b5b1 100644 --- a/src/kudu/client/client-test.cc +++ b/src/kudu/client/client-test.cc @@ -537,14 +537,17 @@ class ClientTest : public KuduTest { } int CountRowsFromClient(KuduTable* table) { - return CountRowsFromClient(table, kNoBound, kNoBound); + return CountRowsFromClient(table, KuduScanner::READ_LATEST, 
kNoBound, kNoBound); } - int CountRowsFromClient(KuduTable* table, int32_t lower_bound, int32_t upper_bound) { - return CountRowsFromClient(table, KuduClient::LEADER_ONLY, lower_bound, upper_bound); + int CountRowsFromClient(KuduTable* table, KuduScanner::ReadMode scan_mode, + int32_t lower_bound, int32_t upper_bound) { + return CountRowsFromClient(table, KuduClient::LEADER_ONLY, scan_mode, + lower_bound, upper_bound); } int CountRowsFromClient(KuduTable* table, KuduClient::ReplicaSelection selection, + KuduScanner::ReadMode scan_mode, int32_t lower_bound, int32_t upper_bound) { KuduScanner scanner(table); CHECK_OK(scanner.SetSelection(selection)); @@ -559,7 +562,7 @@ class ClientTest : public KuduTest { client_table_->NewComparisonPredicate("key", KuduPredicate::LESS_EQUAL, KuduValue::FromInt(upper_bound)))); } - + CHECK_OK(scanner.SetReadMode(scan_mode)); CHECK_OK(scanner.Open()); int count = 0; @@ -865,7 +868,19 @@ TEST_F(ClientTest, TestScanAtFutureTimestamp) { ASSERT_STR_CONTAINS(s.ToString(), "in the future."); } -TEST_F(ClientTest, TestScanMultiTablet) { +const KuduScanner::ReadMode read_modes[] = { + KuduScanner::READ_LATEST, + KuduScanner::READ_AT_SNAPSHOT, + KuduScanner::READ_YOUR_WRITES, +}; + +class ScanMultiTabletParamTest : + public ClientTest, + public ::testing::WithParamInterface<KuduScanner::ReadMode> { +}; +// Tests multiple tablet scan with all scan modes. +TEST_P(ScanMultiTabletParamTest, Test) { + const KuduScanner::ReadMode read_mode = GetParam(); // 5 tablets, each with 10 rows worth of space. static const int kTabletsNum = 5; static const int kRowsPerTablet = 10; @@ -900,18 +915,17 @@ TEST_F(ClientTest, TestScanMultiTablet) { } FlushSessionOrDie(session); - // Run through various scans. 
ASSERT_EQ(4 * (kTabletsNum - 1), - CountRowsFromClient(table.get(), kNoBound, kNoBound)); - ASSERT_EQ(3, CountRowsFromClient(table.get(), kNoBound, 15)); - ASSERT_EQ(9, CountRowsFromClient(table.get(), 27, kNoBound)); - ASSERT_EQ(3, CountRowsFromClient(table.get(), 0, 15)); - ASSERT_EQ(0, CountRowsFromClient(table.get(), 0, 10)); - ASSERT_EQ(4, CountRowsFromClient(table.get(), 0, 20)); - ASSERT_EQ(8, CountRowsFromClient(table.get(), 0, 30)); - ASSERT_EQ(6, CountRowsFromClient(table.get(), 14, 30)); - ASSERT_EQ(0, CountRowsFromClient(table.get(), 30, 30)); - ASSERT_EQ(0, CountRowsFromClient(table.get(), kTabletsNum * kRowsPerTablet, + CountRowsFromClient(table.get(), read_mode, kNoBound, kNoBound)); + ASSERT_EQ(3, CountRowsFromClient(table.get(), read_mode, kNoBound, 15)); + ASSERT_EQ(9, CountRowsFromClient(table.get(), read_mode, 27, kNoBound)); + ASSERT_EQ(3, CountRowsFromClient(table.get(), read_mode, 0, 15)); + ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 0, 10)); + ASSERT_EQ(4, CountRowsFromClient(table.get(), read_mode, 0, 20)); + ASSERT_EQ(8, CountRowsFromClient(table.get(), read_mode, 0, 30)); + ASSERT_EQ(6, CountRowsFromClient(table.get(), read_mode, 14, 30)); + ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 30, 30)); + ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, kTabletsNum * kRowsPerTablet, kNoBound)); // Update every other row @@ -926,16 +940,16 @@ TEST_F(ClientTest, TestScanMultiTablet) { // Check all counts the same (make sure updates don't change # of rows) ASSERT_EQ(4 * (kTabletsNum - 1), - CountRowsFromClient(table.get(), kNoBound, kNoBound)); - ASSERT_EQ(3, CountRowsFromClient(table.get(), kNoBound, 15)); - ASSERT_EQ(9, CountRowsFromClient(table.get(), 27, kNoBound)); - ASSERT_EQ(3, CountRowsFromClient(table.get(), 0, 15)); - ASSERT_EQ(0, CountRowsFromClient(table.get(), 0, 10)); - ASSERT_EQ(4, CountRowsFromClient(table.get(), 0, 20)); - ASSERT_EQ(8, CountRowsFromClient(table.get(), 0, 30)); - ASSERT_EQ(6, 
CountRowsFromClient(table.get(), 14, 30)); - ASSERT_EQ(0, CountRowsFromClient(table.get(), 30, 30)); - ASSERT_EQ(0, CountRowsFromClient(table.get(), kTabletsNum * kRowsPerTablet, + CountRowsFromClient(table.get(), read_mode, kNoBound, kNoBound)); + ASSERT_EQ(3, CountRowsFromClient(table.get(), read_mode, kNoBound, 15)); + ASSERT_EQ(9, CountRowsFromClient(table.get(), read_mode, 27, kNoBound)); + ASSERT_EQ(3, CountRowsFromClient(table.get(), read_mode, 0, 15)); + ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 0, 10)); + ASSERT_EQ(4, CountRowsFromClient(table.get(), read_mode, 0, 20)); + ASSERT_EQ(8, CountRowsFromClient(table.get(), read_mode, 0, 30)); + ASSERT_EQ(6, CountRowsFromClient(table.get(), read_mode, 14, 30)); + ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 30, 30)); + ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, kTabletsNum * kRowsPerTablet, kNoBound)); // Delete half the rows @@ -950,16 +964,16 @@ TEST_F(ClientTest, TestScanMultiTablet) { // Check counts changed accordingly ASSERT_EQ(2 * (kTabletsNum - 1), - CountRowsFromClient(table.get(), kNoBound, kNoBound)); - ASSERT_EQ(2, CountRowsFromClient(table.get(), kNoBound, 15)); - ASSERT_EQ(4, CountRowsFromClient(table.get(), 27, kNoBound)); - ASSERT_EQ(2, CountRowsFromClient(table.get(), 0, 15)); - ASSERT_EQ(0, CountRowsFromClient(table.get(), 0, 10)); - ASSERT_EQ(2, CountRowsFromClient(table.get(), 0, 20)); - ASSERT_EQ(4, CountRowsFromClient(table.get(), 0, 30)); - ASSERT_EQ(2, CountRowsFromClient(table.get(), 14, 30)); - ASSERT_EQ(0, CountRowsFromClient(table.get(), 30, 30)); - ASSERT_EQ(0, CountRowsFromClient(table.get(), kTabletsNum * kRowsPerTablet, + CountRowsFromClient(table.get(), read_mode, kNoBound, kNoBound)); + ASSERT_EQ(2, CountRowsFromClient(table.get(), read_mode, kNoBound, 15)); + ASSERT_EQ(4, CountRowsFromClient(table.get(), read_mode, 27, kNoBound)); + ASSERT_EQ(2, CountRowsFromClient(table.get(), read_mode, 0, 15)); + ASSERT_EQ(0, 
CountRowsFromClient(table.get(), read_mode, 0, 10)); + ASSERT_EQ(2, CountRowsFromClient(table.get(), read_mode, 0, 20)); + ASSERT_EQ(4, CountRowsFromClient(table.get(), read_mode, 0, 30)); + ASSERT_EQ(2, CountRowsFromClient(table.get(), read_mode, 14, 30)); + ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 30, 30)); + ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, kTabletsNum * kRowsPerTablet, kNoBound)); // Delete rest of rows @@ -973,18 +987,20 @@ TEST_F(ClientTest, TestScanMultiTablet) { FlushSessionOrDie(session); // Check counts changed accordingly - ASSERT_EQ(0, CountRowsFromClient(table.get(), kNoBound, kNoBound)); - ASSERT_EQ(0, CountRowsFromClient(table.get(), kNoBound, 15)); - ASSERT_EQ(0, CountRowsFromClient(table.get(), 27, kNoBound)); - ASSERT_EQ(0, CountRowsFromClient(table.get(), 0, 15)); - ASSERT_EQ(0, CountRowsFromClient(table.get(), 0, 10)); - ASSERT_EQ(0, CountRowsFromClient(table.get(), 0, 20)); - ASSERT_EQ(0, CountRowsFromClient(table.get(), 0, 30)); - ASSERT_EQ(0, CountRowsFromClient(table.get(), 14, 30)); - ASSERT_EQ(0, CountRowsFromClient(table.get(), 30, 30)); - ASSERT_EQ(0, CountRowsFromClient(table.get(), kTabletsNum * kRowsPerTablet, + ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, kNoBound, kNoBound)); + ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, kNoBound, 15)); + ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 27, kNoBound)); + ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 0, 15)); + ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 0, 10)); + ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 0, 20)); + ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 0, 30)); + ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 14, 30)); + ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, 30, 30)); + ASSERT_EQ(0, CountRowsFromClient(table.get(), read_mode, kTabletsNum * kRowsPerTablet, kNoBound)); } +INSTANTIATE_TEST_CASE_P(Params, 
ScanMultiTabletParamTest, + testing::ValuesIn(read_modes)); TEST_F(ClientTest, TestScanEmptyTable) { KuduScanner scanner(client_table_.get()); @@ -1191,6 +1207,26 @@ TEST_F(ClientTest, TestRowPtrNoRedaction) { } } +TEST_F(ClientTest, TestScanYourWrites) { + // Insert the rows + ASSERT_NO_FATAL_FAILURE(InsertTestRows(client_table_.get(), + FLAGS_test_scan_num_rows)); + + // Verify that no matter which replica is selected, client could + // achieve read-your-writes/read-your-reads. + uint64_t count = CountRowsFromClient(client_table_.get(), + KuduClient::LEADER_ONLY, + KuduScanner::READ_YOUR_WRITES, + kNoBound, kNoBound); + ASSERT_EQ(FLAGS_test_scan_num_rows, count); + + count = CountRowsFromClient(client_table_.get(), + KuduClient::CLOSEST_REPLICA, + KuduScanner::READ_YOUR_WRITES, + kNoBound, kNoBound); + ASSERT_EQ(FLAGS_test_scan_num_rows, count); +} + namespace internal { static void ReadBatchToStrings(KuduScanner* scanner, vector<string>* rows) { @@ -3816,6 +3852,7 @@ TEST_F(ClientTest, TestReplicatedMultiTabletTableFailover) { tries++; int num_rows = CountRowsFromClient(table.get(), KuduClient::LEADER_ONLY, + KuduScanner::READ_LATEST, kNoBound, kNoBound); int master_rpcs = CountMasterLookupRPCs() - master_rpcs_before; @@ -3891,6 +3928,7 @@ TEST_F(ClientTest, TestReplicatedTabletWritesWithLeaderElection) { LOG(INFO) << "Counting rows..."; ASSERT_EQ(2 * kNumRowsToWrite, CountRowsFromClient(table.get(), KuduClient::FIRST_REPLICA, + KuduScanner::READ_LATEST, kNoBound, kNoBound)); } @@ -4477,9 +4515,14 @@ TEST_F(ClientTest, TestInsertEmptyPK) { ASSERT_EQ("<none>", ReadRowAsString()); } +class LatestObservedTimestampParamTest : + public ClientTest, + public ::testing::WithParamInterface<KuduScanner::ReadMode> { +}; // Check the behavior of the latest observed timestamp when performing // write and read operations. 
-TEST_F(ClientTest, TestLatestObservedTimestamp) { +TEST_P(LatestObservedTimestampParamTest, Test) { + const KuduScanner::ReadMode read_mode = GetParam(); // Check that a write updates the latest observed timestamp. const uint64_t ts0 = client_->GetLatestObservedTimestamp(); ASSERT_EQ(KuduClient::kNoTimestamp, ts0); @@ -4500,26 +4543,28 @@ TEST_F(ClientTest, TestLatestObservedTimestamp) { if (c != client_) { // Check that the new client has no latest observed timestamp. ASSERT_EQ(KuduClient::kNoTimestamp, c->GetLatestObservedTimestamp()); + // The observed timestamp may not move forward when scanning in + // READ_YOUR_WRITES mode from another client, since that client + // does not share the same propagation timestamp bound, and the + // chosen snapshot timestamp is returned as the new propagation + // timestamp. + if (read_mode == KuduScanner::READ_YOUR_WRITES) break; } shared_ptr<KuduTable> table; ASSERT_OK(c->OpenTable(client_table_->name(), &table)); - static const KuduScanner::ReadMode kReadModes[] = { - KuduScanner::READ_AT_SNAPSHOT, - KuduScanner::READ_LATEST, - }; - for (auto read_mode : kReadModes) { - KuduScanner scanner(table.get()); - ASSERT_OK(scanner.SetReadMode(read_mode)); - if (read_mode == KuduScanner::READ_AT_SNAPSHOT) { - ASSERT_OK(scanner.SetSnapshotRaw(ts1)); - } - ASSERT_OK(scanner.Open()); + KuduScanner scanner(table.get()); + ASSERT_OK(scanner.SetReadMode(read_mode)); + if (read_mode == KuduScanner::READ_AT_SNAPSHOT) { + ASSERT_OK(scanner.SetSnapshotRaw(ts1)); } + ASSERT_OK(scanner.Open()); const uint64_t ts = c->GetLatestObservedTimestamp(); ASSERT_LT(latest_ts, ts); latest_ts = ts; } } +INSTANTIATE_TEST_CASE_P(Params, LatestObservedTimestampParamTest, + testing::ValuesIn(read_modes)); // Insert bunch of rows, delete a row, and then insert the row back.
// Run scans several scan and check the results are consistent with the http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/client/client.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc index 93431b7..e067c04 100644 --- a/src/kudu/client/client.cc +++ b/src/kudu/client/client.cc @@ -136,7 +136,7 @@ MAKE_ENUM_LIMITS(kudu::client::KuduSession::ExternalConsistencyMode, MAKE_ENUM_LIMITS(kudu::client::KuduScanner::ReadMode, kudu::client::KuduScanner::READ_LATEST, - kudu::client::KuduScanner::READ_AT_SNAPSHOT); + kudu::client::KuduScanner::READ_YOUR_WRITES); MAKE_ENUM_LIMITS(kudu::client::KuduScanner::OrderMode, kudu::client::KuduScanner::UNORDERED, @@ -1365,6 +1365,20 @@ Status KuduScanner::Open() { return Status::OK(); } + // For READ_YOUR_WRITES scan mode, get the latest observed timestamp and store it + // in the scan config. Always use it as the propagation timestamp for the duration + // of the scan to avoid unnecessary waiting.
+ if (data_->configuration().read_mode() == READ_YOUR_WRITES) { + const uint64_t lo_ts = data_->table_->client()->data_->GetLatestObservedTimestamp(); + data_->mutable_configuration()->SetScanLowerBoundTimestampRaw(lo_ts); + } + + if (data_->configuration().read_mode() != READ_AT_SNAPSHOT && + data_->configuration().has_snapshot_timestamp()) { + return Status::InvalidArgument("Snapshot timestamp should only be configured " + "for READ_AT_SNAPSHOT scan mode."); + } + VLOG(2) << "Beginning " << data_->DebugString(); MonoTime deadline = MonoTime::Now() + data_->configuration().timeout(); http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/client/client.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h index a9d09cf..dadf8dc 100644 --- a/src/kudu/client/client.h +++ b/src/kudu/client/client.h @@ -1735,7 +1735,19 @@ class KUDU_EXPORT KuduScanner { /// by which writes are sometimes not externally consistent even when /// action was taken to make them so. In these cases Isolation may /// degenerate to mode "Read Committed". See KUDU-430. - READ_AT_SNAPSHOT + READ_AT_SNAPSHOT, + + /// When @c READ_YOUR_WRITES is specified, the client will perform a read + /// such that it follows all previously known writes and reads from this client. + /// Specifically this mode: + /// (1) ensures read-your-writes and read-your-reads session guarantees, + /// (2) minimizes latency caused by waiting for outstanding write + /// transactions to complete. + /// + /// Reads in this mode are not repeatable: two READ_YOUR_WRITES reads, even if + /// they provide the same propagated timestamp bound, can execute at different + /// timestamps and thus return different results. + READ_YOUR_WRITES }; /// Whether the rows should be returned in order. 
http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/client/scan_configuration.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/scan_configuration.cc b/src/kudu/client/scan_configuration.cc index fe751be..addb2e0 100644 --- a/src/kudu/client/scan_configuration.cc +++ b/src/kudu/client/scan_configuration.cc @@ -52,6 +52,7 @@ ScanConfiguration::ScanConfiguration(KuduTable* table) read_mode_(KuduScanner::READ_LATEST), is_fault_tolerant_(false), snapshot_timestamp_(kNoTimestamp), + lower_bound_propagation_timestamp_(kNoTimestamp), timeout_(MonoDelta::FromMilliseconds(KuduScanner::kScanTimeoutMillis)), arena_(256), row_format_flags_(KuduScanner::NO_FLAGS) { @@ -180,6 +181,10 @@ void ScanConfiguration::SetSnapshotRaw(uint64_t snapshot_timestamp) { snapshot_timestamp_ = snapshot_timestamp; } +void ScanConfiguration::SetScanLowerBoundTimestampRaw(uint64_t propagation_timestamp) { + lower_bound_propagation_timestamp_ = propagation_timestamp; +} + void ScanConfiguration::SetTimeoutMillis(int millis) { timeout_ = MonoDelta::FromMilliseconds(millis); } http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/client/scan_configuration.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/scan_configuration.h b/src/kudu/client/scan_configuration.h index 26c1bcc..4a1e5e1 100644 --- a/src/kudu/client/scan_configuration.h +++ b/src/kudu/client/scan_configuration.h @@ -82,10 +82,18 @@ class ScanConfiguration { Status SetFaultTolerant(bool fault_tolerant) WARN_UNUSED_RESULT; + // Sets the timestamp the scan must be executed at, in microseconds + // since the Unix epoch. Requires READ_AT_SNAPSHOT scan mode. void SetSnapshotMicros(uint64_t snapshot_timestamp_micros); + // Sets a previously encoded timestamp as a snapshot timestamp. + // Requires READ_AT_SNAPSHOT scan mode. 
void SetSnapshotRaw(uint64_t snapshot_timestamp); + // Set the lower bound of scan's propagation timestamp. + // It is only used in READ_YOUR_WRITES scan mode. + void SetScanLowerBoundTimestampRaw(uint64_t propagation_timestamp); + void SetTimeoutMillis(int millis); Status SetRowFormatFlags(uint64_t flags); @@ -144,6 +152,15 @@ class ScanConfiguration { return snapshot_timestamp_; } + bool has_lower_bound_propagation_timestamp() const { + return lower_bound_propagation_timestamp_ != kNoTimestamp; + } + + uint64_t lower_bound_propagation_timestamp() const { + CHECK(has_lower_bound_propagation_timestamp()); + return lower_bound_propagation_timestamp_; + } + const MonoDelta& timeout() const { return timeout_; } @@ -184,6 +201,8 @@ class ScanConfiguration { uint64_t snapshot_timestamp_; + uint64_t lower_bound_propagation_timestamp_; + MonoDelta timeout_; // Manages interior allocations for the scan spec and copied bounds. http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/client/scan_token-internal.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc index de36be6..beb43ad 100644 --- a/src/kudu/client/scan_token-internal.cc +++ b/src/kudu/client/scan_token-internal.cc @@ -19,7 +19,6 @@ #include <cstdint> #include <memory> -#include <ostream> #include <string> #include <utility> #include <vector> @@ -164,6 +163,9 @@ Status KuduScanToken::Data::PBIntoScanner(KuduClient* client, case ReadMode::READ_AT_SNAPSHOT: RETURN_NOT_OK(scan_builder->SetReadMode(KuduScanner::READ_AT_SNAPSHOT)); break; + case ReadMode::READ_YOUR_WRITES: + RETURN_NOT_OK(scan_builder->SetReadMode(KuduScanner::READ_YOUR_WRITES)); + break; default: return Status::InvalidArgument("scan token has unrecognized read mode"); } @@ -179,6 +181,16 @@ Status KuduScanToken::Data::PBIntoScanner(KuduClient* client, RETURN_NOT_OK(scan_builder->SetCacheBlocks(message.cache_blocks())); + // 
Since the latest observed timestamp from the given client might be + // more recent than the one at the time the token was created, the performance + // of the scan could be affected if using READ_YOUR_WRITES mode. + // + // We choose to keep it this way because the code path is simpler. + // Besides, in practice it's very rarely the case that an active client + // is permanently being written to and read from (using scan tokens). + // + // However, it is worth noting that this is a possible optimization, if + // we ever notice READ_YOUR_WRITES reads stalling with scan tokens. if (message.has_propagated_timestamp()) { client->data_->UpdateLatestObservedTimestamp(message.propagated_timestamp()); } @@ -234,8 +246,8 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) { case KuduScanner::READ_LATEST: pb.set_read_mode(kudu::READ_LATEST); if (configuration_.has_snapshot_timestamp()) { - LOG(WARNING) << "Ignoring snapshot timestamp since not in " - "READ_AT_SNAPSHOT mode."; + return Status::InvalidArgument("Snapshot timestamp should only be configured " + "for READ_AT_SNAPSHOT scan mode."); } break; case KuduScanner::READ_AT_SNAPSHOT: @@ -244,6 +256,13 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) { pb.set_snap_timestamp(configuration_.snapshot_timestamp()); } break; + case KuduScanner::READ_YOUR_WRITES: + pb.set_read_mode(kudu::READ_YOUR_WRITES); + if (configuration_.has_snapshot_timestamp()) { + return Status::InvalidArgument("Snapshot timestamp should only be configured " + "for READ_AT_SNAPSHOT scan mode."); + } + break; default: LOG(FATAL) << Substitute("$0: unexpected read mode", read_mode); } http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/client/scan_token-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/scan_token-test.cc b/src/kudu/client/scan_token-test.cc index 80d1179..bc6bc07 100644 --- a/src/kudu/client/scan_token-test.cc +++
b/src/kudu/client/scan_token-test.cc @@ -194,6 +194,28 @@ TEST_F(ScanTokenTest, TestScanTokens) { ASSERT_EQ(0, scanner->data_->last_response_.data().num_rows()); } + { // no predicates with READ_YOUR_WRITES mode + vector<KuduScanToken*> tokens; + ElementDeleter deleter(&tokens); + KuduScanTokenBuilder builder(table.get()); + ASSERT_OK(builder.SetReadMode(KuduScanner::READ_YOUR_WRITES)); + ASSERT_OK(builder.Build(&tokens)); + + ASSERT_EQ(8, tokens.size()); + ASSERT_EQ(200, CountRows(tokens)); + NO_FATALS(VerifyTabletInfo(tokens)); + } + + { // Set snapshot timestamp with READ_YOUR_WRITES mode + // gives InvalidArgument error. + vector<KuduScanToken*> tokens; + ElementDeleter deleter(&tokens); + KuduScanTokenBuilder builder(table.get()); + ASSERT_OK(builder.SetReadMode(KuduScanner::READ_YOUR_WRITES)); + ASSERT_OK(builder.SetSnapshotMicros(1)); + ASSERT_TRUE(builder.Build(&tokens).IsInvalidArgument()); + } + { // no predicates vector<KuduScanToken*> tokens; ElementDeleter deleter(&tokens); @@ -414,10 +436,21 @@ TEST_F(ScanTokenTest, TestScanTokensWithNonCoveringRange) { } } +const kudu::ReadMode read_modes[] = { + kudu::READ_LATEST, + kudu::READ_AT_SNAPSHOT, + kudu::READ_YOUR_WRITES, +}; + +class TimestampPropagationParamTest : + public ScanTokenTest, + public ::testing::WithParamInterface<kudu::ReadMode> { +}; // When building a scanner from a serialized scan token, // verify that the propagated timestamp from the token makes its way into the // latest observed timestamp of the client object. -TEST_F(ScanTokenTest, TestTimestampPropagation) { +TEST_P(TimestampPropagationParamTest, Test) { + const kudu::ReadMode read_mode = GetParam(); static const string kTableName = "p_ts_table"; // Create a table to work with: @@ -448,27 +481,25 @@ TEST_F(ScanTokenTest, TestTimestampPropagation) { } // Deserialize a scan token and make sure the client's last observed timestamp - // is updated accordingly. 
- { - const uint64_t ts_prev = client_->GetLatestObservedTimestamp(); - const uint64_t ts_propagated = ts_prev + 1000000; - - ScanTokenPB pb; - pb.set_table_name(kTableName); - pb.set_read_mode(::kudu::READ_AT_SNAPSHOT); - pb.set_propagated_timestamp(ts_propagated); - const string serialized_token = pb.SerializeAsString(); - EXPECT_EQ(ts_prev, client_->GetLatestObservedTimestamp()); - - KuduScanner* scanner_raw; - ASSERT_OK(KuduScanToken::DeserializeIntoScanner(client_.get(), - serialized_token, - &scanner_raw)); - // The caller of the DeserializeIntoScanner() is responsible for - // de-allocating the result scanner object. - unique_ptr<KuduScanner> scanner(scanner_raw); - EXPECT_EQ(ts_propagated, client_->GetLatestObservedTimestamp()); - } + // is always updated accordingly for any read modes. + const uint64_t ts_prev = client_->GetLatestObservedTimestamp(); + const uint64_t ts_propagated = ts_prev + 1000000; + + ScanTokenPB pb; + pb.set_table_name(kTableName); + pb.set_read_mode(read_mode); + pb.set_propagated_timestamp(ts_propagated); + const string serialized_token = pb.SerializeAsString(); + EXPECT_EQ(ts_prev, client_->GetLatestObservedTimestamp()); + + KuduScanner* scanner_raw; + ASSERT_OK(KuduScanToken::DeserializeIntoScanner(client_.get(), + serialized_token, + &scanner_raw)); + // The caller of the DeserializeIntoScanner() is responsible for + // de-allocating the result scanner object. + unique_ptr<KuduScanner> scanner(scanner_raw); + EXPECT_EQ(ts_propagated, client_->GetLatestObservedTimestamp()); // Build the set of scan tokens for the table, serialize them and // make sure the serialized tokens contain the propagated timestamp. @@ -493,6 +524,9 @@ TEST_F(ScanTokenTest, TestTimestampPropagation) { } } +INSTANTIATE_TEST_CASE_P(Params, TimestampPropagationParamTest, + testing::ValuesIn(read_modes)); + // Tests the results of creating scan tokens, altering the columns being // scanned, and then executing the scan tokens. 
TEST_F(ScanTokenTest, TestConcurrentAlterTable) { http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/client/scanner-internal.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc index f26f588..ea67ec2 100644 --- a/src/kudu/client/scanner-internal.cc +++ b/src/kudu/client/scanner-internal.cc @@ -315,8 +315,8 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key, case KuduScanner::READ_LATEST: scan->set_read_mode(kudu::READ_LATEST); if (configuration_.has_snapshot_timestamp()) { - LOG(WARNING) << "Ignoring snapshot timestamp since " - "not in READ_AT_SNAPSHOT mode."; + LOG(FATAL) << "Snapshot timestamp should only be configured " + "for READ_AT_SNAPSHOT scan mode."; } break; case KuduScanner::READ_AT_SNAPSHOT: @@ -325,6 +325,13 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key, scan->set_snap_timestamp(configuration_.snapshot_timestamp()); } break; + case KuduScanner::READ_YOUR_WRITES: + scan->set_read_mode(kudu::READ_YOUR_WRITES); + if (configuration_.has_snapshot_timestamp()) { + LOG(FATAL) << "Snapshot timestamp should only be configured " + "for READ_AT_SNAPSHOT scan mode."; + } + break; default: LOG(FATAL) << Substitute("$0: unexpected read mode", read_mode); } @@ -344,10 +351,18 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key, scan->set_cache_blocks(configuration_.spec().cache_blocks()); // For consistent operations, propagate the timestamp among all operations - // performed the context of the same client. - const uint64_t lo_ts = table_->client()->data_->GetLatestObservedTimestamp(); - if (lo_ts != KuduClient::kNoTimestamp) { - scan->set_propagated_timestamp(lo_ts); + // performed the context of the same client. For READ_YOUR_WRITES scan, use + // the propagation timestamp from the scan config. 
+ uint64_t ts = KuduClient::kNoTimestamp; + if (read_mode == KuduScanner::READ_YOUR_WRITES) { + if (configuration_.has_lower_bound_propagation_timestamp()) { + ts = configuration_.lower_bound_propagation_timestamp(); + } + } else { + ts = table_->client()->data_->GetLatestObservedTimestamp(); + } + if (ts != KuduClient::kNoTimestamp) { + scan->set_propagated_timestamp(ts); } // Set up the predicates. @@ -473,7 +488,14 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key, configuration_.SetSnapshotRaw(last_response_.snap_timestamp()); } - if (last_response_.has_propagated_timestamp()) { + // For READ_YOUR_WRITES mode, update the latest observed timestamp with + // the chosen snapshot timestamp sent back from the server, to avoid + // unnecessary waiting in subsequent reads. + if (configuration_.read_mode() == KuduScanner::READ_YOUR_WRITES) { + CHECK(last_response_.has_snap_timestamp()); + table_->client()->data_->UpdateLatestObservedTimestamp( + last_response_.snap_timestamp()); + } else if (last_response_.has_propagated_timestamp()) { table_->client()->data_->UpdateLatestObservedTimestamp( last_response_.propagated_timestamp()); } http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/integration-tests/consistency-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/consistency-itest.cc b/src/kudu/integration-tests/consistency-itest.cc index 76aad69..a0a2dc9 100644 --- a/src/kudu/integration-tests/consistency-itest.cc +++ b/src/kudu/integration-tests/consistency-itest.cc @@ -56,9 +56,12 @@ #include "kudu/tserver/tablet_server.h" #include "kudu/tserver/ts_tablet_manager.h" #include "kudu/util/monotime.h" +#include "kudu/util/random.h" #include "kudu/util/slice.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" +#include "kudu/util/thread.h" DECLARE_int32(heartbeat_interval_ms); DECLARE_int32(max_clock_sync_error_usec); @@
-86,7 +89,7 @@ namespace client { class ConsistencyITest : public MiniClusterITestBase { public: ConsistencyITest() - : num_tablet_servers_(2), + : num_tablet_servers_(3), table_name_("timestamp_propagation_test_table"), key_column_name_("key"), key_split_value_(8) { @@ -112,6 +115,35 @@ class ConsistencyITest : public MiniClusterITestBase { StartCluster(num_tablet_servers_); } + void ScannerThread(KuduClient::ReplicaSelection selection, + int rows_to_insert, + int first_row, + int scans_to_perform) { + client::sp::shared_ptr<KuduClient> client; + CHECK_OK(cluster_->CreateClient(nullptr, &client)); + + shared_ptr<KuduTable> table; + CHECK_OK(client->OpenTable(table_name_, &table)); + + size_t row_count; + for (int i = 0; i < 3; i++) { + // Insert multiple rows into the tablets. + InsertTestRows(client.get(), table.get(), rows_to_insert, first_row * i); + int expected_count = rows_to_insert * (i + 1); + // Perform a bunch of READ_YOUR_WRITES scans (using the given replica + // selection) that count the rows, and verify that the row count + // never goes down from what was previously observed, to ensure subsequent + // reads will not "go back in time" regarding writes that other + // clients have done. + for (int j = 0; j < scans_to_perform; j++) { + CHECK_OK(GetRowCount(table.get(), KuduScanner::READ_YOUR_WRITES, + selection, 0, &row_count)); + EXPECT_LE(expected_count, row_count); + expected_count = row_count; + } + } + } + protected: static void UpdateClock(HybridClock* clock, MonoDelta delta) { const uint64_t new_time(HybridClock::GetPhysicalValueMicros(clock->Now()) + @@ -122,7 +154,8 @@ class ConsistencyITest : public MiniClusterITestBase { // Creates a table with the specified name and replication factor.
Status CreateTable(KuduClient* client, - const string& table_name) { + const string& table_name, + int num_replicas = 1) { unique_ptr<KuduPartialRow> split_row(schema_.NewRow()); RETURN_NOT_OK(split_row->SetInt32(0, key_split_value_)); @@ -131,7 +164,7 @@ class ConsistencyITest : public MiniClusterITestBase { .schema(&schema_) .add_range_partition_split(split_row.release()) .set_range_partition_columns({ key_column_name_ }) - .num_replicas(1) + .num_replicas(num_replicas) .Create()); return Status::OK(); } @@ -162,8 +195,15 @@ class ConsistencyITest : public MiniClusterITestBase { Status GetRowCount(KuduTable* table, KuduScanner::ReadMode read_mode, uint64_t ts, size_t* row_count) { + return GetRowCount(table, read_mode, KuduClient::LEADER_ONLY, ts, row_count); + } + + Status GetRowCount(KuduTable* table, KuduScanner::ReadMode read_mode, + KuduClient::ReplicaSelection selection, uint64_t ts, + size_t* row_count) { KuduScanner scanner(table); RETURN_NOT_OK(scanner.SetReadMode(read_mode)); + RETURN_NOT_OK(scanner.SetSelection(selection)); if (read_mode == KuduScanner::READ_AT_SNAPSHOT && ts != 0) { RETURN_NOT_OK(scanner.SetSnapshotRaw(ts + 1)); } @@ -709,5 +749,102 @@ TEST_F(ConsistencyITest, TestScanTokenTimestampPropagation) { } } +const KuduClient::ReplicaSelection replica_selectors[] = { + KuduClient::LEADER_ONLY, + KuduClient::CLOSEST_REPLICA, + KuduClient::FIRST_REPLICA, +}; + +class ScanYourWritesParamTest : + public ConsistencyITest, + public ::testing::WithParamInterface<KuduClient::ReplicaSelection> { +}; + +// Verify that no matter which replica is selected, a single client could +// achieve read-your-writes on READ_YOUR_WRITES scan mode. 
+TEST_P(ScanYourWritesParamTest, Test) { + const KuduClient::ReplicaSelection sel = GetParam(); + shared_ptr<KuduClient> client; + ASSERT_OK(cluster_->CreateClient(nullptr, &client)); + ASSERT_OK(CreateTable(client.get(), table_name_, 3)); + shared_ptr<KuduTable> table; + ASSERT_OK(client->OpenTable(table_name_, &table)); + + // Insert multiple rows into the tablets. + const uint64_t rows_to_insert = 20000; + ASSERT_OK(InsertTestRows(client.get(), table.get(), rows_to_insert, 0)); + + size_t row_count; + ASSERT_OK(GetRowCount(table.get(), KuduScanner::READ_YOUR_WRITES, + sel, 0, &row_count)); + EXPECT_EQ(rows_to_insert, row_count); + + row_count = 0; + ASSERT_OK(GetRowCount(table.get(), KuduScanner::READ_YOUR_WRITES, + sel, 0, &row_count)); + EXPECT_EQ(rows_to_insert, row_count); +} + +INSTANTIATE_TEST_CASE_P(Params, ScanYourWritesParamTest, + testing::ValuesIn(replica_selectors)); + +class ScanYourWritesMultiClientsParamTest : + public ConsistencyITest, + public ::testing::WithParamInterface<KuduClient::ReplicaSelection> { +}; + +// This is a test that verifies, when multiple clients running +// simultaneously, a client can get read-your-writes and +// read-your-reads session guarantees using READ_YOUR_WRITES +// scan mode, no matter which replica is selected. +// +// Read-your-writes guarantees that a client can see all previous +// writes that it performed. +// +// Read-your-reads guarantees all subsequent reads to a given object +// "never return any previous values" regarding writes that other +// clients have done. +// +// The test scenario is as the following: +// +// 1) Have multiple clients running concurrently, +// +// 2) From the same client performs multiple writes and +// multiple sets of scans. Each client +// continuously performs inserts to a tablet, and then +// performs a bunch of READ_YOUR_WRITES scans to all the +// replicas that count the rows. The count of the rows +// should never go down from the previous observed one. 
+TEST_P(ScanYourWritesMultiClientsParamTest, Test) { + const KuduClient::ReplicaSelection sel = GetParam(); + shared_ptr<KuduClient> client; + ASSERT_OK(cluster_->CreateClient(nullptr, &client)); + ASSERT_OK(CreateTable(client.get(), table_name_, 3)); + + for (int run = 1; run <= 3; run++) { + vector<scoped_refptr<Thread>> threads; + const int kNumThreads = 5; + const int rows_to_insert = 1000; + const int scans_to_performs = AllowSlowTests() ? 10 : 3; + for (int i = 0; i < kNumThreads; i++) { + Random first_row(rows_to_insert * kNumThreads); + scoped_refptr<kudu::Thread> new_thread; + CHECK_OK(kudu::Thread::Create( + "test", strings::Substitute("test-scanner-$0", i), + &ConsistencyITest::ScannerThread, this, + sel, rows_to_insert, first_row.Next32(), + scans_to_performs, &new_thread)); + threads.push_back(new_thread); + } + SleepFor(MonoDelta::FromMilliseconds(50)); + + for (const scoped_refptr<kudu::Thread> &thr : threads) { + CHECK_OK(ThreadJoiner(thr.get()).Join()); + } + } +} + +INSTANTIATE_TEST_CASE_P(Params, ScanYourWritesMultiClientsParamTest, + testing::ValuesIn(replica_selectors)); } // namespace client } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/5047f091/src/kudu/integration-tests/delete_table-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/delete_table-itest.cc b/src/kudu/integration-tests/delete_table-itest.cc index 12b6848..43ed3b2 100644 --- a/src/kudu/integration-tests/delete_table-itest.cc +++ b/src/kudu/integration-tests/delete_table-itest.cc @@ -1497,6 +1497,8 @@ TEST_P(DeleteTableWhileScanInProgressParamTest, Test) { return "READ_LATEST"; case KuduScanner::READ_AT_SNAPSHOT: return "READ_AT_SNAPSHOT"; + case KuduScanner::READ_YOUR_WRITES: + return "READ_YOUR_WRITES"; default: return "UNKNOWN"; } @@ -1563,7 +1565,7 @@ TEST_P(DeleteTableWhileScanInProgressParamTest, Test) { using kudu::client::sp::shared_ptr; shared_ptr<KuduTable> table; - 
ASSERT_OK(client_->OpenTable(w.table_name(), &table)); + ASSERT_OK(w.client()->OpenTable(w.table_name(), &table)); KuduScanner scanner(table.get()); ASSERT_OK(scanner.SetReadMode(mode)); ASSERT_OK(scanner.SetSelection(sel)); @@ -1621,6 +1623,7 @@ TEST_P(DeleteTableWhileScanInProgressParamTest, Test) { const KuduScanner::ReadMode read_modes[] = { KuduScanner::READ_LATEST, KuduScanner::READ_AT_SNAPSHOT, + KuduScanner::READ_YOUR_WRITES, }; const KuduClient::ReplicaSelection replica_selectors[] = { KuduClient::LEADER_ONLY,
