Repository: kudu
Updated Branches:
  refs/heads/master d9e703713 -> 723ced836


KUDU-1704: add READ_YOUR_WRITES scan mode

This patch adds a new read mode, READ_YOUR_WRITES, to the tablet server. In
this mode, the server will pick a snapshot timestamp in the past, subject to
the propagated timestamp bound, and perform a read. Moreover, the chosen
snapshot timestamp is returned to the client.

The major difference between a READ_AT_SNAPSHOT scan without a timestamp
and a READ_YOUR_WRITES scan is that the latter will choose the newest
timestamp within the staleness bound that allows execution of the reads
without being blocked by in-flight transactions, if possible.
READ_YOUR_WRITES mode is not repeatable; however, it allows client-local
read-your-writes/read-your-reads.

Design doc: 
https://docs.google.com/document/d/1WRLzKdCmRxXjUpi-DZsz7l2gl6215BsNdORtDXrmgl0/edit?usp=sharing

Change-Id: I84ddb981a1a0f199d4e66f5d5097318f8c785a48
Reviewed-on: http://gerrit.cloudera.org:8080/8804
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <[email protected]>
Reviewed-by: David Ribeiro Alves <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/723ced83
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/723ced83
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/723ced83

Branch: refs/heads/master
Commit: 723ced836aa5e9729fa2891d558f0f70171c4db1
Parents: d9e7037
Author: hahao <[email protected]>
Authored: Fri Dec 8 14:49:35 2017 -0800
Committer: Hao Hao <[email protected]>
Committed: Wed Feb 21 19:20:28 2018 +0000

----------------------------------------------------------------------
 src/kudu/common/common.proto                |  25 ++-
 src/kudu/tserver/tablet_server-test-base.cc |   4 +-
 src/kudu/tserver/tablet_server-test-base.h  |   5 +-
 src/kudu/tserver/tablet_server-test.cc      | 231 ++++++++++++++++++-----
 src/kudu/tserver/tablet_service.cc          | 147 ++++++++++-----
 src/kudu/tserver/tablet_service.h           |  12 ++
 6 files changed, 326 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/723ced83/src/kudu/common/common.proto
