[c++] Reuse snapshot scan timestamp across tablets KUDU-1189 On reads at a snapshot that touch multiple tablets, without the user setting a timestamp, use the timestamp from the first server for following scans.
For a READ_AT_SNAPSHOT scan operation with no snapshot timestamp specified, store the snapshot timestamp returned from the first tablet server into the scan configuration object. Then reuse it when continuing the scan on other tablet servers. Added a corresponding unit test as well. Change-Id: I2ac708e38b8a80834f7d54eca294517cbfb06ec6 Reviewed-on: http://gerrit.cloudera.org:8080/5143 Reviewed-by: David Ribeiro Alves <[email protected]> Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/71768bf4 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/71768bf4 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/71768bf4 Branch: refs/heads/master Commit: 71768bf465f42ff93cd26b631c445fb1c97151bc Parents: b3a3420 Author: Alexey Serbin <[email protected]> Authored: Mon Nov 21 10:40:42 2016 -0800 Committer: David Ribeiro Alves <[email protected]> Committed: Wed Nov 23 20:14:37 2016 +0000 ---------------------------------------------------------------------- src/kudu/client/client-test.cc | 67 ++++++++++++++++++++ src/kudu/client/client.h | 3 +- src/kudu/client/scanner-internal.cc | 11 +++- src/kudu/integration-tests/consistency-itest.cc | 5 +- 4 files changed, 79 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/71768bf4/src/kudu/client/client-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc index 3596bf0..58c48c6 100644 --- a/src/kudu/client/client-test.cc +++ b/src/kudu/client/client-test.cc @@ -3896,6 +3896,73 @@ TEST_F(ClientTest, TestScanAtLatestObservedTimestamp) { } } +// Peform a READ_AT_SNAPSHOT scan with no explicit snapshot timestamp specified +// and verify that the timestamp received in the first tablet server's response 
+// is set into the scan configuration and used for subsequent requests +// sent to other tablet servers hosting the table's data. +// Basically, this is a unit test for KUDU-1189. +TEST_F(ClientTest, TestReadAtSnapshotNoTimestampSet) { + // Number of tablets which host the tablet data. + static const size_t kTabletsNum = 3; + static const size_t kRowsPerTablet = 2; + + shared_ptr<KuduTable> table; + { + vector<unique_ptr<KuduPartialRow>> rows; + for (size_t i = 1; i < kTabletsNum; ++i) { + unique_ptr<KuduPartialRow> row(schema_.NewRow()); + CHECK_OK(row->SetInt32(0, i * kRowsPerTablet)); + rows.push_back(std::move(row)); + } + ASSERT_NO_FATAL_FAILURE(CreateTable("test_table", 1, + std::move(rows), {}, &table)); + // Insert some data into the table, so each tablet would get populated. + shared_ptr<KuduSession> session(client_->NewSession()); + ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + for (size_t i = 0; i < kTabletsNum * kRowsPerTablet; ++i) { + gscoped_ptr<KuduInsert> insert(BuildTestRow(table.get(), i)); + ASSERT_OK(session->Apply(insert.release())); + } + FlushSessionOrDie(session); + } + + // Now, run a scan in READ_AT_SNAPSHOT mode with no timestamp specified. + // The scan should fetch all the inserted rows. Since it's a multi-tablet + // scan, in the process there should be one NextBatch() per tablet. + KuduScanner sc(table.get()); + ASSERT_OK(sc.SetReadMode(KuduScanner::READ_AT_SNAPSHOT)); + + // The scan configuration object should not contain the snapshot timestamp: + // since its a fresh scan object with no snashot timestamp set. + ASSERT_FALSE(sc.data_->configuration().has_snapshot_timestamp()); + + ASSERT_OK(sc.Open()); + // The reference timestamp. 
+ ASSERT_TRUE(sc.data_->configuration().has_snapshot_timestamp()); + const uint64_t ts_ref = sc.data_->configuration().snapshot_timestamp(); + ASSERT_TRUE(sc.data_->last_response_.has_snap_timestamp()); + EXPECT_EQ(ts_ref, sc.data_->last_response_.snap_timestamp()); + + // On some of the KuduScanner::NextBatch() calls the client connects to the + // next tablet server and fetches rows from there. It's necessary to check + // that the initial timestamp received from the very first tablet server + // stays as in the scan configuration, with no modification. + size_t total_row_count = 0; + while (sc.HasMoreRows()) { + const uint64_t ts_pre = sc.data_->configuration().snapshot_timestamp(); + EXPECT_EQ(ts_ref, ts_pre); + + KuduScanBatch batch; + ASSERT_OK(sc.NextBatch(&batch)); + const size_t row_count = batch.NumRows(); + total_row_count += row_count; + + const uint64_t ts_post = sc.data_->configuration().snapshot_timestamp(); + EXPECT_EQ(ts_ref, ts_post); + } + EXPECT_EQ(kTabletsNum * kRowsPerTablet, total_row_count); +} + TEST_F(ClientTest, TestClonePredicates) { ASSERT_NO_FATAL_FAILURE(InsertTestRows(client_table_.get(), 2, 0)); http://git-wip-us.apache.org/repos/asf/kudu/blob/71768bf4/src/kudu/client/client.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h index 2d2d30b..9b72f97 100644 --- a/src/kudu/client/client.h +++ b/src/kudu/client/client.h @@ -1901,7 +1901,8 @@ class KUDU_EXPORT KuduScanner { FRIEND_TEST(ClientTest, TestScanFaultTolerance); FRIEND_TEST(ClientTest, TestScanNoBlockCaching); FRIEND_TEST(ClientTest, TestScanTimeout); - FRIEND_TEST(ConsistencyITest, DISABLED_TestSnapshotScanTimestampReuse); + FRIEND_TEST(ClientTest, TestReadAtSnapshotNoTimestampSet); + FRIEND_TEST(ConsistencyITest, TestSnapshotScanTimestampReuse); // Owned. 
Data* data_; http://git-wip-us.apache.org/repos/asf/kudu/blob/71768bf4/src/kudu/client/scanner-internal.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc index 129d0da..311a7bf 100644 --- a/src/kudu/client/scanner-internal.cc +++ b/src/kudu/client/scanner-internal.cc @@ -409,13 +409,20 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key, // primary key. This is used when retrying the scan elsewhere. The last // primary key is also updated on each scan response. if (configuration().is_fault_tolerant()) { - CHECK(last_response_.has_snap_timestamp()); - configuration_.SetSnapshotRaw(last_response_.snap_timestamp()); if (last_response_.has_last_primary_key()) { last_primary_key_ = last_response_.last_primary_key(); } } + if (configuration_.read_mode() == KuduScanner::READ_AT_SNAPSHOT && + !configuration_.has_snapshot_timestamp()) { + // There must be a snapshot timestamp returned by the tablet server: + // it's the first response from the tablet server when scanning in the + // READ_AT_SNAPSHOT mode with unspecified snapshot timestamp. 
+ CHECK(last_response_.has_snap_timestamp()); + configuration_.SetSnapshotRaw(last_response_.snap_timestamp()); + } + if (last_response_.has_propagated_timestamp()) { table_->client()->data_->UpdateLatestObservedTimestamp( last_response_.propagated_timestamp()); http://git-wip-us.apache.org/repos/asf/kudu/blob/71768bf4/src/kudu/integration-tests/consistency-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/consistency-itest.cc b/src/kudu/integration-tests/consistency-itest.cc index 3c6476c..45efeb9 100644 --- a/src/kudu/integration-tests/consistency-itest.cc +++ b/src/kudu/integration-tests/consistency-itest.cc @@ -250,9 +250,6 @@ class ConsistencyITest : public MiniClusterITestBase { // in the row history if T1 and T2 are performed in different servers, // as T2 can be assigned a timestamp that is lower than T1. // -// The test is disabled because it fails due to the absense of timestamp -// propagation. Remove the 'DISABLED_' prefix, recompile and run if needed. -// // The scenario to expose inconsistency when not propagating the timestamp // for the scan operations: // @@ -396,7 +393,7 @@ TEST_F(ConsistencyITest, TestTimestampPropagationFromScans) { // the first tablet and difference in server clocks, not all rows would // be visible the the scan. // -TEST_F(ConsistencyITest, DISABLED_TestSnapshotScanTimestampReuse) { +TEST_F(ConsistencyITest, TestSnapshotScanTimestampReuse) { const int32_t offset_usec = FLAGS_max_clock_sync_error_usec / 2; // Assuming the offset is specified as a positive number. ASSERT_GT(offset_usec, 0);
