KUDU-420 [i-tests] scan token timestamp propagation test. Added an integration test to verify timestamp propagation via scan tokens for the C++ client.
The test is disabled as it currently fails. A follow up patch will fix the bug and enable the test. This is in the context of the following JIRA item: KUDU-420 c++ client: implement HT timestamp propagation via scan tokens Change-Id: I47cd067248f4a26c4605f075ec5ee30da71f6f30 Reviewed-on: http://gerrit.cloudera.org:8080/5219 Reviewed-by: David Ribeiro Alves <[email protected]> Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/68dc1ffe Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/68dc1ffe Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/68dc1ffe Branch: refs/heads/master Commit: 68dc1ffeff2f6ad7ad8654da0df66162cb286cca Parents: b4764f9 Author: Alexey Serbin <[email protected]> Authored: Thu Nov 24 14:24:57 2016 -0800 Committer: Alexey Serbin <[email protected]> Committed: Wed Nov 30 21:33:27 2016 +0000 ---------------------------------------------------------------------- src/kudu/integration-tests/consistency-itest.cc | 111 ++++++++++++++++++- 1 file changed, 107 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/68dc1ffe/src/kudu/integration-tests/consistency-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/consistency-itest.cc b/src/kudu/integration-tests/consistency-itest.cc index 45efeb9..9d2a870 100644 --- a/src/kudu/integration-tests/consistency-itest.cc +++ b/src/kudu/integration-tests/consistency-itest.cc @@ -295,7 +295,8 @@ TEST_F(ConsistencyITest, TestTimestampPropagationFromScans) { 0, MonoDelta::FromMilliseconds(100))); // Insert data into the first tablet (a.k.a. Ta). 
- ASSERT_OK(InsertTestRows(client.get(), table.get(), key_split_value_, 0)); + const int rows_num = key_split_value_; // fill in the partition completely + ASSERT_OK(InsertTestRows(client.get(), table.get(), rows_num, 0)); size_t row_count; ASSERT_OK(GetRowCount(table.get(), KuduScanner::READ_LATEST, 0, &row_count)); @@ -324,8 +325,9 @@ TEST_F(ConsistencyITest, TestTimestampPropagationFromScans) { // Inserting data into the second tablet (a.k.a. Tb): using the second // key range partition. + const int rows_num = key_split_value_; ASSERT_OK(InsertTestRows(client.get(), table.get(), - key_split_value_, key_split_value_)); + rows_num, key_split_value_)); // Retrieve the latest observed timestamp. ts_b = client->GetLatestObservedTimestamp(); } @@ -358,8 +360,8 @@ TEST_F(ConsistencyITest, TestTimestampPropagationFromScans) { // when scanning following tablets. // // The idea of the test is simple: have a scan spanned across two tablets -// where the clocks of the corresponding tablet servers are skewed. The scenario -// is as following: +// where the clocks of the corresponding tablet servers are skewed. The sequence +// of actions is as following: // // 1. Create a table which spans across two tablets. // @@ -516,5 +518,106 @@ TEST_F(ConsistencyITest, TestSnapshotScanTimestampReuse) { } } +// Verify that the propagated timestamp from a serialized scan token +// makes its way into corresponding tablet servers while performing a scan +// operation built from the token. +// +// The real-world use-cases behind this test assume a Kudu client (C++/Java) +// can get scan tokens for a scan operation, serialize those and pass them +// to the other Kudu client (C++/Java). Since de-serializing a scan token +// propagates the latest observed timestamp, the latter client will have +// the latest observed timestamp set accordingly if it de-serializes those +// scan tokens into corresponding scan operations. 
+// +// The test scenario uses a table split into two tablets, each hosted by a +// tablet server. The clock of the first tablet server is shifted into the +// future. The first client inserts a row into the first tablet. Then it creates +// a scan token to retrieve some "related" data from the second +// tablet hosted by the second server. Now, another client receives the +// serialized scan token and runs corresponding READ_AT_SNAPSHOT scan +// with the specified timestamp to retrieve the data: it should observe +// a timestamp which is not less than the propagated timestamp +// encoded in the token. +TEST_F(ConsistencyITest, DISABLED_TestScanTokenTimestampPropagation) { + const int32_t offset_usec = FLAGS_max_clock_sync_error_usec; + + // Need to have at least one row in the first partition with + // values starting at 0. + ASSERT_GE(key_split_value_, 1); + + { + // Prepare the setup: create a proper disposition for tablet servers' clocks + // and populate the table with initial data. + shared_ptr<KuduClient> client; + ASSERT_OK(cluster_->CreateClient(nullptr, &client)); + ASSERT_OK(CreateTable(client.get(), table_name_)); + shared_ptr<KuduTable> table; + ASSERT_OK(client->OpenTable(table_name_, &table)); + + // Insert a single row into the second tablet: it's necessary to get + // non-empty scan in the verification phase of the test. + ASSERT_OK(InsertTestRows(client.get(), table.get(), 1, key_split_value_)); + } + + uint64_t ts_ref; + string scan_token; + { + shared_ptr<KuduClient> client; + ASSERT_OK(cluster_->CreateClient(nullptr, &client)); + + // Advance the clock of the server hosting the first partition tablet. + const int32_t row_key = 0; + ASSERT_OK(UpdateClockForTabletHostingKey( + row_key, MonoDelta::FromMicroseconds(offset_usec))); + shared_ptr<KuduTable> table; + ASSERT_OK(client->OpenTable(table_name_, &table)); + // Insert just a single row into the first tablet. 
+ ASSERT_OK(InsertTestRows(client.get(), table.get(), 1, row_key)); + ts_ref = client->GetLatestObservedTimestamp(); + + // Create and serialize a scan token: the scan selects a row by its key + // from the other tablet at the timestamp at which the first row was + // inserted. + vector<KuduScanToken*> tokens; + ElementDeleter deleter(&tokens); + KuduScanTokenBuilder builder(table.get()); + ASSERT_OK(builder.SetReadMode(KuduScanner::READ_AT_SNAPSHOT)); + unique_ptr<KuduPredicate> predicate(table->NewComparisonPredicate( + key_column_name_, + KuduPredicate::EQUAL, + KuduValue::FromInt(key_split_value_))); + ASSERT_OK(builder.AddConjunctPredicate(predicate.release())); + ASSERT_OK(builder.Build(&tokens)); + ASSERT_EQ(1, tokens.size()); + ASSERT_OK(tokens[0]->Serialize(&scan_token)); + } + + // The other client: scan the second tablet using a scanner built from + // the serialized scanner token. If the client propagates timestamp from the + // de-serialized scan token, upon fetching a batch of rows the client + // should observe timestamp not less than the reference propagated timestamp + // encoded in the token. + { + shared_ptr<KuduClient> client; + ASSERT_OK(cluster_->CreateClient(nullptr, &client)); + shared_ptr<KuduTable> table; + ASSERT_OK(client->OpenTable(table_name_, &table)); + KuduScanner* scanner_raw; + ASSERT_OK(KuduScanToken::DeserializeIntoScanner(client.get(), scan_token, + &scanner_raw)); + unique_ptr<KuduScanner> scanner(scanner_raw); + ASSERT_OK(scanner->Open()); + ASSERT_TRUE(scanner->HasMoreRows()); + size_t row_count = 0; + while (scanner->HasMoreRows()) { + KuduScanBatch batch; + ASSERT_OK(scanner->NextBatch(&batch)); + row_count += batch.NumRows(); + ASSERT_LE(ts_ref, client->GetLatestObservedTimestamp()); + } + EXPECT_EQ(1, row_count); + } +} + } // namespace client } // namespace kudu