----------------------------------------------------------------------
diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto
index 4879b18..54016bc 100644
--- a/src/kudu/common/common.proto
+++ b/src/kudu/common/common.proto
@@ -217,7 +217,7 @@ enum ReadMode {
   // is lower than the snapshot's timestamp to complete.
   //
   // When mixing reads and writes clients that specify COMMIT_WAIT as their
-  // external consistency mode and then use the returned write_timestamp to
+  // external consistency mode and then use the returned write_timestamp
   // to perform snapshot reads are guaranteed that that snapshot time is
   // considered in the past by all servers and no additional action is
   // necessary. Clients using CLIENT_PROPAGATED however must forcibly propagate
@@ -231,6 +231,29 @@ enum ReadMode {
   // the former.
   // TODO implement actually signing the propagated timestamp.
   READ_AT_SNAPSHOT = 2;
+
+  // When READ_YOUR_WRITES is specified, the server will pick a timestamp to use
+  // for a server-local snapshot scan subject to the following criteria:
+  // (1) It will be higher than the propagated timestamp,
+  // (2) It will attempt to minimize latency caused by waiting for outstanding
+  //     write transactions to complete.
+  // More specifically, the server will choose the latest timestamp higher than
+  // the provided propagated timestamp bound that allows execution of the
+  // reads without being blocked by the in-flight transactions (however the
+  // read can be blocked if the propagated timestamp is higher than some in-flight
+  // transactions). If no propagated timestamp is provided the server will choose
+  // a timestamp such that all transactions before it are committed. The chosen
+  // timestamp will be returned back to the client as 'snapshot timestamp'. The Kudu
+  // client library will use it as the propagated timestamp for subsequent reads
+  // to avoid unnecessarily waiting.
+  //
+  // Reads in this mode are not repeatable: two READ_YOUR_WRITES reads, even if
+  // they provide the same propagated timestamp bound, can execute at different
+  // timestamps and thus return different results. However, it allows
+  // read-your-writes and read-your-reads for each client, as the chosen
+  // timestamp must be higher than the one of the last write or read,
+  // known from the propagated timestamp.
+  READ_YOUR_WRITES = 3;
 }
 
 // The possible order modes for clients.

http://git-wip-us.apache.org/repos/asf/kudu/blob/723ced83/src/kudu/tserver/tablet_server-test-base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.cc 
b/src/kudu/tserver/tablet_server-test-base.cc
index 5107934..1168601 100644
--- a/src/kudu/tserver/tablet_server-test-base.cc
+++ b/src/kudu/tserver/tablet_server-test-base.cc
@@ -441,7 +441,8 @@ void TabletServerTestBase::VerifyScanRequestFailure(
 }
 
 // Open a new scanner which scans all of the columns in the table.
-void TabletServerTestBase::OpenScannerWithAllColumns(ScanResponsePB* resp) {
+void TabletServerTestBase::OpenScannerWithAllColumns(ScanResponsePB* resp,
+                                                     ReadMode read_mode) {
   ScanRequestPB req;
   RpcController rpc;
 
@@ -449,6 +450,7 @@ void 
TabletServerTestBase::OpenScannerWithAllColumns(ScanResponsePB* resp) {
   const Schema& projection = schema_;
   NewScanRequestPB* scan = req.mutable_new_scan_request();
   scan->set_tablet_id(kTabletId);
+  scan->set_read_mode(read_mode);
   ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns()));
   req.set_call_seq_id(0);
   req.set_batch_size_bytes(0); // so it won't return data right away

http://git-wip-us.apache.org/repos/asf/kudu/blob/723ced83/src/kudu/tserver/tablet_server-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.h 
b/src/kudu/tserver/tablet_server-test-base.h
index 8aa793e..9841021 100644
--- a/src/kudu/tserver/tablet_server-test-base.h
+++ b/src/kudu/tserver/tablet_server-test-base.h
@@ -23,10 +23,10 @@
 #include <utility>
 #include <vector>
 
+#include "kudu/common/common.pb.h"
 #include "kudu/common/row.h"
 #include "kudu/common/schema.h"
 #include "kudu/consensus/consensus.proxy.h"
-#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/server/server_base.proxy.h"
 #include "kudu/tablet/tablet_replica.h"
@@ -117,7 +117,8 @@ class TabletServerTestBase : public KuduTest {
                                 const char *expected_message);
 
   // Open a new scanner which scans all of the columns in the table.
-  void OpenScannerWithAllColumns(ScanResponsePB* resp);
+  void OpenScannerWithAllColumns(ScanResponsePB* resp,
+                                 ReadMode read_mode = READ_LATEST);
 
  protected:
   static const char* kTableId;

http://git-wip-us.apache.org/repos/asf/kudu/blob/723ced83/src/kudu/tserver/tablet_server-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test.cc 
b/src/kudu/tserver/tablet_server-test.cc
index a87b03f..137f4a5 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -183,6 +183,8 @@ class TabletServerTest : public TabletServerTestBase {
   }
 
   void DoOrderedScanTest(const Schema& projection, const string& 
expected_rows_as_string);
+
+  void ScanYourWritesTest(uint64_t propagated_timestamp, ScanResponsePB* resp);
 };
 
 TEST_F(TabletServerTest, TestPingServer) {
@@ -1412,7 +1414,7 @@ TEST_F(TabletServerTest, 
TestClientGetsErrorBackWhenRecoveryFailed) {
                       "Log file corruption detected");
 }
 
-TEST_F(TabletServerTest, TestScan) {
+TEST_F(TabletServerTest, TestReadLatest) {
   int num_rows = AllowSlowTests() ? 10000 : 1000;
   InsertTestRowsDirect(0, num_rows);
 
@@ -1467,7 +1469,14 @@ TEST_F(TabletServerTest, TestScan) {
   ASSERT_EQ(0, tablet_active_scanners->value());
 }
 
-TEST_F(TabletServerTest, TestExpiredScanner) {
+class ExpiredScannerParamTest :
+    public TabletServerTest,
+    public ::testing::WithParamInterface<ReadMode> {
+};
+
+TEST_P(ExpiredScannerParamTest, Test) {
+  const ReadMode mode = GetParam();
+
   // Make scanners expire quickly.
   FLAGS_scanner_ttl_ms = 1;
 
@@ -1484,7 +1493,7 @@ TEST_F(TabletServerTest, TestExpiredScanner) {
 
   // Open a scanner but don't read from it.
   ScanResponsePB resp;
-  ASSERT_NO_FATAL_FAILURE(OpenScannerWithAllColumns(&resp));
+  ASSERT_NO_FATAL_FAILURE(OpenScannerWithAllColumns(&resp, mode));
 
   // The scanner should expire after a short time.
   ASSERT_EVENTUALLY([&]() {
@@ -1502,7 +1511,22 @@ TEST_F(TabletServerTest, TestExpiredScanner) {
   ASSERT_EQ(TabletServerErrorPB::SCANNER_EXPIRED, resp.error().code());
 }
 
-TEST_F(TabletServerTest, TestScanCorruptedDeltas) {
+const ReadMode read_modes[] = {
+    READ_LATEST,
+    READ_AT_SNAPSHOT,
+    READ_YOUR_WRITES,
+};
+
+INSTANTIATE_TEST_CASE_P(Params, ExpiredScannerParamTest,
+                        testing::ValuesIn(read_modes));
+
+class ScanCorruptedDeltasParamTest :
+    public TabletServerTest,
+    public ::testing::WithParamInterface<ReadMode> {
+};
+
+TEST_P(ScanCorruptedDeltasParamTest, Test) {
+  const ReadMode mode = GetParam();
   // Ensure some rows get to disk with deltas.
   InsertTestRowsDirect(0, 100);
   ASSERT_OK(tablet_replica_->tablet()->Flush());
@@ -1540,6 +1564,7 @@ TEST_F(TabletServerTest, TestScanCorruptedDeltas) {
   RpcController rpc;
   NewScanRequestPB* scan = req.mutable_new_scan_request();
   scan->set_tablet_id(kTabletId);
+  scan->set_read_mode(mode);
   ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns()));
 
   // Send the call. This first call should attempt to init the corrupted
@@ -1557,17 +1582,32 @@ TEST_F(TabletServerTest, TestScanCorruptedDeltas) {
   }
 }
 
-TEST_F(TabletServerTest, TestScannerOpenWhenServerShutsDown) {
-  InsertTestRowsDirect(0, 1);
+INSTANTIATE_TEST_CASE_P(Params, ScanCorruptedDeltasParamTest,
+                        testing::ValuesIn(read_modes));
+
+class ScannerOpenWhenServerShutsDownParamTest :
+    public TabletServerTest,
+    public ::testing::WithParamInterface<ReadMode> {
+};
+TEST_P(ScannerOpenWhenServerShutsDownParamTest, Test) {
+  const ReadMode mode = GetParam();
+  // Insert rows and flush them, then update one row and flush again.
+  InsertTestRowsDirect(0, 100);
+  ASSERT_OK(tablet_replica_->tablet()->Flush());
+  UpdateTestRowRemote(1, 100);
+  ASSERT_OK(tablet_replica_->tablet()->Flush());
 
   ScanResponsePB resp;
-  ASSERT_NO_FATAL_FAILURE(OpenScannerWithAllColumns(&resp));
+  ASSERT_NO_FATAL_FAILURE(OpenScannerWithAllColumns(&resp, mode));
 
   // Scanner is now open. The test will now shut down the TS with the scanner still
   // out there. Due to KUDU-161 this used to fail, since the scanner (and thus the MRS)
   // stayed open longer than the anchor registry
 }
 
+INSTANTIATE_TEST_CASE_P(Params, ScannerOpenWhenServerShutsDownParamTest,
+                        testing::ValuesIn(read_modes));
+
 TEST_F(TabletServerTest, TestSnapshotScan) {
   const int num_rows = AllowSlowTests() ? 1000 : 100;
   const int num_batches = AllowSlowTests() ? 100 : 10;
@@ -1721,40 +1761,6 @@ TEST_F(TabletServerTest, 
TestSnapshotScan_SnapshotInTheFutureFails) {
   }
 }
 
-
-// Test tserver shutdown with an active scanner open.
-TEST_F(TabletServerTest, TestSnapshotScan_OpenScanner) {
-  vector<uint64_t> write_timestamps_collector;
-  // Write and flush and write, so we have some rows in MRS and DRS
-  InsertTestRowsRemote(0, 100, 2, nullptr, kTabletId, 
&write_timestamps_collector);
-  ASSERT_OK(tablet_replica_->tablet()->Flush());
-  InsertTestRowsRemote(100, 100, 2, nullptr, kTabletId, 
&write_timestamps_collector);
-
-  ScanRequestPB req;
-  ScanResponsePB resp;
-  RpcController rpc;
-
-  // Set up a new request with no predicates, all columns.
-  const Schema& projection = schema_;
-  NewScanRequestPB* scan = req.mutable_new_scan_request();
-  scan->set_tablet_id(kTabletId);
-  ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns()));
-  req.set_call_seq_id(0);
-  req.set_batch_size_bytes(0);
-  scan->set_read_mode(READ_AT_SNAPSHOT);
-
-  // Send the call
-  {
-    SCOPED_TRACE(SecureDebugString(req));
-    ASSERT_OK(proxy_->Scan(req, &resp, &rpc));
-    SCOPED_TRACE(SecureDebugString(resp));
-    ASSERT_FALSE(resp.has_error());
-  }
-  // Intentionally do not drain the scanner at the end, to leave it open.
-  // This tests tablet server shutdown with an active scanner.
-}
-
-
 // Test retrying a snapshot scan using last_row.
 TEST_F(TabletServerTest, TestSnapshotScan_LastRow) {
   // Set the internal batching within the tserver to be small. Otherwise,
@@ -1838,7 +1844,6 @@ TEST_F(TabletServerTest, TestSnapshotScan_LastRow) {
   }
 }
 
-
 // Tests that a read in the future succeeds if a propagated_timestamp (that is 
even
 // further in the future) follows along. Also tests that the clock was updated 
so
 // that no writes will ever have a timestamp post this snapshot.
@@ -1941,6 +1946,85 @@ TEST_F(TabletServerTest, 
TestSnapshotScan__SnapshotInTheFutureBeyondPropagatedTi
   }
 }
 
+// Scan with READ_YOUR_WRITES mode to ensure it can
+// satisfy read-your-writes/read-your-reads session guarantee.
+TEST_F(TabletServerTest, TestScanYourWrites) {
+  vector<uint64_t> write_timestamps_collector;
+  const int kNumRows = 100;
+  // Perform a write.
+  InsertTestRowsRemote(0, kNumRows, 1, nullptr, kTabletId, &write_timestamps_collector);
+
+  // Scan with READ_YOUR_WRITES mode and use the previous write response as the
+  // propagated timestamp. NO_FATALS stops the test if the helper's ASSERTs fail.
+  ScanResponsePB resp;
+  uint64_t propagated_timestamp = write_timestamps_collector[0];
+  NO_FATALS(ScanYourWritesTest(propagated_timestamp, &resp));
+
+  // Store the returned snapshot timestamp as the propagated
+  // timestamp for the next read.
+  propagated_timestamp = resp.snap_timestamp();
+  // Drain all the rows from the scanner.
+  vector<string> results;
+  ASSERT_NO_FATAL_FAILURE(DrainScannerToStrings(resp.scanner_id(), schema_, &results));
+  ASSERT_EQ(kNumRows, results.size());
+  ASSERT_EQ(R"((int32 key=0, int32 int_val=0, string string_val="original0"))", results[0]);
+  ASSERT_EQ(R"((int32 key=99, int32 int_val=99, string string_val="original99"))", results[99]);
+
+  // Rescan the tablet to ensure READ_YOUR_WRITES mode can
+  // satisfy read-your-reads session guarantee.
+  ScanResponsePB new_resp;
+  NO_FATALS(ScanYourWritesTest(propagated_timestamp, &new_resp));
+  // Drain all the rows from the scanner.
+  results.clear();
+  ASSERT_NO_FATAL_FAILURE(DrainScannerToStrings(new_resp.scanner_id(), schema_, &results));
+  ASSERT_EQ(kNumRows, results.size());
+  ASSERT_EQ(R"((int32 key=0, int32 int_val=0, string string_val="original0"))", results[0]);
+  ASSERT_EQ(R"((int32 key=99, int32 int_val=99, string string_val="original99"))", results[99]);
+}
+
+// Tests that a read succeeds even without propagated_timestamp.
+TEST_F(TabletServerTest, TestScanYourWrites_WithoutPropagatedTimestamp) {
+  vector<uint64_t> write_timestamps_collector;
+  // Perform a write.
+  InsertTestRowsRemote(0, 1, 1, nullptr, kTabletId, &write_timestamps_collector);
+
+  ScanResponsePB resp;
+  NO_FATALS(ScanYourWritesTest(Timestamp::kMin.ToUint64(), &resp));
+}
+
+// Tests that a read succeeds even with a future propagated_timestamp. Also
+// tests that the server clock was advanced past that propagated timestamp,
+// so timestamps it generates afterwards are higher than it.
+TEST_F(TabletServerTest, TestScanYourWrites_PropagatedTimestampInTheFuture) {
+  vector<uint64_t> write_timestamps_collector;
+  // Perform a write.
+  InsertTestRowsRemote(0, 1, 1, nullptr, kTabletId, &write_timestamps_collector);
+
+  ScanResponsePB resp;
+  // Increment the write timestamp by 5 secs: the server will definitely consider
+  // this in the future.
+  Timestamp propagated_timestamp(write_timestamps_collector[0]);
+  propagated_timestamp = HybridClock::TimestampFromMicroseconds(
+      HybridClock::GetPhysicalValueMicros(propagated_timestamp) + 5000000);
+  NO_FATALS(ScanYourWritesTest(propagated_timestamp.ToUint64(), &resp));
+
+  // Make sure the server's current clock returns a value that is larger than the
+  // propagated timestamp. It should have the same physical time, but higher
+  // logical time (due to various calls to clock.Now() when processing the request).
+  Timestamp now = mini_server_->server()->clock()->Now();
+
+  ASSERT_EQ(HybridClock::GetPhysicalValueMicros(propagated_timestamp),
+            HybridClock::GetPhysicalValueMicros(now));
+
+  ASSERT_GT(HybridClock::GetLogicalValue(now),
+            HybridClock::GetLogicalValue(propagated_timestamp));
+
+  vector<string> results;
+  NO_FATALS(DrainScannerToStrings(resp.scanner_id(), schema_, &results));
+  ASSERT_EQ(1, results.size());
+  ASSERT_EQ(R"((int32 key=0, int32 int_val=0, string string_val="original0"))", results[0]);
+}
+
 TEST_F(TabletServerTest, TestScanWithStringPredicates) {
   InsertTestRowsDirect(0, 100);
 
@@ -2088,13 +2172,19 @@ TEST_F(TabletServerTest, TestBadScannerID) {
 
 // Test passing a scanner ID, but also filling in some of the NewScanRequest
 // field.
-TEST_F(TabletServerTest, TestInvalidScanRequest_NewScanAndScannerID) {
+class InvalidScanRequest_NewScanAndScannerIDParamTest :
+    public TabletServerTest,
+    public ::testing::WithParamInterface<ReadMode> {
+};
+TEST_P(InvalidScanRequest_NewScanAndScannerIDParamTest, Test) {
+  const ReadMode mode = GetParam();
   ScanRequestPB req;
   ScanResponsePB resp;
   RpcController rpc;
 
   NewScanRequestPB* scan = req.mutable_new_scan_request();
   scan->set_tablet_id(kTabletId);
+  scan->set_read_mode(mode);
   req.set_batch_size_bytes(0); // so it won't return data right away
   req.set_scanner_id("x");
   SCOPED_TRACE(SecureDebugString(req));
@@ -2103,6 +2193,8 @@ TEST_F(TabletServerTest, 
TestInvalidScanRequest_NewScanAndScannerID) {
   ASSERT_STR_CONTAINS(s.ToString(), "Must not pass both a scanner_id and 
new_scan_request");
 }
 
+INSTANTIATE_TEST_CASE_P(Params, 
InvalidScanRequest_NewScanAndScannerIDParamTest,
+                        testing::ValuesIn(read_modes));
 
 // Test that passing a projection with fields not present in the tablet schema
 // throws an exception.
@@ -2157,7 +2249,11 @@ TEST_F(TabletServerTest, 
TestInvalidScanRequest_BadProjectionTypes) {
 // Test that passing a projection with Column IDs throws an exception.
 // Column IDs are assigned to the user request schema on the tablet server
 // based on the latest schema.
-TEST_F(TabletServerTest, TestInvalidScanRequest_WithIds) {
+class InvalidScanRequest_WithIdsParamTest :
+    public TabletServerTest,
+    public ::testing::WithParamInterface<ReadMode> {
+};
+TEST_P(InvalidScanRequest_WithIdsParamTest, Test) {  // NOTE(review): GetParam() is never read.
   const Schema* projection = tablet_replica_->tablet()->schema();
   ASSERT_TRUE(projection->has_column_ids());
   VerifyScanRequestFailure(*projection,
@@ -2165,6 +2261,9 @@ TEST_F(TabletServerTest, TestInvalidScanRequest_WithIds) {
                            "User requests should not have Column IDs");
 }
 
+INSTANTIATE_TEST_CASE_P(Params, InvalidScanRequest_WithIdsParamTest,
+                        testing::ValuesIn(read_modes));
+
 // Test scanning a tablet that has no entries.
 TEST_F(TabletServerTest, TestScan_NoResults) {
   ScanRequestPB req;
@@ -2192,7 +2291,12 @@ TEST_F(TabletServerTest, TestScan_NoResults) {
 }
 
 // Test scanning a tablet that has no entries.
-TEST_F(TabletServerTest, TestScan_InvalidScanSeqId) {
+class InvalidScanSeqIdParamTest :
+    public TabletServerTest,
+    public ::testing::WithParamInterface<ReadMode> {
+};
+TEST_P(InvalidScanSeqIdParamTest, Test) {
+  const ReadMode mode = GetParam();
   InsertTestRowsDirect(0, 10);
 
   ScanRequestPB req;
@@ -2204,6 +2308,7 @@ TEST_F(TabletServerTest, TestScan_InvalidScanSeqId) {
     const Schema& projection = schema_;
     NewScanRequestPB* scan = req.mutable_new_scan_request();
     scan->set_tablet_id(kTabletId);
+    scan->set_read_mode(mode);
     ASSERT_OK(SchemaToColumnPBs(projection, 
scan->mutable_projected_columns()));
     req.set_call_seq_id(0);
     req.set_batch_size_bytes(0); // so it won't return data right away
@@ -2233,6 +2338,9 @@ TEST_F(TabletServerTest, TestScan_InvalidScanSeqId) {
   }
 }
 
+INSTANTIATE_TEST_CASE_P(Params, InvalidScanSeqIdParamTest,
+                        testing::ValuesIn(read_modes));
+
 // Regression test for KUDU-1789: when ScannerKeepAlive is called on a 
non-existent
 // scanner, it should properly respond with an error.
 TEST_F(TabletServerTest, TestScan_KeepAliveExpiredScanner) {
@@ -2247,6 +2355,39 @@ TEST_F(TabletServerTest, 
TestScan_KeepAliveExpiredScanner) {
   ASSERT_EQ(resp.error().code(), TabletServerErrorPB::SCANNER_EXPIRED);
 }
 
+void TabletServerTest::ScanYourWritesTest(uint64_t propagated_timestamp,
+                                          ScanResponsePB* resp) {
+  ScanRequestPB req;
+  // Timestamp::kMin means "no propagated timestamp": the field is left unset.
+  // Set up a new request with no predicates, all columns.
+  const Schema& projection = schema_;
+  NewScanRequestPB* scan = req.mutable_new_scan_request();
+  scan->set_tablet_id(kTabletId);
+  scan->set_read_mode(READ_YOUR_WRITES);
+  if (propagated_timestamp != Timestamp::kMin.ToUint64()) {
+    scan->set_propagated_timestamp(propagated_timestamp);
+  }
+  ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns()));
+  req.set_call_seq_id(0);
+  req.set_batch_size_bytes(0); // so it won't return data right away
+
+  {
+    RpcController rpc;
+    SCOPED_TRACE(SecureDebugString(req));
+    ASSERT_OK(proxy_->Scan(req, resp, &rpc));
+    SCOPED_TRACE(SecureDebugString(*resp));
+    ASSERT_FALSE(resp->has_error());
+  }
+
+  // Make sure that the chosen snapshot timestamp is sent back and that it
+  // is higher than the propagated timestamp bound (kMin when none was sent).
+  ASSERT_TRUE(resp->has_snap_timestamp());
+  ASSERT_LT(propagated_timestamp, resp->snap_timestamp());
+  // The 'propagated_timestamp' field must be set for 'success' responses.
+  ASSERT_TRUE(resp->has_propagated_timestamp());
+  ASSERT_TRUE(resp->has_more_results());
+}
+
 void TabletServerTest::DoOrderedScanTest(const Schema& projection,
                                          const string& 
expected_rows_as_string) {
   InsertTestRowsDirect(0, 10);

http://git-wip-us.apache.org/repos/asf/kudu/blob/723ced83/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc 
b/src/kudu/tserver/tablet_service.cc
index 0cbcc4c..4d79d29 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -1624,11 +1624,13 @@ static Status SetupScanSpec(const NewScanRequestPB& 
scan_pb,
 }
 
 namespace {
-// Checks if 'timestamp' is before the 'tablet's AHM if this is a 
READ_AT_SNAPSHOT scan.
-// Returns Status::OK() if it's not or Status::InvalidArgument() if it is.
+// Checks if 'timestamp' is before the tablet's AHM if this is a
+// READ_AT_SNAPSHOT/READ_YOUR_WRITES scan. Returns Status::OK() if it's
+// not or Status::InvalidArgument() if it is.
 Status VerifyNotAncientHistory(Tablet* tablet, ReadMode read_mode, Timestamp 
timestamp) {
   tablet::HistoryGcOpts history_gc_opts = tablet->GetHistoryGcOpts();
-  if (read_mode == READ_AT_SNAPSHOT && 
history_gc_opts.IsAncientHistory(timestamp)) {
+  if ((read_mode == READ_AT_SNAPSHOT || read_mode == READ_YOUR_WRITES) &&
+      history_gc_opts.IsAncientHistory(timestamp)) {
     return Status::InvalidArgument(
         Substitute("Snapshot timestamp is earlier than the ancient history 
mark. Consider "
                        "increasing the value of the configuration parameter "
@@ -1752,6 +1754,7 @@ Status 
TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
         s = tablet->NewRowIterator(projection, &iter);
         break;
       }
+      case READ_YOUR_WRITES: // Fallthrough intended
       case READ_AT_SNAPSHOT: {
         s = HandleScanAtSnapshot(scan_pb, rpc_context, projection, replica,
                                  &iter, snap_timestamp);
@@ -2032,59 +2035,24 @@ MonoTime ClampScanDeadlineForWait(const MonoTime& 
deadline, bool* was_clamped) {
 Status TabletServiceImpl::HandleScanAtSnapshot(const NewScanRequestPB& scan_pb,
                                                const RpcContext* rpc_context,
                                                const Schema& projection,
-                                               TabletReplica* replica,
+                                               TabletReplica* tablet_replica,
                                                gscoped_ptr<RowwiseIterator>* 
iter,
                                                Timestamp* snap_timestamp) {
-  // If the client sent a timestamp update our clock with it.
-  if (scan_pb.has_propagated_timestamp()) {
-    Timestamp propagated_timestamp(scan_pb.propagated_timestamp());
-
-    // Update the clock so that we never generate snapshots lower that
-    // 'propagated_timestamp'. If 'propagated_timestamp' is lower than
-    // 'now' this call has no effect. If 'propagated_timestamp' is too much
-    // into the future this will fail and we abort.
-    RETURN_NOT_OK(server_->clock()->Update(propagated_timestamp));
+  switch (scan_pb.read_mode()) {
+    case READ_AT_SNAPSHOT: // Fallthrough intended
+    case READ_YOUR_WRITES:
+      break;
+    default:
+      LOG(FATAL) << "Unsupported snapshot scan mode specified.";
   }
 
+  // Based on the read mode, pick a timestamp and verify it.
   Timestamp tmp_snap_timestamp;
-
-  // If the client provided no snapshot timestamp we take the current clock
-  // time as the snapshot timestamp.
-  if (!scan_pb.has_snap_timestamp()) {
-    tmp_snap_timestamp = server_->clock()->Now();
-  // ... else we use the client provided one, but make sure it is not too far
-  // in the future as to be invalid.
-  } else {
-
-    Timestamp max_allowed_ts;
-    Status s = server_->clock()->GetGlobalLatest(&max_allowed_ts);
-    if (s.IsNotSupported() &&
-        
PREDICT_TRUE(!FLAGS_scanner_allow_snapshot_scans_with_logical_timestamps)) {
-      return Status::NotSupported("Snapshot scans not supported on this 
server",
-                                  s.ToString());
-    }
-    tmp_snap_timestamp.FromUint64(scan_pb.snap_timestamp());
-
-    // Note: if 'max_allowed_ts' is not obtained from 
clock_->GetGlobalLatest() it's guaranteed
-    // to be higher than 'tmp_snap_timestamp'.
-    if (tmp_snap_timestamp > max_allowed_ts) {
-      return Status::InvalidArgument(
-          Substitute("Snapshot time $0 in the future. Max allowed timestamp is 
$1",
-                     server_->clock()->Stringify(tmp_snap_timestamp),
-                     server_->clock()->Stringify(max_allowed_ts)));
-    }
-  }
-
-  // Before we wait on anything check that the timestamp is after the AHM.
-  // This is not the final check. We'll check this again after the iterators 
are open but
-  // there is no point in waiting if we can't actually scan afterwards.
-  RETURN_NOT_OK(VerifyNotAncientHistory(replica->tablet(),
-                                        ReadMode::READ_AT_SNAPSHOT,
-                                        tmp_snap_timestamp));
+  RETURN_NOT_OK(PickAndVerifyTimestamp(scan_pb, tablet_replica->tablet(), 
&tmp_snap_timestamp));
 
   tablet::MvccSnapshot snap;
-  Tablet* tablet = replica->tablet();
-  scoped_refptr<consensus::TimeManager> time_manager = replica->time_manager();
+  Tablet* tablet = tablet_replica->tablet();
+  scoped_refptr<consensus::TimeManager> time_manager = 
tablet_replica->time_manager();
   tablet::MvccManager* mvcc_manager = tablet->mvcc_manager();
 
   // Reduce the client's deadline by a few msecs to allow for overhead.
@@ -2129,6 +2097,87 @@ Status TabletServiceImpl::HandleScanAtSnapshot(const 
NewScanRequestPB& scan_pb,
     return Status::InvalidArgument("Unknown order mode specified");
   }
   RETURN_NOT_OK(tablet->NewRowIterator(projection, snap, scan_pb.order_mode(), 
iter));
+
+  // Return the picked snapshot timestamp for both READ_AT_SNAPSHOT
+  // and READ_YOUR_WRITES mode.
+  *snap_timestamp = tmp_snap_timestamp;
+  return Status::OK();
+}
+
+Status TabletServiceImpl::ValidateTimestamp(const Timestamp& snap_timestamp) {
+  Timestamp max_allowed_ts;
+  Status s = server_->clock()->GetGlobalLatest(&max_allowed_ts);
+  if (PREDICT_FALSE(s.IsNotSupported()) &&
+      PREDICT_TRUE(!FLAGS_scanner_allow_snapshot_scans_with_logical_timestamps)) {
+    return Status::NotSupported("Snapshot scans not supported on this server",
+                                s.ToString());
+  }
+
+  // Note: if 'max_allowed_ts' is not filled in by server_->clock()->GetGlobalLatest()
+  // (e.g. when a logical clock is in use), it keeps its default-constructed value,
+  // kInvalidTimestamp (MAX_LONG - 1), so 'snap_timestamp' is effectively never rejected.
+  if (snap_timestamp > max_allowed_ts) {
+    return Status::InvalidArgument(
+        Substitute("Snapshot time $0 in the future. Max allowed timestamp is $1",
+                   server_->clock()->Stringify(snap_timestamp),
+                   server_->clock()->Stringify(max_allowed_ts)));
+  }
+
+  return Status::OK();
+}
+
+Status TabletServiceImpl::PickAndVerifyTimestamp(const NewScanRequestPB& scan_pb,
+                                                 Tablet* tablet,
+                                                 Timestamp* snap_timestamp) {
+  // If the client sent a timestamp update our clock with it.
+  if (scan_pb.has_propagated_timestamp()) {
+    Timestamp propagated_timestamp(scan_pb.propagated_timestamp());
+
+    // Update the clock so that we never generate snapshots lower than
+    // 'propagated_timestamp'. If 'propagated_timestamp' is lower than
+    // 'now' this call has no effect. If 'propagated_timestamp' is too far
+    // into the future this will fail and we abort.
+    RETURN_NOT_OK(server_->clock()->Update(propagated_timestamp));
+  }
+
+  Timestamp tmp_snap_timestamp;
+  ReadMode read_mode = scan_pb.read_mode();
+  tablet::MvccManager* mvcc_manager = tablet->mvcc_manager();
+
+  if (read_mode == READ_AT_SNAPSHOT) {
+    // For READ_AT_SNAPSHOT mode,
+    //   1) if the client provided no snapshot timestamp we take the current
+    //      clock time as the snapshot timestamp.
+    //   2) else we use the client provided one, but make sure it is not too
+    //      far in the future as to be invalid.
+    if (!scan_pb.has_snap_timestamp()) {
+      tmp_snap_timestamp = server_->clock()->Now();
+    } else {
+      tmp_snap_timestamp.FromUint64(scan_pb.snap_timestamp());
+      RETURN_NOT_OK(ValidateTimestamp(tmp_snap_timestamp));
+    }
+  } else {
+    // For READ_YOUR_WRITES mode, we use the following to choose a
+    // snapshot timestamp: MAX(propagated timestamp + 1, 'clean' timestamp).
+    // There is no need to validate if the chosen timestamp is too far in
+    // the future, since:
+    //   1) MVCC 'clean' timestamp is by definition in the past (it's maximally
+    //      bounded by safe time).
+    //   2) the propagated timestamp was used to update the clock above and the
+    //      update would have returned an error if the timestamp was too
+    //      far in the future.
+    uint64_t clean_timestamp = mvcc_manager->GetCleanTimestamp().ToUint64();
+    uint64_t propagated_timestamp = scan_pb.has_propagated_timestamp() ?
+                                    scan_pb.propagated_timestamp() : Timestamp::kMin.ToUint64();
+    tmp_snap_timestamp = Timestamp(std::max(propagated_timestamp + 1, clean_timestamp));
+  }
+
+  // Before we wait on anything check that the timestamp is after the AHM.
+  // This is not the final check. We'll check this again after the iterators are open but
+  // there is no point in waiting if we can't actually scan afterwards.
+  RETURN_NOT_OK(VerifyNotAncientHistory(tablet,
+                                        read_mode,
+                                        tmp_snap_timestamp));
   *snap_timestamp = tmp_snap_timestamp;
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/723ced83/src/kudu/tserver/tablet_service.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.h 
b/src/kudu/tserver/tablet_service.h
index cb24cd3..37e6bf2 100644
--- a/src/kudu/tserver/tablet_service.h
+++ b/src/kudu/tserver/tablet_service.h
@@ -73,6 +73,7 @@ class RpcContext;
 } // namespace rpc
 
 namespace tablet {
+class Tablet;
 class TabletReplica;
 } // namespace tablet
 
@@ -151,6 +152,17 @@ class TabletServiceImpl : public TabletServerServiceIf {
                               gscoped_ptr<RowwiseIterator>* iter,
                               Timestamp* snap_timestamp);
 
+  // Validates the given timestamp is not so far in the future that
+  // it exceeds the maximum allowed clock synchronization error time,
+  // as such a timestamp is invalid.
+  Status ValidateTimestamp(const Timestamp& snap_timestamp);
+
+  // Pick a timestamp according to the scan mode, and verify that the
+  // timestamp is after the tablet's ancient history mark.
+  Status PickAndVerifyTimestamp(const NewScanRequestPB& scan_pb,
+                                tablet::Tablet* tablet,
+                                Timestamp* snap_timestamp);
+
   TabletServer* server_;
 };
 

Reply via email to