Repository: kudu Updated Branches: refs/heads/master d9e703713 -> 723ced836
KUDU-1704: add READ_YOUR_WRITES scan mode This patch adds a new read mode READ_YOUR_WRITES on tserver. In this mode, the server will pick a snapshot in the past, subject to the propagated timestamp bound, and perform a read. Moreover, the chosen snapshot timestamp is returned back to the client. The major difference between READ_AT_SNAPSHOT scan without a timestamp and READ_YOUR_WRITES scan is the latter will choose the newest timestamp within the staleness bound that allows execution of the reads without being blocked by the in-flight transactions if possible. READ_YOUR_WRITES mode is not repeatable. However, it allows client local read-your-writes/read-your-reads. Design doc: https://docs.google.com/document/d/1WRLzKdCmRxXjUpi-DZsz7l2gl6215BsNdORtDXrmgl0/edit?usp=sharing Change-Id: I84ddb981a1a0f199d4e66f5d5097318f8c785a48 Reviewed-on: http://gerrit.cloudera.org:8080/8804 Tested-by: Kudu Jenkins Reviewed-by: Mike Percy <[email protected]> Reviewed-by: David Ribeiro Alves <[email protected]> Reviewed-by: Alexey Serbin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/723ced83 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/723ced83 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/723ced83 Branch: refs/heads/master Commit: 723ced836aa5e9729fa2891d558f0f70171c4db1 Parents: d9e7037 Author: hahao <[email protected]> Authored: Fri Dec 8 14:49:35 2017 -0800 Committer: Hao Hao <[email protected]> Committed: Wed Feb 21 19:20:28 2018 +0000 ---------------------------------------------------------------------- src/kudu/common/common.proto | 25 ++- src/kudu/tserver/tablet_server-test-base.cc | 4 +- src/kudu/tserver/tablet_server-test-base.h | 5 +- src/kudu/tserver/tablet_server-test.cc | 231 ++++++++++++++++++----- src/kudu/tserver/tablet_service.cc | 147 ++++++++++----- src/kudu/tserver/tablet_service.h | 12 ++ 6 files changed, 326 insertions(+), 98 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/723ced83/src/kudu/common/common.proto ---------------------------------------------------------------------- diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto index 4879b18..54016bc 100644 --- a/src/kudu/common/common.proto +++ b/src/kudu/common/common.proto @@ -217,7 +217,7 @@ enum ReadMode { // is lower than the snapshot's timestamp to complete. // // When mixing reads and writes clients that specify COMMIT_WAIT as their - // external consistency mode and then use the returned write_timestamp to + // external consistency mode and then use the returned write_timestamp // to perform snapshot reads are guaranteed that that snapshot time is // considered in the past by all servers and no additional action is // necessary. Clients using CLIENT_PROPAGATED however must forcibly propagate @@ -231,6 +231,29 @@ enum ReadMode { // the former. // TODO implement actually signing the propagated timestamp. READ_AT_SNAPSHOT = 2; + + // When READ_YOUR_WRITES is specified, the server will pick a timestamp to use + // for a server-local snapshot scan subject to the following criteria: + // (1) It will be higher than the propagated timestamp, + // (2) It will attempt to minimize latency caused by waiting for outstanding + // write transactions to complete. + // More specifically, the server will choose the latest timestamp higher than + // the provided propagated timestamp bound that allows execution of the + // reads without being blocked by the in-flight transactions (however the + // read can be blocked if the propagated timestamp is higher than some in-flight + // transactions). If no propagated timestamp is provided the server will choose + // a timestamp such that all transactions before it are committed. The chosen + // timestamp will be returned back to the client as 'snapshot timestamp'. 
The Kudu + // client library will use it as the propagated timestamp for subsequent reads + // to avoid unnecessarily waiting. + // + // Reads in this mode are not repeatable: two READ_YOUR_WRITES reads, even if + // they provide the same propagated timestamp bound, can execute at different + // timestamps and thus return different results. However, it allows + // read-your-writes and read-your-reads for each client, as the chosen + // timestamp must be higher than the one of the last write or read, + // known from the propagated timestamp. + READ_YOUR_WRITES = 3; } // The possible order modes for clients. http://git-wip-us.apache.org/repos/asf/kudu/blob/723ced83/src/kudu/tserver/tablet_server-test-base.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_server-test-base.cc b/src/kudu/tserver/tablet_server-test-base.cc index 5107934..1168601 100644 --- a/src/kudu/tserver/tablet_server-test-base.cc +++ b/src/kudu/tserver/tablet_server-test-base.cc @@ -441,7 +441,8 @@ void TabletServerTestBase::VerifyScanRequestFailure( } // Open a new scanner which scans all of the columns in the table. 
-void TabletServerTestBase::OpenScannerWithAllColumns(ScanResponsePB* resp) { +void TabletServerTestBase::OpenScannerWithAllColumns(ScanResponsePB* resp, + ReadMode read_mode) { ScanRequestPB req; RpcController rpc; @@ -449,6 +450,7 @@ void TabletServerTestBase::OpenScannerWithAllColumns(ScanResponsePB* resp) { const Schema& projection = schema_; NewScanRequestPB* scan = req.mutable_new_scan_request(); scan->set_tablet_id(kTabletId); + scan->set_read_mode(read_mode); ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns())); req.set_call_seq_id(0); req.set_batch_size_bytes(0); // so it won't return data right away http://git-wip-us.apache.org/repos/asf/kudu/blob/723ced83/src/kudu/tserver/tablet_server-test-base.h ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_server-test-base.h b/src/kudu/tserver/tablet_server-test-base.h index 8aa793e..9841021 100644 --- a/src/kudu/tserver/tablet_server-test-base.h +++ b/src/kudu/tserver/tablet_server-test-base.h @@ -23,10 +23,10 @@ #include <utility> #include <vector> +#include "kudu/common/common.pb.h" #include "kudu/common/row.h" #include "kudu/common/schema.h" #include "kudu/consensus/consensus.proxy.h" -#include "kudu/gutil/gscoped_ptr.h" #include "kudu/gutil/ref_counted.h" #include "kudu/server/server_base.proxy.h" #include "kudu/tablet/tablet_replica.h" @@ -117,7 +117,8 @@ class TabletServerTestBase : public KuduTest { const char *expected_message); // Open a new scanner which scans all of the columns in the table. 
- void OpenScannerWithAllColumns(ScanResponsePB* resp); + void OpenScannerWithAllColumns(ScanResponsePB* resp, + ReadMode read_mode = READ_LATEST); protected: static const char* kTableId; http://git-wip-us.apache.org/repos/asf/kudu/blob/723ced83/src/kudu/tserver/tablet_server-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc index a87b03f..137f4a5 100644 --- a/src/kudu/tserver/tablet_server-test.cc +++ b/src/kudu/tserver/tablet_server-test.cc @@ -183,6 +183,8 @@ class TabletServerTest : public TabletServerTestBase { } void DoOrderedScanTest(const Schema& projection, const string& expected_rows_as_string); + + void ScanYourWritesTest(uint64_t propagated_timestamp, ScanResponsePB* resp); }; TEST_F(TabletServerTest, TestPingServer) { @@ -1412,7 +1414,7 @@ TEST_F(TabletServerTest, TestClientGetsErrorBackWhenRecoveryFailed) { "Log file corruption detected"); } -TEST_F(TabletServerTest, TestScan) { +TEST_F(TabletServerTest, TestReadLatest) { int num_rows = AllowSlowTests() ? 10000 : 1000; InsertTestRowsDirect(0, num_rows); @@ -1467,7 +1469,14 @@ TEST_F(TabletServerTest, TestScan) { ASSERT_EQ(0, tablet_active_scanners->value()); } -TEST_F(TabletServerTest, TestExpiredScanner) { +class ExpiredScannerParamTest : + public TabletServerTest, + public ::testing::WithParamInterface<ReadMode> { +}; + +TEST_P(ExpiredScannerParamTest, Test) { + const ReadMode mode = GetParam(); + // Make scanners expire quickly. FLAGS_scanner_ttl_ms = 1; @@ -1484,7 +1493,7 @@ TEST_F(TabletServerTest, TestExpiredScanner) { // Open a scanner but don't read from it. ScanResponsePB resp; - ASSERT_NO_FATAL_FAILURE(OpenScannerWithAllColumns(&resp)); + ASSERT_NO_FATAL_FAILURE(OpenScannerWithAllColumns(&resp, mode)); // The scanner should expire after a short time. 
ASSERT_EVENTUALLY([&]() { @@ -1502,7 +1511,22 @@ TEST_F(TabletServerTest, TestExpiredScanner) { ASSERT_EQ(TabletServerErrorPB::SCANNER_EXPIRED, resp.error().code()); } -TEST_F(TabletServerTest, TestScanCorruptedDeltas) { +const ReadMode read_modes[] = { + READ_LATEST, + READ_AT_SNAPSHOT, + READ_YOUR_WRITES, +}; + +INSTANTIATE_TEST_CASE_P(Params, ExpiredScannerParamTest, + testing::ValuesIn(read_modes)); + +class ScanCorruptedDeltasParamTest : + public TabletServerTest, + public ::testing::WithParamInterface<ReadMode> { +}; + +TEST_P(ScanCorruptedDeltasParamTest, Test) { + const ReadMode mode = GetParam(); // Ensure some rows get to disk with deltas. InsertTestRowsDirect(0, 100); ASSERT_OK(tablet_replica_->tablet()->Flush()); @@ -1540,6 +1564,7 @@ TEST_F(TabletServerTest, TestScanCorruptedDeltas) { RpcController rpc; NewScanRequestPB* scan = req.mutable_new_scan_request(); scan->set_tablet_id(kTabletId); + scan->set_read_mode(mode); ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns())); // Send the call. This first call should attempt to init the corrupted @@ -1557,17 +1582,32 @@ TEST_F(TabletServerTest, TestScanCorruptedDeltas) { } } -TEST_F(TabletServerTest, TestScannerOpenWhenServerShutsDown) { - InsertTestRowsDirect(0, 1); +INSTANTIATE_TEST_CASE_P(Params, ScanCorruptedDeltasParamTest, + testing::ValuesIn(read_modes)); + +class ScannerOpenWhenServerShutsDownParamTest : + public TabletServerTest, + public ::testing::WithParamInterface<ReadMode> { +}; +TEST_P(ScannerOpenWhenServerShutsDownParamTest, Test) { + const ReadMode mode = GetParam(); + // Write and flush the write, so we have some rows in MRS and DRS + InsertTestRowsDirect(0, 100); + ASSERT_OK(tablet_replica_->tablet()->Flush()); + UpdateTestRowRemote(1, 100); + ASSERT_OK(tablet_replica_->tablet()->Flush()); ScanResponsePB resp; - ASSERT_NO_FATAL_FAILURE(OpenScannerWithAllColumns(&resp)); + ASSERT_NO_FATAL_FAILURE(OpenScannerWithAllColumns(&resp, mode)); // Scanner is now open. 
The test will now shut down the TS with the scanner still // out there. Due to KUDU-161 this used to fail, since the scanner (and thus the MRS) // stayed open longer than the anchor registry } +INSTANTIATE_TEST_CASE_P(Params, ScannerOpenWhenServerShutsDownParamTest, + testing::ValuesIn(read_modes)); + TEST_F(TabletServerTest, TestSnapshotScan) { const int num_rows = AllowSlowTests() ? 1000 : 100; const int num_batches = AllowSlowTests() ? 100 : 10; @@ -1721,40 +1761,6 @@ TEST_F(TabletServerTest, TestSnapshotScan_SnapshotInTheFutureFails) { } } - -// Test tserver shutdown with an active scanner open. -TEST_F(TabletServerTest, TestSnapshotScan_OpenScanner) { - vector<uint64_t> write_timestamps_collector; - // Write and flush and write, so we have some rows in MRS and DRS - InsertTestRowsRemote(0, 100, 2, nullptr, kTabletId, &write_timestamps_collector); - ASSERT_OK(tablet_replica_->tablet()->Flush()); - InsertTestRowsRemote(100, 100, 2, nullptr, kTabletId, &write_timestamps_collector); - - ScanRequestPB req; - ScanResponsePB resp; - RpcController rpc; - - // Set up a new request with no predicates, all columns. - const Schema& projection = schema_; - NewScanRequestPB* scan = req.mutable_new_scan_request(); - scan->set_tablet_id(kTabletId); - ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns())); - req.set_call_seq_id(0); - req.set_batch_size_bytes(0); - scan->set_read_mode(READ_AT_SNAPSHOT); - - // Send the call - { - SCOPED_TRACE(SecureDebugString(req)); - ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); - SCOPED_TRACE(SecureDebugString(resp)); - ASSERT_FALSE(resp.has_error()); - } - // Intentionally do not drain the scanner at the end, to leave it open. - // This tests tablet server shutdown with an active scanner. -} - - // Test retrying a snapshot scan using last_row. TEST_F(TabletServerTest, TestSnapshotScan_LastRow) { // Set the internal batching within the tserver to be small. 
Otherwise, @@ -1838,7 +1844,6 @@ TEST_F(TabletServerTest, TestSnapshotScan_LastRow) { } } - // Tests that a read in the future succeeds if a propagated_timestamp (that is even // further in the future) follows along. Also tests that the clock was updated so // that no writes will ever have a timestamp post this snapshot. @@ -1941,6 +1946,85 @@ TEST_F(TabletServerTest, TestSnapshotScan__SnapshotInTheFutureBeyondPropagatedTi } } +// Scan with READ_YOUR_WRITES mode to ensure it can +// satisfy read-your-writes/read-your-reads session guarantee. +TEST_F(TabletServerTest, TestScanYourWrites) { + vector<uint64_t> write_timestamps_collector; + const int kNumRows = 100; + // Perform a write. + InsertTestRowsRemote(0, kNumRows, 1, nullptr, kTabletId, &write_timestamps_collector); + + // Scan with READ_YOUR_WRITES mode and use the previous + // write response as the propagated timestamp. + ScanResponsePB resp; + int64_t propagated_timestamp = write_timestamps_collector[0]; + ScanYourWritesTest(propagated_timestamp, &resp); + + // Store the returned snapshot timestamp as the propagated + // timestamp for the next read. + propagated_timestamp = resp.snap_timestamp(); + // Drain all the rows from the scanner. + vector<string> results; + ASSERT_NO_FATAL_FAILURE(DrainScannerToStrings(resp.scanner_id(), schema_, &results)); + ASSERT_EQ(kNumRows, results.size()); + ASSERT_EQ(R"((int32 key=0, int32 int_val=0, string string_val="original0"))", results[0]); + ASSERT_EQ(R"((int32 key=99, int32 int_val=99, string string_val="original99"))", results[99]); + + // Rescan the tablet to ensure READ_YOUR_WRITES mode can + // satisfy read-your-reads session guarantee. + ScanResponsePB new_resp; + ScanYourWritesTest(propagated_timestamp, &new_resp); + // Drain all the rows from the scanner. 
+ results.clear(); + ASSERT_NO_FATAL_FAILURE(DrainScannerToStrings(new_resp.scanner_id(), schema_, &results)); + ASSERT_EQ(kNumRows, results.size()); + ASSERT_EQ(R"((int32 key=0, int32 int_val=0, string string_val="original0"))", results[0]); + ASSERT_EQ(R"((int32 key=99, int32 int_val=99, string string_val="original99"))", results[99]); +} + +// Tests that a read succeeds even without propagated_timestamp. +TEST_F(TabletServerTest, TestScanYourWrites_WithoutPropagatedTimestamp) { + vector<uint64_t> write_timestamps_collector; + // Perform a write. + InsertTestRowsRemote(0, 1, 1, nullptr, kTabletId, &write_timestamps_collector); + + ScanResponsePB resp; + ScanYourWritesTest(Timestamp::kMin.ToUint64(), &resp); +} + +// Tests that a read succeeds even with a future propagated_timestamp. Also +// tests that the clock was updated so that no writes will ever have a +// timestamp before this snapshot. +TEST_F(TabletServerTest, TestScanYourWrites_PropagatedTimestampInTheFuture) { + vector<uint64_t> write_timestamps_collector; + // Perform a write. + InsertTestRowsRemote(0, 1, 1, nullptr, kTabletId, &write_timestamps_collector); + + ScanResponsePB resp; + // Increment the write timestamp by 5 secs: the server will definitely consider + // this in the future. + Timestamp propagated_timestamp(write_timestamps_collector[0]); + propagated_timestamp = HybridClock::TimestampFromMicroseconds( + HybridClock::GetPhysicalValueMicros(propagated_timestamp) + 5000000); + ScanYourWritesTest(propagated_timestamp.ToUint64(), &resp); + + // Make sure the server's current clock returns a value that is larger than the + // propagated timestamp. It should have the same physical time, but higher + // logical time (due to various calls to clock.Now() when processing the request). 
+ Timestamp now = mini_server_->server()->clock()->Now(); + + ASSERT_EQ(HybridClock::GetPhysicalValueMicros(propagated_timestamp), + HybridClock::GetPhysicalValueMicros(now)); + + ASSERT_GT(HybridClock::GetLogicalValue(now), + HybridClock::GetLogicalValue(propagated_timestamp)); + + vector<string> results; + NO_FATALS(DrainScannerToStrings(resp.scanner_id(), schema_, &results)); + ASSERT_EQ(1, results.size()); + ASSERT_EQ(R"((int32 key=0, int32 int_val=0, string string_val="original0"))", results[0]); +} + TEST_F(TabletServerTest, TestScanWithStringPredicates) { InsertTestRowsDirect(0, 100); @@ -2088,13 +2172,19 @@ TEST_F(TabletServerTest, TestBadScannerID) { // Test passing a scanner ID, but also filling in some of the NewScanRequest // field. -TEST_F(TabletServerTest, TestInvalidScanRequest_NewScanAndScannerID) { +class InvalidScanRequest_NewScanAndScannerIDParamTest : + public TabletServerTest, + public ::testing::WithParamInterface<ReadMode> { +}; +TEST_P(InvalidScanRequest_NewScanAndScannerIDParamTest, Test) { + const ReadMode mode = GetParam(); ScanRequestPB req; ScanResponsePB resp; RpcController rpc; NewScanRequestPB* scan = req.mutable_new_scan_request(); scan->set_tablet_id(kTabletId); + scan->set_read_mode(mode); req.set_batch_size_bytes(0); // so it won't return data right away req.set_scanner_id("x"); SCOPED_TRACE(SecureDebugString(req)); @@ -2103,6 +2193,8 @@ TEST_F(TabletServerTest, TestInvalidScanRequest_NewScanAndScannerID) { ASSERT_STR_CONTAINS(s.ToString(), "Must not pass both a scanner_id and new_scan_request"); } +INSTANTIATE_TEST_CASE_P(Params, InvalidScanRequest_NewScanAndScannerIDParamTest, + testing::ValuesIn(read_modes)); // Test that passing a projection with fields not present in the tablet schema // throws an exception. @@ -2157,7 +2249,11 @@ TEST_F(TabletServerTest, TestInvalidScanRequest_BadProjectionTypes) { // Test that passing a projection with Column IDs throws an exception. 
// Column IDs are assigned to the user request schema on the tablet server // based on the latest schema. -TEST_F(TabletServerTest, TestInvalidScanRequest_WithIds) { +class InvalidScanRequest_WithIdsParamTest : + public TabletServerTest, + public ::testing::WithParamInterface<ReadMode> { +}; +TEST_P(InvalidScanRequest_WithIdsParamTest, Test) { const Schema* projection = tablet_replica_->tablet()->schema(); ASSERT_TRUE(projection->has_column_ids()); VerifyScanRequestFailure(*projection, @@ -2165,6 +2261,9 @@ TEST_F(TabletServerTest, TestInvalidScanRequest_WithIds) { "User requests should not have Column IDs"); } +INSTANTIATE_TEST_CASE_P(Params, InvalidScanRequest_WithIdsParamTest, + testing::ValuesIn(read_modes)); + // Test scanning a tablet that has no entries. TEST_F(TabletServerTest, TestScan_NoResults) { ScanRequestPB req; @@ -2192,7 +2291,12 @@ TEST_F(TabletServerTest, TestScan_NoResults) { } // Test scanning a tablet that has no entries. -TEST_F(TabletServerTest, TestScan_InvalidScanSeqId) { +class InvalidScanSeqIdParamTest : + public TabletServerTest, + public ::testing::WithParamInterface<ReadMode> { +}; +TEST_P(InvalidScanSeqIdParamTest, Test) { + const ReadMode mode = GetParam(); InsertTestRowsDirect(0, 10); ScanRequestPB req; @@ -2204,6 +2308,7 @@ TEST_F(TabletServerTest, TestScan_InvalidScanSeqId) { const Schema& projection = schema_; NewScanRequestPB* scan = req.mutable_new_scan_request(); scan->set_tablet_id(kTabletId); + scan->set_read_mode(mode); ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns())); req.set_call_seq_id(0); req.set_batch_size_bytes(0); // so it won't return data right away @@ -2233,6 +2338,9 @@ TEST_F(TabletServerTest, TestScan_InvalidScanSeqId) { } } +INSTANTIATE_TEST_CASE_P(Params, InvalidScanSeqIdParamTest, + testing::ValuesIn(read_modes)); + // Regression test for KUDU-1789: when ScannerKeepAlive is called on a non-existent // scanner, it should properly respond with an error. 
TEST_F(TabletServerTest, TestScan_KeepAliveExpiredScanner) { @@ -2247,6 +2355,39 @@ TEST_F(TabletServerTest, TestScan_KeepAliveExpiredScanner) { ASSERT_EQ(resp.error().code(), TabletServerErrorPB::SCANNER_EXPIRED); } +void TabletServerTest::ScanYourWritesTest(uint64_t propagated_timestamp, + ScanResponsePB* resp) { + ScanRequestPB req; + + // Set up a new request with no predicates, all columns. + const Schema &projection = schema_; + NewScanRequestPB *scan = req.mutable_new_scan_request(); + scan->set_tablet_id(kTabletId); + scan->set_read_mode(READ_YOUR_WRITES); + if (propagated_timestamp != Timestamp::kInvalidTimestamp.ToUint64()) { + scan->set_propagated_timestamp(propagated_timestamp); + } + ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns())); + req.set_call_seq_id(0); + req.set_batch_size_bytes(0); // so it won't return data right away + + { + RpcController rpc; + SCOPED_TRACE(SecureDebugString(req)); + ASSERT_OK(proxy_->Scan(req, resp, &rpc)); + SCOPED_TRACE(SecureDebugString(*resp)); + ASSERT_FALSE(resp->has_error()); + } + + // Make sure that the chosen snapshot timestamp is sent back and + // it is larger than the previous propagation timestamp. + ASSERT_TRUE(resp->has_snap_timestamp()); + ASSERT_LT(propagated_timestamp, resp->snap_timestamp()); + // The 'propagated_timestamp' field must be set for 'success' responses. 
+ ASSERT_TRUE(resp->has_propagated_timestamp()); + ASSERT_TRUE(resp->has_more_results()); +} + void TabletServerTest::DoOrderedScanTest(const Schema& projection, const string& expected_rows_as_string) { InsertTestRowsDirect(0, 10); http://git-wip-us.apache.org/repos/asf/kudu/blob/723ced83/src/kudu/tserver/tablet_service.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index 0cbcc4c..4d79d29 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -1624,11 +1624,13 @@ static Status SetupScanSpec(const NewScanRequestPB& scan_pb, } namespace { -// Checks if 'timestamp' is before the 'tablet's AHM if this is a READ_AT_SNAPSHOT scan. -// Returns Status::OK() if it's not or Status::InvalidArgument() if it is. +// Checks if 'timestamp' is before the tablet's AHM if this is a +// READ_AT_SNAPSHOT/READ_YOUR_WRITES scan. Returns Status::OK() if it's +// not or Status::InvalidArgument() if it is. Status VerifyNotAncientHistory(Tablet* tablet, ReadMode read_mode, Timestamp timestamp) { tablet::HistoryGcOpts history_gc_opts = tablet->GetHistoryGcOpts(); - if (read_mode == READ_AT_SNAPSHOT && history_gc_opts.IsAncientHistory(timestamp)) { + if ((read_mode == READ_AT_SNAPSHOT || read_mode == READ_YOUR_WRITES) && + history_gc_opts.IsAncientHistory(timestamp)) { return Status::InvalidArgument( Substitute("Snapshot timestamp is earlier than the ancient history mark. 
Consider " "increasing the value of the configuration parameter " @@ -1752,6 +1754,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica, s = tablet->NewRowIterator(projection, &iter); break; } + case READ_YOUR_WRITES: // Fallthrough intended case READ_AT_SNAPSHOT: { s = HandleScanAtSnapshot(scan_pb, rpc_context, projection, replica, &iter, snap_timestamp); @@ -2032,59 +2035,24 @@ MonoTime ClampScanDeadlineForWait(const MonoTime& deadline, bool* was_clamped) { Status TabletServiceImpl::HandleScanAtSnapshot(const NewScanRequestPB& scan_pb, const RpcContext* rpc_context, const Schema& projection, - TabletReplica* replica, + TabletReplica* tablet_replica, gscoped_ptr<RowwiseIterator>* iter, Timestamp* snap_timestamp) { - // If the client sent a timestamp update our clock with it. - if (scan_pb.has_propagated_timestamp()) { - Timestamp propagated_timestamp(scan_pb.propagated_timestamp()); - - // Update the clock so that we never generate snapshots lower that - // 'propagated_timestamp'. If 'propagated_timestamp' is lower than - // 'now' this call has no effect. If 'propagated_timestamp' is too much - // into the future this will fail and we abort. - RETURN_NOT_OK(server_->clock()->Update(propagated_timestamp)); + switch (scan_pb.read_mode()) { + case READ_AT_SNAPSHOT: // Fallthrough intended + case READ_YOUR_WRITES: + break; + default: + LOG(FATAL) << "Unsupported snapshot scan mode specified."; } + // Based on the read mode, pick a timestamp and verify it. Timestamp tmp_snap_timestamp; - - // If the client provided no snapshot timestamp we take the current clock - // time as the snapshot timestamp. - if (!scan_pb.has_snap_timestamp()) { - tmp_snap_timestamp = server_->clock()->Now(); - // ... else we use the client provided one, but make sure it is not too far - // in the future as to be invalid. 
- } else { - - Timestamp max_allowed_ts; - Status s = server_->clock()->GetGlobalLatest(&max_allowed_ts); - if (s.IsNotSupported() && - PREDICT_TRUE(!FLAGS_scanner_allow_snapshot_scans_with_logical_timestamps)) { - return Status::NotSupported("Snapshot scans not supported on this server", - s.ToString()); - } - tmp_snap_timestamp.FromUint64(scan_pb.snap_timestamp()); - - // Note: if 'max_allowed_ts' is not obtained from clock_->GetGlobalLatest() it's guaranteed - // to be higher than 'tmp_snap_timestamp'. - if (tmp_snap_timestamp > max_allowed_ts) { - return Status::InvalidArgument( - Substitute("Snapshot time $0 in the future. Max allowed timestamp is $1", - server_->clock()->Stringify(tmp_snap_timestamp), - server_->clock()->Stringify(max_allowed_ts))); - } - } - - // Before we wait on anything check that the timestamp is after the AHM. - // This is not the final check. We'll check this again after the iterators are open but - // there is no point in waiting if we can't actually scan afterwards. - RETURN_NOT_OK(VerifyNotAncientHistory(replica->tablet(), - ReadMode::READ_AT_SNAPSHOT, - tmp_snap_timestamp)); + RETURN_NOT_OK(PickAndVerifyTimestamp(scan_pb, tablet_replica->tablet(), &tmp_snap_timestamp)); tablet::MvccSnapshot snap; - Tablet* tablet = replica->tablet(); - scoped_refptr<consensus::TimeManager> time_manager = replica->time_manager(); + Tablet* tablet = tablet_replica->tablet(); + scoped_refptr<consensus::TimeManager> time_manager = tablet_replica->time_manager(); tablet::MvccManager* mvcc_manager = tablet->mvcc_manager(); // Reduce the client's deadline by a few msecs to allow for overhead. @@ -2129,6 +2097,87 @@ Status TabletServiceImpl::HandleScanAtSnapshot(const NewScanRequestPB& scan_pb, return Status::InvalidArgument("Unknown order mode specified"); } RETURN_NOT_OK(tablet->NewRowIterator(projection, snap, scan_pb.order_mode(), iter)); + + // Return the picked snapshot timestamp for both READ_AT_SNAPSHOT + // and READ_YOUR_WRITES mode. 
+ *snap_timestamp = tmp_snap_timestamp; + return Status::OK(); +} + +Status TabletServiceImpl::ValidateTimestamp(const Timestamp& snap_timestamp) { + Timestamp max_allowed_ts; + Status s = server_->clock()->GetGlobalLatest(&max_allowed_ts); + if (PREDICT_FALSE(s.IsNotSupported()) && + PREDICT_TRUE(!FLAGS_scanner_allow_snapshot_scans_with_logical_timestamps)) { + return Status::NotSupported("Snapshot scans not supported on this server", + s.ToString()); + } + + // Note: if 'max_allowed_ts' is not obtained from server_->clock()->GetGlobalLatest(), e.g., + // in case the logical clock is used, it's guaranteed to be higher than 'snap_timestamp', + // since 'max_allowed_ts' is default-constructed to kInvalidTimestamp (MAX_LONG - 1). + if (snap_timestamp > max_allowed_ts) { + return Status::InvalidArgument( + Substitute("Snapshot time $0 in the future. Max allowed timestamp is $1", + server_->clock()->Stringify(snap_timestamp), + server_->clock()->Stringify(max_allowed_ts))); + } + + return Status::OK(); +} + +Status TabletServiceImpl::PickAndVerifyTimestamp(const NewScanRequestPB& scan_pb, + Tablet* tablet, + Timestamp* snap_timestamp) { + // If the client sent a timestamp update our clock with it. + if (scan_pb.has_propagated_timestamp()) { + Timestamp propagated_timestamp(scan_pb.propagated_timestamp()); + + // Update the clock so that we never generate snapshots lower than + // 'propagated_timestamp'. If 'propagated_timestamp' is lower than + // 'now' this call has no effect. If 'propagated_timestamp' is too far + // into the future this will fail and we abort. + RETURN_NOT_OK(server_->clock()->Update(propagated_timestamp)); + } + + Timestamp tmp_snap_timestamp; + ReadMode read_mode = scan_pb.read_mode(); + tablet::MvccManager* mvcc_manager = tablet->mvcc_manager(); + + if (read_mode == READ_AT_SNAPSHOT) { + // For READ_AT_SNAPSHOT mode, + // 1) if the client provided no snapshot timestamp we take the current + // clock time as the snapshot timestamp.
+ // 2) else we use the client provided one, but make sure it is not too + // far in the future as to be invalid. + if (!scan_pb.has_snap_timestamp()) { + tmp_snap_timestamp = server_->clock()->Now(); + } else { + tmp_snap_timestamp.FromUint64(scan_pb.snap_timestamp()); + RETURN_NOT_OK(ValidateTimestamp(tmp_snap_timestamp)); + } + } else { + // For READ_YOUR_WRITES mode, we use the following to choose a + // snapshot timestamp: MAX(propagated timestamp + 1, 'clean' timestamp). + // There is no need to validate if the chosen timestamp is too far in + // the future, since: + // 1) MVCC 'clean' timestamp is by definition in the past (it's maximally + // bounded by safe time). + // 2) the propagated timestamp was used to update the clock above and the + // update would have returned an error if the timestamp was too + // far in the future. + uint64_t clean_timestamp = mvcc_manager->GetCleanTimestamp().ToUint64(); + uint64_t propagated_timestamp = scan_pb.has_propagated_timestamp() ? + scan_pb.propagated_timestamp() : Timestamp::kMin.ToUint64(); + tmp_snap_timestamp = Timestamp(std::max(propagated_timestamp + 1, clean_timestamp)); + } + + // Before we wait on anything check that the timestamp is after the AHM. + // This is not the final check. We'll check this again after the iterators are open but + // there is no point in waiting if we can't actually scan afterwards.
+ RETURN_NOT_OK(VerifyNotAncientHistory(tablet, + read_mode, + tmp_snap_timestamp)); *snap_timestamp = tmp_snap_timestamp; return Status::OK(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/723ced83/src/kudu/tserver/tablet_service.h ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_service.h b/src/kudu/tserver/tablet_service.h index cb24cd3..37e6bf2 100644 --- a/src/kudu/tserver/tablet_service.h +++ b/src/kudu/tserver/tablet_service.h @@ -73,6 +73,7 @@ class RpcContext; } // namespace rpc namespace tablet { +class Tablet; class TabletReplica; } // namespace tablet @@ -151,6 +152,17 @@ class TabletServiceImpl : public TabletServerServiceIf { gscoped_ptr<RowwiseIterator>* iter, Timestamp* snap_timestamp); + // Validates the given timestamp is not so far in the future that + // it exceeds the maximum allowed clock synchronization error time, + // as such a timestamp is invalid. + Status ValidateTimestamp(const Timestamp& snap_timestamp); + + // Pick a timestamp according to the scan mode, and verify that the + // timestamp is after the tablet's ancient history mark. + Status PickAndVerifyTimestamp(const NewScanRequestPB& scan_pb, + tablet::Tablet* tablet, + Timestamp* snap_timestamp); + TabletServer* server_; };
