KUDU-1189 integration test for reusing snap timestamp

Added a test for reusing snapshot timestamp when not set while running
scans in READ_AT_SNAPSHOT mode. This is a test for the functionality
introduced in the context of KUDU-1189.

The test is disabled as it currently fails. A follow up patch will fix
the bug and enable the test.

Change-Id: I7282976580cc15ef330871a838bbf7e46230ceb6
Reviewed-on: http://gerrit.cloudera.org:8080/5163
Reviewed-by: David Ribeiro Alves <[email protected]>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b3a3420a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b3a3420a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b3a3420a

Branch: refs/heads/master
Commit: b3a3420af5bce6ec25ebcb6ed544456cbc3a988f
Parents: 61d22e2
Author: Alexey Serbin <[email protected]>
Authored: Mon Nov 21 10:11:12 2016 -0800
Committer: David Ribeiro Alves <[email protected]>
Committed: Wed Nov 23 20:14:30 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/client.h                        |   1 +
 src/kudu/integration-tests/consistency-itest.cc | 226 ++++++++++++++++---
 2 files changed, 201 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b3a3420a/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 62d631f..2d2d30b 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -1901,6 +1901,7 @@ class KUDU_EXPORT KuduScanner {
   FRIEND_TEST(ClientTest, TestScanFaultTolerance);
   FRIEND_TEST(ClientTest, TestScanNoBlockCaching);
   FRIEND_TEST(ClientTest, TestScanTimeout);
+  FRIEND_TEST(ConsistencyITest, DISABLED_TestSnapshotScanTimestampReuse);
 
   // Owned.
   Data* data_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/b3a3420a/src/kudu/integration-tests/consistency-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/consistency-itest.cc 
b/src/kudu/integration-tests/consistency-itest.cc
index 4ff31c1..3c6476c 100644
--- a/src/kudu/integration-tests/consistency-itest.cc
+++ b/src/kudu/integration-tests/consistency-itest.cc
@@ -25,6 +25,8 @@
 
 #include "kudu/client/client.h"
 #include "kudu/client/client-test-util.h"
+#include "kudu/client/scanner-internal.h"
+#include "kudu/client/scan_configuration.h"
 #include "kudu/client/shared_ptr.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/gutil/stringprintf.h"
@@ -48,9 +50,11 @@
 
 DECLARE_bool(enable_data_block_fsync);
 DECLARE_int32(heartbeat_interval_ms);
+DECLARE_int32(max_clock_sync_error_usec);
 DECLARE_int32(scanner_gc_check_interval_us);
 DECLARE_bool(use_mock_wall_clock);
 
+using kudu::client::ScanConfiguration;
 using kudu::client::sp::shared_ptr;
 using kudu::master::CatalogManager;
 using kudu::master::GetTableLocationsRequestPB;
@@ -68,9 +72,9 @@ namespace kudu {
 namespace client {
 
 
-class TimestampPropagationTest : public MiniClusterITestBase {
+class ConsistencyITest : public MiniClusterITestBase {
  public:
-  TimestampPropagationTest()
+  ConsistencyITest()
       : num_tablet_servers_(2),
         table_name_("timestamp_propagation_test_table"),
         key_column_name_("key"),
@@ -157,16 +161,13 @@ class TimestampPropagationTest : public 
MiniClusterITestBase {
     return Status::OK();
   }
 
-  Status GetTabletIdForKeyValue(int32_t key_value_begin,
-                                int32_t key_value_end,
-                                const string& table_name,
-                                vector<string>* tablet_ids) {
-    if (!tablet_ids) {
-      return Status::InvalidArgument("null output container");
+  Status GetTabletIdForKey(int32_t key_value, string* tablet_id) {
+    if (!tablet_id) {
+      return Status::InvalidArgument("null output string");
     }
-    tablet_ids->clear();
+    const int32_t key_value_begin = key_value;
+    const int32_t key_value_end = key_value_begin + 1;
 
-    // Find the tablet for the first range (i.e. for the rows to be inserted).
     unique_ptr<KuduPartialRow> split_row_start(schema_.NewRow());
     RETURN_NOT_OK(split_row_start->SetInt32(0, key_value_begin));
     string partition_key_start;
@@ -178,7 +179,7 @@ class TimestampPropagationTest : public 
MiniClusterITestBase {
     RETURN_NOT_OK(split_row_end->EncodeRowKey(&partition_key_end));
 
     GetTableLocationsRequestPB req;
-    req.mutable_table()->set_table_name(table_name);
+    req.mutable_table()->set_table_name(table_name_);
     req.set_partition_key_start(partition_key_start);
     req.set_partition_key_end(partition_key_end);
     master::CatalogManager* catalog =
@@ -187,9 +188,14 @@ class TimestampPropagationTest : public 
MiniClusterITestBase {
     CatalogManager::ScopedLeaderSharedLock l(catalog);
     RETURN_NOT_OK(l.first_failed_status());
     RETURN_NOT_OK(catalog->GetTableLocations(&req, &resp));
-    for (size_t i = 0; i < resp.tablet_locations_size(); ++i) {
-      tablet_ids->emplace_back(resp.tablet_locations(i).tablet_id());
+    if (resp.tablet_locations_size() < 1) {
+      return Status::NotFound(Substitute("$0: no tablets for key", key_value));
     }
+    if (resp.tablet_locations_size() > 1) {
+      return Status::IllegalState(
+          Substitute("$0: multiple tablet servers for key", key_value));
+    }
+    *tablet_id = resp.tablet_locations(0).tablet_id();
 
     return Status::OK();
   }
@@ -217,6 +223,17 @@ class TimestampPropagationTest : public 
MiniClusterITestBase {
     return Status::OK();
   }
 
+  Status UpdateClockForTabletHostingKey(int32_t key, const MonoDelta& offset) {
+    string tablet_id;
+    RETURN_NOT_OK(GetTabletIdForKey(key, &tablet_id));
+    scoped_refptr<TabletPeer> p;
+    RETURN_NOT_OK(FindPeerForTablet(tablet_id, &p));
+
+    HybridClock* clock = CHECK_NOTNULL(dynamic_cast<HybridClock*>(p->clock()));
+    UpdateClock(clock, offset);
+    return Status::OK();
+  }
+
   const size_t num_tablet_servers_;
   const string table_name_;
   const string key_column_name_;
@@ -266,7 +283,7 @@ class TimestampPropagationTest : public 
MiniClusterITestBase {
 //     behind Ta server's time, and scanning at Tb's write time would not
 //     include the rows inserted into Ta.
 //
-TEST_F(TimestampPropagationTest, TwoBatchesAndReadAtSnapshot) {
+TEST_F(ConsistencyITest, TestTimestampPropagationFromScans) {
   uint64_t ts_a;
   {
     shared_ptr<KuduClient> client;
@@ -275,18 +292,10 @@ TEST_F(TimestampPropagationTest, 
TwoBatchesAndReadAtSnapshot) {
     shared_ptr<KuduTable> table;
     ASSERT_OK(client->OpenTable(table_name_, &table));
 
-    // Find the tablet for the first range (i.e. for the rows to be inserted).
-    vector<string> tablet_ids;
-    ASSERT_OK(GetTabletIdForKeyValue(0, key_split_value_ - 1, // first range
-                                     table_name_, &tablet_ids));
-    ASSERT_EQ(1, tablet_ids.size());
-    scoped_refptr<TabletPeer> peer;
-    ASSERT_OK(FindPeerForTablet(tablet_ids[0], &peer));
-
-    // Advance tablet server's clock.
-    HybridClock* clock = dynamic_cast<HybridClock*>(peer->clock());
-    ASSERT_NE(nullptr, clock) << "unexpected clock for tablet server";
-    UpdateClock(clock, MonoDelta::FromMilliseconds(100));
+    // Advance tablet server's clock hosting the first key range
+    // (i.e. for the row which is about to be inserted below).
+    ASSERT_OK(UpdateClockForTabletHostingKey(
+        0, MonoDelta::FromMilliseconds(100)));
 
     // Insert data into the first tablet (a.k.a. Ta).
     ASSERT_OK(InsertTestRows(client.get(), table.get(), key_split_value_, 0));
@@ -345,5 +354,170 @@ TEST_F(TimestampPropagationTest, 
TwoBatchesAndReadAtSnapshot) {
   }
 }
 
+// This is a test for KUDU-1189. It verifies that in case of a READ_AT_SNAPSHOT
+// scan with unspecified snapshot timestamp, the scanner picks timestamp from
+// the first server that the data is read from. If the scan spans multiple
+// tablets, the timestamp picked when scanning the first tablet is then used
+// when scanning following tablets.
+//
+// The idea of the test is simple: have a scan spanned across two tablets
+// where the clocks of the corresponding tablet servers are skewed. The
+// scenario is as follows:
+//
+//   1. Create a table which spans across two tablets.
+//
+//   2. Run the first scenario:
+//      * Advance the clock of the second tablet's server.
+//      * Create a client object and with it:
+//        ** Insert a row into the first tablet.
+//        ** Insert a row into the second tablet.
+//      * Discard the client object.
+//      * Create a new client object and perform a scan at READ_AT_SNAPSHOT
+//        mode, no timestamp specified.
+//      * Given the tight timings on the after-the-insert scan and difference in
+//        server clocks, there should be only one row in the result if the
+//        snapshot timestamp is taken from the first server. Otherwise, if the
+//        snapshot timestamp was taken from the second server, both rows would
+//        be visible for the scan.
+//      * Discard the client object.
+//
+//   3. Run the second scenario:
+//      * Advance the clock of the first tablet's server, so the clock of the
+//        first tablet is ahead of the clock of the second one.
+//      * Create a client object and with it:
+//        ** Insert an additional row into the first tablet.
+//      * Discard the client object.
+//      * Create a new client object and perform a scan at READ_AT_SNAPSHOT
+//        mode, no timestamp specified.
+//      * All the inserted rows should be visible to the scan because we
+//        expect the snapshot timestamp to be taken from the first tablet
+//        server. If the snapshot timestamp was taken from the second server,
+//        given the tight timings on the scan following the prior insert into
+//        the first tablet and difference in server clocks, not all rows would
+//        be visible to the scan.
+//
+TEST_F(ConsistencyITest, DISABLED_TestSnapshotScanTimestampReuse) {
+  const int32_t offset_usec = FLAGS_max_clock_sync_error_usec / 2;
+  // Assuming the offset is specified as a positive number.
+  ASSERT_GT(offset_usec, 0);
+  // Need to have two rows in the first partition; the values start at 0.
+  ASSERT_LT(2, key_split_value_);
+
+  // Prepare the setup: create a proper disposition for tablet servers' clocks
+  // and populate the table with appropriate data.
+  {
+    shared_ptr<KuduClient> client;
+    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
+    ASSERT_OK(CreateTable(client.get(), table_name_));
+    // Advance second partition's tablet server clock.
+    ASSERT_OK(UpdateClockForTabletHostingKey(
+        key_split_value_, MonoDelta::FromMicroseconds(offset_usec)));
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(client->OpenTable(table_name_, &table));
+    // Insert a row into the first tablet.
+    ASSERT_OK(InsertTestRows(client.get(), table.get(), 1, 0));
+    // Insert a row into the second tablet.
+    ASSERT_OK(InsertTestRows(client.get(), table.get(), 1, key_split_value_));
+  }
+
+  // Discarding the prior client object: if using it to perform scans, due
+  // to the scan timestamp propagation the lagging tablet server's clock
+  // would be advanced and it was not possible to distinguish between
+  // the timestamps coming from the first and the second tablet servers.
+
+  // Now, perform the scan at READ_AT_SNAPSHOT where a timestamp is not
+  // specified: make sure the snapshot timestamp is taken from the first tablet
+  // server among those the data was fetched from. For this scenario, perform
+  // a scan which would try to fetch all the table's data
+  // (i.e. make calls to all tablet servers which host table's data).
+  {
+    shared_ptr<KuduClient> client;
+    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
+    // Scan the table at a snapshot: let the servers pick the timestamp.
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(client->OpenTable(table_name_, &table));
+    KuduScanner scanner(table.get());
+    ASSERT_OK(scanner.SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
+    const ScanConfiguration& cfg(scanner.data_->configuration());
+    ASSERT_FALSE(cfg.has_snapshot_timestamp());
+
+    size_t row_count;
+    ASSERT_OK(CountRowsWithRetries(&scanner, &row_count));
+
+    // At this point, we have inserted 2 rows in total, where the second row
+    // was inserted into the tablet which server's clock was advanced
+    // (i.e. shifted into the future). We are expecting to get the timestamp
+    // for the scan from the first tablet server, so the second row should not
+    // be visible at that timestamp: from the second tablet server's view,
+    // it was inserted after the specified timestamp. Instead, if the timestamp
+    // for the scan were sampled at the second server's clock, then both rows
+    // would be visible to the scan.
+    ASSERT_EQ(1UL, row_count);
+    ASSERT_TRUE(cfg.has_snapshot_timestamp());
+  }
+
+  // Advance the clock of the first server even further, leaving the clock
+  // of the second server behind. Also, insert an additional row into the first
+  // tablet.
+  {
+    // Find the tablet for the first range to advance its server's clock.
+    ASSERT_OK(UpdateClockForTabletHostingKey(
+        0, MonoDelta::FromMicroseconds(2 * offset_usec)));
+    shared_ptr<KuduClient> client;
+    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(client->OpenTable(table_name_, &table));
+    // Insert an additional row into the first tablet.
+    // This is to check that the timestamp is taken from the first tablet
+    // server: since now the clocks of both tablet servers are ahead of the
+    // timestamps of the inserted rows so far, there would be no way to tell
+    // which server's clock is used for the scan using the number of rows
+    // returned by the scan. In either case, there will be two rows.
+    //
+    // Now, once we add a new row into the first tablet, given the big time
+    // margin provided by the current clock offset, we should see different
+    // outcomes from the subsequent scan:
+    //   * if the timestamp is taken from the first server, there should be
+    //     three rows in the result
+    //   * if the timestamp is taken from the second server, there should be
+    //     just two rows in the result
+    ASSERT_OK(InsertTestRows(client.get(), table.get(), 1, 1));
+  }
+
+  // Scan the table again and make sure the snapshot scan's timestamp is taken
+  // from the first tablet server, as before. However, now the clock of the
+  // first tablet server is ahead of the second tablet server's clock. If the
+  // timestamp was taken from the second server, there would be 2 rows
+  // in the result. The expected result is 3 rows, since the timestamp should
+  // be taken from the first server.
+  {
+    shared_ptr<KuduClient> client;
+    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
+    // Scan the table at snapshot: let the servers pick the timestamp.
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(client->OpenTable(table_name_, &table));
+    KuduScanner scanner(table.get());
+    ASSERT_OK(scanner.SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
+    const ScanConfiguration& cfg(scanner.data_->configuration());
+    // Check the snapshot timestamp is unset -- it's a fresh object for a
+    // READ_AT_SNAPSHOT scan where the snapshot timestamp is not specified
+    // explicitly.
+    ASSERT_FALSE(cfg.has_snapshot_timestamp());
+
+    size_t row_count;
+    ASSERT_OK(CountRowsWithRetries(&scanner, &row_count));
+
+    // At this point, we have inserted 3 rows in total. Since the snapshot
+    // timestamp is taken from the first server's clock, all 3 rows should be
+    // visible to the scan at that timestamp. Given the tight timings on the
+    // after-the-insert scan and difference in server clocks, that would not be
+    // the case if the snapshot was taken from the second server.
+    ASSERT_EQ(3UL, row_count);
+    // Check that the timestamp returned by the tablet server is set into the
+    // scan configuration.
+    ASSERT_TRUE(cfg.has_snapshot_timestamp());
+  }
+}
+
 } // namespace client
 } // namespace kudu

Reply via email to