KUDU-420 [c++ client] timestamp propagation via scan tokens

Implemented server timestamp propagation via scan tokens for the Kudu C++ client library. Added a corresponding unit test as well.
Besides, this changelist also enables the ConsistencyITest.TestScanTokenTimestampPropagation test: this patch brings the necessary fix to make it pass. This is in the context of the following JIRA item: KUDU-420 c++ client: implement HT timestamp propagation via scan tokens Change-Id: I5c76c20b62cb91695c69f7dc4b98f4dc98bf02cc Reviewed-on: http://gerrit.cloudera.org:8080/5220 Reviewed-by: David Ribeiro Alves <[email protected]> Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/d81aa80a Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/d81aa80a Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/d81aa80a Branch: refs/heads/master Commit: d81aa80a98bcec4b928e4a50c3c0a95f09bb57af Parents: 68dc1ff Author: Alexey Serbin <[email protected]> Authored: Thu Nov 24 14:33:01 2016 -0800 Committer: Alexey Serbin <[email protected]> Committed: Wed Nov 30 21:33:34 2016 +0000 ---------------------------------------------------------------------- src/kudu/client/client.h | 1 + src/kudu/client/scan_token-internal.cc | 10 +-- src/kudu/client/scan_token-internal.h | 1 - src/kudu/client/scan_token-test.cc | 80 ++++++++++++++++++++ src/kudu/integration-tests/consistency-itest.cc | 2 +- 5 files changed, 86 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/d81aa80a/src/kudu/client/client.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h index 9b72f97..506a19f 100644 --- a/src/kudu/client/client.h +++ b/src/kudu/client/client.h @@ -464,6 +464,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> { friend class ClientTest; friend class KuduClientBuilder; friend class KuduScanner; + friend class KuduScanToken; friend class KuduScanTokenBuilder; friend class KuduSession; friend class 
KuduTable; http://git-wip-us.apache.org/repos/asf/kudu/blob/d81aa80a/src/kudu/client/scan_token-internal.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc index c0d327d..678b915 100644 --- a/src/kudu/client/scan_token-internal.cc +++ b/src/kudu/client/scan_token-internal.cc @@ -51,9 +51,6 @@ KuduScanToken::Data::Data(KuduTable* table, tablet_(std::move(tablet)) { } -KuduScanToken::Data::~Data() { -} - Status KuduScanToken::Data::IntoKuduScanner(KuduScanner** scanner) const { return PBIntoScanner(table_->client(), message_, scanner); } @@ -154,12 +151,12 @@ Status KuduScanToken::Data::PBIntoScanner(KuduClient* client, RETURN_NOT_OK(scan_builder->SetSnapshotRaw(message.snap_timestamp())); } + RETURN_NOT_OK(scan_builder->SetCacheBlocks(message.cache_blocks())); + if (message.has_propagated_timestamp()) { - // TODO(KUDU-420) + client->data_->UpdateLatestObservedTimestamp(message.propagated_timestamp()); } - RETURN_NOT_OK(scan_builder->SetCacheBlocks(message.cache_blocks())); - *scanner = scan_builder.release(); return Status::OK(); } @@ -219,6 +216,7 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) { pb.set_cache_blocks(configuration_.spec().cache_blocks()); pb.set_fault_tolerant(configuration_.is_fault_tolerant()); + pb.set_propagated_timestamp(client->GetLatestObservedTimestamp()); MonoTime deadline = MonoTime::Now() + client->default_admin_operation_timeout(); http://git-wip-us.apache.org/repos/asf/kudu/blob/d81aa80a/src/kudu/client/scan_token-internal.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/scan_token-internal.h b/src/kudu/client/scan_token-internal.h index 65bef42..dd26517 100644 --- a/src/kudu/client/scan_token-internal.h +++ b/src/kudu/client/scan_token-internal.h @@ -33,7 +33,6 @@ class KuduScanToken::Data { explicit Data(KuduTable* table, ScanTokenPB 
message, std::unique_ptr<KuduTablet> tablet); - ~Data(); Status IntoKuduScanner(KuduScanner** scanner) const; http://git-wip-us.apache.org/repos/asf/kudu/blob/d81aa80a/src/kudu/client/scan_token-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/scan_token-test.cc b/src/kudu/client/scan_token-test.cc index 2a26b69..8875315 100644 --- a/src/kudu/client/scan_token-test.cc +++ b/src/kudu/client/scan_token-test.cc @@ -23,6 +23,7 @@ #include <vector> #include "kudu/client/client.h" +#include "kudu/client/client.pb.h" #include "kudu/gutil/stl_util.h" #include "kudu/integration-tests/mini_cluster.h" #include "kudu/tserver/mini_tablet_server.h" @@ -322,5 +323,84 @@ TEST_F(ScanTokenTest, TestScanTokensWithNonCoveringRange) { } } +// When building a scanner from a serialized scan token, +// verify that the propagated timestamp from the token makes its way into the +// latest observed timestamp of the client object. +TEST_F(ScanTokenTest, TestTimestampPropagation) { + static const string kTableName = "p_ts_table"; + + // Create a table to work with: + // * Deserializing a scan token into a scanner requires the table to exist. + // * Creating a scan token requires the table to exist. 
+ shared_ptr<KuduTable> table; + { + static const string kKeyColumnName = "c_key"; + KuduSchema schema; + { + KuduSchemaBuilder builder; + builder.AddColumn(kKeyColumnName)->NotNull()-> + Type(KuduColumnSchema::INT64)->PrimaryKey(); + ASSERT_OK(builder.Build(&schema)); + } + + { + unique_ptr<KuduPartialRow> split(schema.NewRow()); + ASSERT_OK(split->SetInt64(kKeyColumnName, 0)); + unique_ptr<client::KuduTableCreator> creator(client_->NewTableCreator()); + ASSERT_OK(creator->table_name(kTableName) + .schema(&schema) + .add_hash_partitions({ kKeyColumnName }, 2) + .split_rows({ split.release() }) + .num_replicas(1) + .Create()); + } + } + + // Deserialize a scan token and make sure the client's last observed timestamp + // is updated accordingly. + { + const uint64_t ts_prev = client_->GetLatestObservedTimestamp(); + const uint64_t ts_propagated = ts_prev + 1000000; + + ScanTokenPB pb; + pb.set_table_name(kTableName); + pb.set_read_mode(::kudu::READ_AT_SNAPSHOT); + pb.set_propagated_timestamp(ts_propagated); + const string serialized_token = pb.SerializeAsString(); + EXPECT_EQ(ts_prev, client_->GetLatestObservedTimestamp()); + + KuduScanner* scanner_raw; + ASSERT_OK(KuduScanToken::DeserializeIntoScanner(client_.get(), + serialized_token, + &scanner_raw)); + // The caller of the DeserializeIntoScanner() is responsible for + // de-allocating the result scanner object. + unique_ptr<KuduScanner> scanner(scanner_raw); + EXPECT_EQ(ts_propagated, client_->GetLatestObservedTimestamp()); + } + + // Build the set of scan tokens for the table, serialize them and + // make sure the serialized tokens contain the propagated timestamp. 
+ { + ASSERT_OK(client_->OpenTable(kTableName, &table)); + const uint64_t ts_prev = client_->GetLatestObservedTimestamp(); + const uint64_t ts_propagated = ts_prev + 1000000; + + client_->SetLatestObservedTimestamp(ts_propagated); + vector<KuduScanToken*> tokens; + ElementDeleter deleter(&tokens); + ASSERT_OK(KuduScanTokenBuilder(table.get()).Build(&tokens)); + for (const auto* t : tokens) { + string serialized_token; + ASSERT_OK(t->Serialize(&serialized_token)); + + ScanTokenPB pb; + ASSERT_TRUE(pb.ParseFromString(serialized_token)); + ASSERT_TRUE(pb.has_propagated_timestamp()); + EXPECT_EQ(ts_propagated, pb.propagated_timestamp()); + } + } +} + } // namespace client } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/d81aa80a/src/kudu/integration-tests/consistency-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/consistency-itest.cc b/src/kudu/integration-tests/consistency-itest.cc index 9d2a870..a69f128 100644 --- a/src/kudu/integration-tests/consistency-itest.cc +++ b/src/kudu/integration-tests/consistency-itest.cc @@ -538,7 +538,7 @@ TEST_F(ConsistencyITest, TestSnapshotScanTimestampReuse) { // with the specified timestamp to retrieve the data: it should observe // a timestamp which is not less than the propagated timestamp // encoded in the token. -TEST_F(ConsistencyITest, DISABLED_TestScanTokenTimestampPropagation) { +TEST_F(ConsistencyITest, TestScanTokenTimestampPropagation) { const int32_t offset_usec = FLAGS_max_clock_sync_error_usec; // Need to have at least one row in the first partition with
