KUDU-1189 integration test for reusing snapshot timestamp

Added a test for reusing the snapshot timestamp when it is not set explicitly while running scans in READ_AT_SNAPSHOT mode. This is a test for the functionality introduced in the context of KUDU-1189.
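For illustration only (not part of this change), below is a minimal sketch of the client-side pattern the new test exercises: a READ_AT_SNAPSHOT scan that never sets a snapshot timestamp explicitly, leaving the timestamp to be assigned when the first tablet is scanned and then reused for the remaining tablets. The master address, table name, and the CheckOk() helper are hypothetical placeholders, not taken from this patch.

// Illustrative sketch only; not part of this patch. The master address and
// table name below are placeholders.
#include <cstdlib>
#include <iostream>

#include "kudu/client/client.h"
#include "kudu/client/scan_batch.h"

using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduScanBatch;
using kudu::client::KuduScanner;
using kudu::client::KuduTable;
using kudu::client::sp::shared_ptr;

namespace {
// Helper for the sketch: abort on any non-OK status.
void CheckOk(const kudu::Status& s) {
  if (!s.ok()) {
    std::cerr << s.ToString() << std::endl;
    std::exit(1);
  }
}
}  // anonymous namespace

int main() {
  shared_ptr<KuduClient> client;
  CheckOk(KuduClientBuilder()
              .add_master_server_addr("kudu-master.example.com:7051")
              .Build(&client));

  shared_ptr<KuduTable> table;
  CheckOk(client->OpenTable("example_table", &table));

  KuduScanner scanner(table.get());
  // READ_AT_SNAPSHOT without an explicit snapshot timestamp (i.e. no
  // SetSnapshotMicros()/SetSnapshotRaw() call): the timestamp is expected to
  // be assigned by the first tablet server contacted and reused while
  // scanning the remaining tablets.
  CheckOk(scanner.SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
  CheckOk(scanner.Open());

  int64_t row_count = 0;
  while (scanner.HasMoreRows()) {
    KuduScanBatch batch;
    CheckOk(scanner.NextBatch(&batch));
    row_count += batch.NumRows();
  }
  std::cout << "rows scanned: " << row_count << std::endl;
  return 0;
}
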
The test is disabled as it currently fails. A follow up patch will fix the bug and enable the test. Change-Id: I7282976580cc15ef330871a838bbf7e46230ceb6 Reviewed-on: http://gerrit.cloudera.org:8080/5163 Reviewed-by: David Ribeiro Alves <[email protected]> Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b3a3420a Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b3a3420a Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b3a3420a Branch: refs/heads/master Commit: b3a3420af5bce6ec25ebcb6ed544456cbc3a988f Parents: 61d22e2 Author: Alexey Serbin <[email protected]> Authored: Mon Nov 21 10:11:12 2016 -0800 Committer: David Ribeiro Alves <[email protected]> Committed: Wed Nov 23 20:14:30 2016 +0000 ---------------------------------------------------------------------- src/kudu/client/client.h | 1 + src/kudu/integration-tests/consistency-itest.cc | 226 ++++++++++++++++--- 2 files changed, 201 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/b3a3420a/src/kudu/client/client.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h index 62d631f..2d2d30b 100644 --- a/src/kudu/client/client.h +++ b/src/kudu/client/client.h @@ -1901,6 +1901,7 @@ class KUDU_EXPORT KuduScanner { FRIEND_TEST(ClientTest, TestScanFaultTolerance); FRIEND_TEST(ClientTest, TestScanNoBlockCaching); FRIEND_TEST(ClientTest, TestScanTimeout); + FRIEND_TEST(ConsistencyITest, DISABLED_TestSnapshotScanTimestampReuse); // Owned. Data* data_; http://git-wip-us.apache.org/repos/asf/kudu/blob/b3a3420a/src/kudu/integration-tests/consistency-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/consistency-itest.cc b/src/kudu/integration-tests/consistency-itest.cc index 4ff31c1..3c6476c 100644 --- a/src/kudu/integration-tests/consistency-itest.cc +++ b/src/kudu/integration-tests/consistency-itest.cc @@ -25,6 +25,8 @@ #include "kudu/client/client.h" #include "kudu/client/client-test-util.h" +#include "kudu/client/scanner-internal.h" +#include "kudu/client/scan_configuration.h" #include "kudu/client/shared_ptr.h" #include "kudu/common/partial_row.h" #include "kudu/gutil/stringprintf.h" @@ -48,9 +50,11 @@ DECLARE_bool(enable_data_block_fsync); DECLARE_int32(heartbeat_interval_ms); +DECLARE_int32(max_clock_sync_error_usec); DECLARE_int32(scanner_gc_check_interval_us); DECLARE_bool(use_mock_wall_clock); +using kudu::client::ScanConfiguration; using kudu::client::sp::shared_ptr; using kudu::master::CatalogManager; using kudu::master::GetTableLocationsRequestPB; @@ -68,9 +72,9 @@ namespace kudu { namespace client { -class TimestampPropagationTest : public MiniClusterITestBase { +class ConsistencyITest : public MiniClusterITestBase { public: - TimestampPropagationTest() + ConsistencyITest() : num_tablet_servers_(2), table_name_("timestamp_propagation_test_table"), key_column_name_("key"), @@ -157,16 +161,13 @@ class TimestampPropagationTest : public MiniClusterITestBase { return Status::OK(); } - Status GetTabletIdForKeyValue(int32_t key_value_begin, - int32_t key_value_end, - const string& table_name, - vector<string>* tablet_ids) { - if (!tablet_ids) { - return Status::InvalidArgument("null output container"); + Status GetTabletIdForKey(int32_t key_value, string* tablet_id) { + if (!tablet_id) { + 
return Status::InvalidArgument("null output string"); } - tablet_ids->clear(); + const int32_t key_value_begin = key_value; + const int32_t key_value_end = key_value_begin + 1; - // Find the tablet for the first range (i.e. for the rows to be inserted). unique_ptr<KuduPartialRow> split_row_start(schema_.NewRow()); RETURN_NOT_OK(split_row_start->SetInt32(0, key_value_begin)); string partition_key_start; @@ -178,7 +179,7 @@ class TimestampPropagationTest : public MiniClusterITestBase { RETURN_NOT_OK(split_row_end->EncodeRowKey(&partition_key_end)); GetTableLocationsRequestPB req; - req.mutable_table()->set_table_name(table_name); + req.mutable_table()->set_table_name(table_name_); req.set_partition_key_start(partition_key_start); req.set_partition_key_end(partition_key_end); master::CatalogManager* catalog = @@ -187,9 +188,14 @@ class TimestampPropagationTest : public MiniClusterITestBase { CatalogManager::ScopedLeaderSharedLock l(catalog); RETURN_NOT_OK(l.first_failed_status()); RETURN_NOT_OK(catalog->GetTableLocations(&req, &resp)); - for (size_t i = 0; i < resp.tablet_locations_size(); ++i) { - tablet_ids->emplace_back(resp.tablet_locations(i).tablet_id()); + if (resp.tablet_locations_size() < 1) { + return Status::NotFound(Substitute("$0: no tablets for key", key_value)); } + if (resp.tablet_locations_size() > 1) { + return Status::IllegalState( + Substitute("$0: multiple tablet servers for key", key_value)); + } + *tablet_id = resp.tablet_locations(0).tablet_id(); return Status::OK(); } @@ -217,6 +223,17 @@ class TimestampPropagationTest : public MiniClusterITestBase { return Status::OK(); } + Status UpdateClockForTabletHostingKey(int32_t key, const MonoDelta& offset) { + string tablet_id; + RETURN_NOT_OK(GetTabletIdForKey(key, &tablet_id)); + scoped_refptr<TabletPeer> p; + RETURN_NOT_OK(FindPeerForTablet(tablet_id, &p)); + + HybridClock* clock = CHECK_NOTNULL(dynamic_cast<HybridClock*>(p->clock())); + UpdateClock(clock, offset); + return Status::OK(); + } + const size_t num_tablet_servers_; const string table_name_; const string key_column_name_; @@ -266,7 +283,7 @@ class TimestampPropagationTest : public MiniClusterITestBase { // behind Ta server's time, and scanning at Tb's write time would not // include the rows inserted into Ta. // -TEST_F(TimestampPropagationTest, TwoBatchesAndReadAtSnapshot) { +TEST_F(ConsistencyITest, TestTimestampPropagationFromScans) { uint64_t ts_a; { shared_ptr<KuduClient> client; @@ -275,18 +292,10 @@ TEST_F(TimestampPropagationTest, TwoBatchesAndReadAtSnapshot) { shared_ptr<KuduTable> table; ASSERT_OK(client->OpenTable(table_name_, &table)); - // Find the tablet for the first range (i.e. for the rows to be inserted). - vector<string> tablet_ids; - ASSERT_OK(GetTabletIdForKeyValue(0, key_split_value_ - 1, // first range - table_name_, &tablet_ids)); - ASSERT_EQ(1, tablet_ids.size()); - scoped_refptr<TabletPeer> peer; - ASSERT_OK(FindPeerForTablet(tablet_ids[0], &peer)); - - // Advance tablet server's clock. - HybridClock* clock = dynamic_cast<HybridClock*>(peer->clock()); - ASSERT_NE(nullptr, clock) << "unexpected clock for tablet server"; - UpdateClock(clock, MonoDelta::FromMilliseconds(100)); + // Advance tablet server's clock hosting the first key range + // (i.e. for the row which is about to be inserted below). + ASSERT_OK(UpdateClockForTabletHostingKey( + 0, MonoDelta::FromMilliseconds(100))); // Insert data into the first tablet (a.k.a. Ta). 
ASSERT_OK(InsertTestRows(client.get(), table.get(), key_split_value_, 0)); @@ -345,5 +354,170 @@ TEST_F(TimestampPropagationTest, TwoBatchesAndReadAtSnapshot) { } } +// This is a test for KUDU-1189. It verifies that in case of a READ_AT_SNAPSHOT +// scan with unspecified snapshot timestamp, the scanner picks the timestamp from +// the first server that the data is read from. If the scan spans multiple +// tablets, the timestamp picked when scanning the first tablet is then used +// when scanning the following tablets. +// +// The idea of the test is simple: have a scan spanning two tablets +// where the clocks of the corresponding tablet servers are skewed. The scenario +// is as follows: +// +// 1. Create a table which spans two tablets. +// +// 2. Run the first scenario: +// * Advance the clock of the second tablet's server. +// * Create a client object and with it: +// ** Insert a row into the first tablet. +// ** Insert a row into the second tablet. +// * Discard the client object. +// * Create a new client object and perform a scan in READ_AT_SNAPSHOT +// mode, no timestamp specified. +// * Given the tight timings on the after-the-insert scan and difference in +// server clocks, there should be only one row in the result if the snapshot +// timestamp is taken from the first server. Otherwise, if the snapshot +// timestamp was taken from the second server, both rows would be visible +// to the scan. +// * Discard the client object. +// +// 3. Run the second scenario: +// * Advance the clock of the first tablet's server, so the clock of the +// first tablet is ahead of the clock of the second one. +// * Create a client object and with it: +// ** Insert an additional row into the first tablet. +// * Discard the client object. +// * Create a new client object and perform a scan in READ_AT_SNAPSHOT +// mode, no timestamp specified. +// * All the inserted rows should be visible to the scan because we +// expect the snapshot timestamp to be taken from the first tablet +// server. If the snapshot timestamp was taken from the second server, +// given the tight timings on the scan following the prior insert into +// the first tablet and difference in server clocks, not all rows would +// be visible to the scan. +// +TEST_F(ConsistencyITest, DISABLED_TestSnapshotScanTimestampReuse) { + const int32_t offset_usec = FLAGS_max_clock_sync_error_usec / 2; + // Assuming the offset is specified as a positive number. + ASSERT_GT(offset_usec, 0); + // Need to have two rows in the first partition; the values start at 0. + ASSERT_LT(2, key_split_value_); + + // Prepare the setup: create a proper disposition for tablet servers' clocks + // and populate the table with appropriate data. + { + shared_ptr<KuduClient> client; + ASSERT_OK(cluster_->CreateClient(nullptr, &client)); + ASSERT_OK(CreateTable(client.get(), table_name_)); + // Advance second partition's tablet server clock. + ASSERT_OK(UpdateClockForTabletHostingKey( + key_split_value_, MonoDelta::FromMicroseconds(offset_usec))); + shared_ptr<KuduTable> table; + ASSERT_OK(client->OpenTable(table_name_, &table)); + // Insert a row into the first tablet. + ASSERT_OK(InsertTestRows(client.get(), table.get(), 1, 0)); + // Insert a row into the second tablet.
+ ASSERT_OK(InsertTestRows(client.get(), table.get(), 1, key_split_value_)); + } + + // Discarding the prior client object: if using it to perform scans, due + // to the scan timestamp propagation the lagging tablet server's clock + // would be advanced, and it would not be possible to distinguish between + // the timestamps coming from the first and the second tablet servers. + + // Now, perform the scan in READ_AT_SNAPSHOT mode where a timestamp is not + // specified: make sure the snapshot timestamp is taken from the first tablet + // server among those the data was fetched from. For this scenario, perform + // a scan which would try to fetch all the table's data + // (i.e. make calls to all tablet servers which host the table's data). + { + shared_ptr<KuduClient> client; + ASSERT_OK(cluster_->CreateClient(nullptr, &client)); + // Scan the table at a snapshot: let the servers pick the timestamp. + shared_ptr<KuduTable> table; + ASSERT_OK(client->OpenTable(table_name_, &table)); + KuduScanner scanner(table.get()); + ASSERT_OK(scanner.SetReadMode(KuduScanner::READ_AT_SNAPSHOT)); + const ScanConfiguration& cfg(scanner.data_->configuration()); + ASSERT_FALSE(cfg.has_snapshot_timestamp()); + + size_t row_count; + ASSERT_OK(CountRowsWithRetries(&scanner, &row_count)); + + // At this point, we have inserted 2 rows in total, where the second row + // was inserted into the tablet whose server's clock was advanced + // (i.e. shifted into the future). We are expecting to get the timestamp + // for the scan from the first tablet server, so the second row should not + // be visible at that timestamp: from the second tablet server's view, + // it was inserted after the specified timestamp. Instead, if the timestamp + // for the scan were sampled at the second server's clock, then both rows + // would be visible to the scan. + ASSERT_EQ(1UL, row_count); + ASSERT_TRUE(cfg.has_snapshot_timestamp()); + } + + // Advance the clock of the first server even further, leaving the clock + // of the second server behind. Also, insert an additional row into the first + // tablet. + { + // Find the tablet for the first range to advance its server's clock. + ASSERT_OK(UpdateClockForTabletHostingKey( + 0, MonoDelta::FromMicroseconds(2 * offset_usec))); + shared_ptr<KuduClient> client; + ASSERT_OK(cluster_->CreateClient(nullptr, &client)); + shared_ptr<KuduTable> table; + ASSERT_OK(client->OpenTable(table_name_, &table)); + // Insert an additional row into the first tablet. + // This is to check that the timestamp is taken from the first tablet + // server: since now the clocks of both tablet servers are ahead of the + // timestamps of the inserted rows so far, there would be no way to tell + // which server's clock is used for the scan using the number of rows + // returned by the scan. In either case, there will be two rows. + // + // Now, once we add a new row into the first tablet, given the big time + // margin provided by the current clock offset, we should see different + // outcomes from the subsequent scan: + // * if the timestamp is taken from the first server, there should be + // three rows in the result + // * if the timestamp is taken from the second server, there should be + // just two rows in the result + ASSERT_OK(InsertTestRows(client.get(), table.get(), 1, 1)); + } + + // Scan the table again and make sure the snapshot scan's timestamp is taken + // from the first tablet server, as before. However, now the clock of the + // first tablet server is ahead of the second tablet server's clock.
If the + // timestamp was taken from the second server, there would be 2 rows + // in the result. The expected result is 3 rows, since the timestamp should + // be taken from the first server. + { + shared_ptr<KuduClient> client; + ASSERT_OK(cluster_->CreateClient(nullptr, &client)); + // Scan the table at a snapshot: let the servers pick the timestamp. + shared_ptr<KuduTable> table; + ASSERT_OK(client->OpenTable(table_name_, &table)); + KuduScanner scanner(table.get()); + ASSERT_OK(scanner.SetReadMode(KuduScanner::READ_AT_SNAPSHOT)); + const ScanConfiguration& cfg(scanner.data_->configuration()); + // Check the snapshot timestamp is unset -- it's a fresh object for a + // READ_AT_SNAPSHOT scan where the snapshot timestamp is not specified + // explicitly. + ASSERT_FALSE(cfg.has_snapshot_timestamp()); + + size_t row_count; + ASSERT_OK(CountRowsWithRetries(&scanner, &row_count)); + + // At this point, we have inserted 3 rows in total. Since the snapshot + // timestamp is taken from the first server's clock, all 3 rows should be + // visible to the scan at that timestamp. Given the tight timings on the + // after-the-insert scan and difference in server clocks, that would not be + // the case if the snapshot was taken from the second server. + ASSERT_EQ(3UL, row_count); + // Check that the timestamp returned by the tablet server is set into the + // scan configuration. + ASSERT_TRUE(cfg.has_snapshot_timestamp()); + } +} + } // namespace client } // namespace kudu
