Repository: kudu Updated Branches: refs/heads/master 7327c28b2 -> 80923d690
[tests] MANUAL_FLUSH --> AUTO_FLUSH_BACKGROUND In tests, run KuduSession in AUTO_FLUSH_BACKGROUND instead of MANUAL_FLUSH mode where appropriate. Change-Id: Ieafc198609cceb5d6945a910364056d81786629a Reviewed-on: http://gerrit.cloudera.org:8080/4471 Reviewed-by: David Ribeiro Alves <[email protected]> Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/bc14b2f9 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/bc14b2f9 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/bc14b2f9 Branch: refs/heads/master Commit: bc14b2f9d775c9f27f2e2be36d4b03080977e8fa Parents: 7327c28 Author: Alexey Serbin <[email protected]> Authored: Mon Sep 19 17:15:13 2016 -0700 Committer: Alexey Serbin <[email protected]> Committed: Wed Sep 21 19:09:55 2016 +0000 ---------------------------------------------------------------------- src/kudu/client/client-test.cc | 8 +-- src/kudu/client/predicate-test.cc | 7 +- src/kudu/client/scan_token-test.cc | 18 ++--- src/kudu/integration-tests/all_types-itest.cc | 34 ++++++---- src/kudu/integration-tests/alter_table-test.cc | 71 +++++++++++--------- .../integration-tests/client_failover-itest.cc | 3 +- .../flex_partitioning-itest.cc | 6 +- .../integration-tests/linked_list-test-util.h | 12 ++-- .../tablet_history_gc-itest.cc | 9 ++- src/kudu/integration-tests/test_workload.cc | 13 +--- src/kudu/tools/ksck_remote-test.cc | 11 ++- 11 files changed, 91 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/bc14b2f9/src/kudu/client/client-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc index 6c08cbf..8da6d88 100644 --- a/src/kudu/client/client-test.cc +++ b/src/kudu/client/client-test.cc @@ -270,7 +270,7 @@ class ClientTest : public KuduTest { // Inserts 'num_rows' test rows using 'client' void InsertTestRows(KuduClient* client, KuduTable* table, int num_rows, int first_row = 0) { shared_ptr<KuduSession> session = client->NewSession(); - ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); session->SetTimeoutMillis(60000); for (int i = first_row; i < num_rows + first_row; i++) { gscoped_ptr<KuduInsert> insert(BuildTestRow(table, i)); @@ -287,7 +287,7 @@ class ClientTest : public KuduTest { void UpdateTestRows(KuduTable* table, int lo, int hi) { shared_ptr<KuduSession> session = client_->NewSession(); - ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); session->SetTimeoutMillis(10000); for (int i = lo; i < hi; i++) { gscoped_ptr<KuduUpdate> update(UpdateTestRow(table, i)); @@ -299,7 +299,7 @@ class ClientTest : public KuduTest { void DeleteTestRows(KuduTable* table, int lo, int hi) { shared_ptr<KuduSession> session = client_->NewSession(); - ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); session->SetTimeoutMillis(10000); for (int i = lo; i < hi; i++) { gscoped_ptr<KuduDelete> del(DeleteTestRow(table, i)); @@ -3498,7 +3498,7 @@ TEST_F(ClientTest, TestDeadlockSimulation) { const int kTimeoutMillis = 60000; shared_ptr<KuduSession> session = client_->NewSession(); session->SetTimeoutMillis(kTimeoutMillis); - ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); for (int i = 0; i < kNumRows; ++i) ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, i, i, "")); FlushSessionOrDie(session); http://git-wip-us.apache.org/repos/asf/kudu/blob/bc14b2f9/src/kudu/client/predicate-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/predicate-test.cc b/src/kudu/client/predicate-test.cc index 3be0939..7a4e253 100644 --- a/src/kudu/client/predicate-test.cc +++ b/src/kudu/client/predicate-test.cc @@ -17,12 +17,13 @@ #include <algorithm> #include <cmath> -#include <gtest/gtest.h> #include <limits> #include <memory> #include <string> #include <vector> +#include <gtest/gtest.h> + #include "kudu/client/client.h" #include "kudu/gutil/stringprintf.h" #include "kudu/gutil/strings/escaping.h" @@ -75,11 +76,11 @@ class PredicateTest : public KuduTest { return table; } - // Creates a new session in manual flush mode. + // Creates a new session in automatic background flush mode. shared_ptr<KuduSession> CreateSession() { shared_ptr<KuduSession> session = client_->NewSession(); session->SetTimeoutMillis(10000); - CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + CHECK_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); return session; } http://git-wip-us.apache.org/repos/asf/kudu/blob/bc14b2f9/src/kudu/client/scan_token-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/scan_token-test.cc b/src/kudu/client/scan_token-test.cc index 02a943a..4f1a251 100644 --- a/src/kudu/client/scan_token-test.cc +++ b/src/kudu/client/scan_token-test.cc @@ -52,14 +52,6 @@ class ScanTokenTest : public KuduTest { ASSERT_OK(cluster_->CreateClient(nullptr, &client_)); } - // Creates a new session in manual flush mode. - shared_ptr<KuduSession> CreateSession() { - shared_ptr<KuduSession> session = client_->NewSession(); - session->SetTimeoutMillis(10000); - CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); - return session; - } - // Count the rows in a table which satisfy the specified predicates. Simulates // a central query planner / remote task execution by creating a thread per // token, each with a new client. @@ -70,7 +62,7 @@ class ScanTokenTest : public KuduTest { string buf; CHECK_OK(token->Serialize(&buf)); - threads.emplace_back(thread([this, &rows] (string serialized_token) { + threads.emplace_back([this, &rows] (string serialized_token) { shared_ptr<KuduClient> client; ASSERT_OK(cluster_->CreateClient(nullptr, &client)); KuduScanner* scanner_ptr; @@ -78,7 +70,7 @@ class ScanTokenTest : public KuduTest { serialized_token, &scanner_ptr)); unique_ptr<KuduScanner> scanner(scanner_ptr); - scanner->Open(); + ASSERT_OK(scanner->Open()); while (scanner->HasMoreRows()) { KuduScanBatch batch; @@ -86,7 +78,7 @@ class ScanTokenTest : public KuduTest { rows += batch.NumRows(); } scanner->Close(); - }, std::move(buf))); + }, std::move(buf)); } for (thread& thread : threads) { @@ -152,7 +144,7 @@ TEST_F(ScanTokenTest, TestScanTokens) { // Create session shared_ptr<KuduSession> session = client_->NewSession(); session->SetTimeoutMillis(10000); - ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); // Insert rows for (int i = -100; i < 100; i++) { @@ -259,7 +251,7 @@ TEST_F(ScanTokenTest, TestScanTokensWithNonCoveringRange) { // Create session shared_ptr<KuduSession> session = client_->NewSession(); session->SetTimeoutMillis(10000); - ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); // Insert rows for (int i = 0; i < 100; i++) { http://git-wip-us.apache.org/repos/asf/kudu/blob/bc14b2f9/src/kudu/integration-tests/all_types-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/all_types-itest.cc b/src/kudu/integration-tests/all_types-itest.cc index 6dbfe36..70d5b9a 100644 --- a/src/kudu/integration-tests/all_types-itest.cc +++ b/src/kudu/integration-tests/all_types-itest.cc @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. -#include <gtest/gtest.h> #include <vector> +#include <gflags/gflags.h> +#include <gtest/gtest.h> + #include "kudu/gutil/strings/substitute.h" #include "kudu/client/row_result.h" #include "kudu/common/wire_protocol-test-util.h" @@ -212,16 +214,25 @@ class AllTypesItest : public KuduTest { } Status CreateCluster() { - vector<string> ts_flags; - // Set the flush threshold low so that we have flushes and test the on-disk formats. - ts_flags.push_back("--flush_threshold_mb=1"); - // Set the major delta compaction ratio low enough that we trigger a lot of them. - ts_flags.push_back("--tablet_delta_store_major_compact_min_ratio=0.001"); + static const vector<string> kTsFlags = { + // Set the flush threshold low so that we have flushes and test the on-disk formats. + "--flush_threshold_mb=1", + + // Set the major delta compaction ratio low enough that we trigger a lot of them. + "--tablet_delta_store_major_compact_min_ratio=0.001", + + // TODO(KUDU-1346) Remove custom consensus_max_batch_size_bytes setting + // once KUDU-1346 is fixed. It's necessary to change the default + // value of the consensus_max_batch_size_bytes flag to avoid + // triggering debug assert when a relatively big chunk of write operations + // is flushed to the tablet server. + "--consensus_max_batch_size_bytes=2097152", + }; ExternalMiniClusterOptions opts; opts.num_tablet_servers = kNumTabletServers; - for (const std::string& flag : ts_flags) { + for (const std::string& flag : kTsFlags) { opts.extra_tserver_flags.push_back(flag); } @@ -276,17 +287,14 @@ class AllTypesItest : public KuduTest { // ended up in the right place. Status InsertRows() { shared_ptr<KuduSession> session = client_->NewSession(); - RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); - int max_rows_per_tablet = setup_.GetRowsPerTablet(); + RETURN_NOT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); + const int max_rows_per_tablet = setup_.GetRowsPerTablet(); for (int i = 0; i < kNumTablets; ++i) { for (int j = 0; j < max_rows_per_tablet; ++j) { RETURN_NOT_OK(GenerateRow(session.get(), i, j)); - if (j % 1000 == 0) { - RETURN_NOT_OK(session->Flush()); - } } - RETURN_NOT_OK(session->Flush()); } + RETURN_NOT_OK(session->Flush()); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/bc14b2f9/src/kudu/integration-tests/alter_table-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc index 0acacd9..3f36269 100644 --- a/src/kudu/integration-tests/alter_table-test.cc +++ b/src/kudu/integration-tests/alter_table-test.cc @@ -15,14 +15,16 @@ // specific language governing permissions and limitations // under the License. -#include <boost/bind.hpp> -#include <gflags/gflags.h> -#include <gtest/gtest.h> +#include <atomic> #include <map> #include <memory> #include <string> #include <utility> +#include <boost/bind.hpp> +#include <gflags/gflags.h> +#include <gtest/gtest.h> + #include "kudu/client/client.h" #include "kudu/client/client-test-util.h" #include "kudu/client/row_result.h" @@ -41,7 +43,6 @@ #include "kudu/tserver/mini_tablet_server.h" #include "kudu/tserver/tablet_server.h" #include "kudu/tserver/ts_tablet_manager.h" -#include "kudu/util/atomic.h" #include "kudu/util/faststring.h" #include "kudu/util/random.h" #include "kudu/util/stopwatch.h" @@ -62,7 +63,6 @@ using client::KuduColumnSchema; using client::KuduError; using client::KuduInsert; using client::KuduRowResult; -using client::KuduScanBatch; using client::KuduScanner; using client::KuduSchema; using client::KuduSchemaBuilder; @@ -75,19 +75,19 @@ using client::KuduValue; using client::sp::shared_ptr; using master::AlterTableRequestPB; using master::AlterTableResponsePB; -using master::MiniMaster; +using std::atomic; using std::map; using std::pair; using std::unique_ptr; using std::vector; using tablet::TabletPeer; -using tserver::MiniTabletServer; class AlterTableTest : public KuduTest { public: AlterTableTest() - : stop_threads_(false), - inserted_idx_(0) { + : stop_threads_(false), + next_idx_(0), + update_ops_cnt_(0) { KuduSchemaBuilder b; b.AddColumn("c0")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey(); @@ -257,12 +257,14 @@ class AlterTableTest : public KuduTest { scoped_refptr<TabletPeer> tablet_peer_; - AtomicBool stop_threads_; + atomic<bool> stop_threads_; - // The index of the last row inserted by InserterThread. - // UpdaterThread uses this to figure out which rows can be - // safely updated. - AtomicInt<int32_t> inserted_idx_; + // The index of the next row to be inserted by the InserterThread. + // The UpdaterThread uses this to figure out which rows can be safely updated. + atomic<uint32_t> next_idx_; + + // Number of update operations issues by the UpdaterThread so far. + atomic<uint32_t> update_ops_cnt_; }; // Subclass which creates three servers and a replicated cluster. @@ -426,7 +428,7 @@ TEST_F(AlterTableTest, TestGetSchemaAfterAlterTable) { void AlterTableTest::InsertRows(int start_row, int num_rows) { shared_ptr<KuduSession> session = client_->NewSession(); shared_ptr<KuduTable> table; - CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + CHECK_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); session->SetTimeoutMillis(15 * 1000); CHECK_OK(client_->OpenTable(kTableName, &table)); @@ -456,7 +458,7 @@ void AlterTableTest::InsertRows(int start_row, int num_rows) { Status AlterTableTest::InsertRowsSequential(const string& table_name, int start_row, int num_rows) { shared_ptr<KuduSession> session = client_->NewSession(); shared_ptr<KuduTable> table; - RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + RETURN_NOT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); session->SetTimeoutMillis(15 * 1000); RETURN_NOT_OK(client_->OpenTable(table_name, &table)); @@ -487,7 +489,7 @@ void AlterTableTest::UpdateRow(int32_t row_key, shared_ptr<KuduSession> session = client_->NewSession(); shared_ptr<KuduTable> table; CHECK_OK(client_->OpenTable(kTableName, &table)); - CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + CHECK_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); session->SetTimeoutMillis(15 * 1000); unique_ptr<KuduUpdate> update(table->NewUpdate()); int32_t key = bswap_32(row_key); // endian swap to match 'InsertRows' @@ -852,18 +854,18 @@ TEST_F(AlterTableTest, TestMajorCompactDeltasAfterAddUpdateRemoveColumn) { } // Thread which inserts rows into the table. -// After each batch of rows is inserted, inserted_idx_ is updated +// After each batch of rows is inserted, next_idx_ is updated // to communicate how much data has been written (and should now be // updateable) void AlterTableTest::InserterThread() { shared_ptr<KuduSession> session = client_->NewSession(); - shared_ptr<KuduTable> table; CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); session->SetTimeoutMillis(15 * 1000); + shared_ptr<KuduTable> table; CHECK_OK(client_->OpenTable(kTableName, &table)); - int32_t i = 0; - while (!stop_threads_.Load()) { + uint32_t i = 0; + while (!stop_threads_) { unique_ptr<KuduInsert> insert(table->NewInsert()); // Endian-swap the key so that we spew inserts randomly // instead of just a sequential write pattern. This way @@ -875,30 +877,28 @@ void AlterTableTest::InserterThread() { if (i % 50 == 0) { FlushSessionOrDie(session); - inserted_idx_.Store(i); + next_idx_ = i; } } FlushSessionOrDie(session); - inserted_idx_.Store(i); + next_idx_ = i; } // Thread which follows behind the InserterThread and generates random // updates across the previously inserted rows. void AlterTableTest::UpdaterThread() { shared_ptr<KuduSession> session = client_->NewSession(); - shared_ptr<KuduTable> table; CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); session->SetTimeoutMillis(15 * 1000); + shared_ptr<KuduTable> table; CHECK_OK(client_->OpenTable(kTableName, &table)); Random rng(1); - int32_t i = 0; - while (!stop_threads_.Load()) { - unique_ptr<KuduUpdate> update(table->NewUpdate()); - - int32_t max = inserted_idx_.Load(); + uint32_t i = 0; + while (!stop_threads_) { + const uint32_t max = next_idx_; if (max == 0) { // Inserter hasn't inserted anything yet, so we have nothing to update. SleepFor(MonoDelta::FromMicroseconds(100)); @@ -906,10 +906,12 @@ void AlterTableTest::UpdaterThread() { } // Endian-swap the key to match the way the InserterThread generates // keys to insert. - int32_t key = bswap_32(rng.Uniform(max)); + uint32_t key = bswap_32(rng.Uniform(max - 1)); + unique_ptr<KuduUpdate> update(table->NewUpdate()); CHECK_OK(update->mutable_row()->SetInt32(0, key)); CHECK_OK(update->mutable_row()->SetInt32(1, i)); CHECK_OK(session->Apply(update.release())); + ++update_ops_cnt_; if (i++ % 50 == 0) { FlushSessionOrDie(session); @@ -924,9 +926,9 @@ void AlterTableTest::UpdaterThread() { void AlterTableTest::ScannerThread() { shared_ptr<KuduTable> table; CHECK_OK(client_->OpenTable(kTableName, &table)); - while (!stop_threads_.Load()) { + while (!stop_threads_) { KuduScanner scanner(table.get()); - int inserted_at_scanner_start = inserted_idx_.Load(); + uint32_t inserted_at_scanner_start = next_idx_; CHECK_OK(scanner.Open()); int count = 0; vector<KuduRowResult> results; @@ -979,10 +981,13 @@ TEST_F(AlterTableTest, TestAlterUnderWriteLoad) { i)); } - stop_threads_.Store(true); + stop_threads_ = true; writer->Join(); updater->Join(); scanner->Join(); + // A sanity check: the updater should have generate at least one update + // given the parameters the test is running with. + CHECK_GE(update_ops_cnt_, 0U); } TEST_F(AlterTableTest, TestInsertAfterAlterTable) { http://git-wip-us.apache.org/repos/asf/kudu/blob/bc14b2f9/src/kudu/integration-tests/client_failover-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/client_failover-itest.cc b/src/kudu/integration-tests/client_failover-itest.cc index 71a0a00..59f2b3e 100644 --- a/src/kudu/integration-tests/client_failover-itest.cc +++ b/src/kudu/integration-tests/client_failover-itest.cc @@ -107,7 +107,7 @@ TEST_P(ClientFailoverParamITest, TestDeleteLeaderWhileScanning) { shared_ptr<KuduTable> table; ASSERT_OK(client_->OpenTable(TestWorkload::kDefaultTableName, &table)); shared_ptr<KuduSession> session = client_->NewSession(); - ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC)); session->SetTimeoutMillis(kTimeout.ToMilliseconds()); // The row we will update later when testing writes. @@ -117,7 +117,6 @@ TEST_P(ClientFailoverParamITest, TestDeleteLeaderWhileScanning) { ASSERT_OK(insert->mutable_row()->SetInt32(1, 1)); ASSERT_OK(insert->mutable_row()->SetStringNoCopy(2, "a")); ASSERT_OK(session->Apply(insert)); - ASSERT_OK(session->Flush()); ASSERT_EQ(1, CountTableRows(table.get())); // Write data to a tablet. http://git-wip-us.apache.org/repos/asf/kudu/blob/bc14b2f9/src/kudu/integration-tests/flex_partitioning-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/flex_partitioning-itest.cc b/src/kudu/integration-tests/flex_partitioning-itest.cc index 9387505..69c20ab 100644 --- a/src/kudu/integration-tests/flex_partitioning-itest.cc +++ b/src/kudu/integration-tests/flex_partitioning-itest.cc @@ -307,7 +307,7 @@ Status FlexPartitioningITest::InsertRows(const RangePartitionOptions& range_part shared_ptr<KuduSession> session(client_->NewSession()); session->SetTimeoutMillis(10000); - RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + RETURN_NOT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); int count = 0; for (const auto& bound : bounds) { @@ -317,10 +317,6 @@ Status FlexPartitioningITest::InsertRows(const RangePartitionOptions& range_part inserted_rows_.emplace_back(new KuduPartialRow(*insert->mutable_row())); RETURN_NOT_OK(session->Apply(insert.release())); count++; - - if (i > 0 && i % 1000 == 0) { - RETURN_NOT_OK(session->Flush()); - } } } http://git-wip-us.apache.org/repos/asf/kudu/blob/bc14b2f9/src/kudu/integration-tests/linked_list-test-util.h ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/linked_list-test-util.h b/src/kudu/integration-tests/linked_list-test-util.h index b3d9ea5..c37c4ed 100644 --- a/src/kudu/integration-tests/linked_list-test-util.h +++ b/src/kudu/integration-tests/linked_list-test-util.h @@ -16,13 +16,15 @@ // under the License. #include <algorithm> -#include <glog/logging.h> #include <iostream> #include <list> +#include <memory> #include <string> #include <utility> #include <vector> +#include <glog/logging.h> + #include "kudu/client/client.h" #include "kudu/client/client-test-util.h" #include "kudu/client/row_result.h" @@ -253,18 +255,14 @@ class ScopedRowUpdater { void RowUpdaterThread() { client::sp::shared_ptr<client::KuduSession> session(table_->client()->NewSession()); session->SetTimeoutMillis(15000); - CHECK_OK(session->SetFlushMode(client::KuduSession::MANUAL_FLUSH)); + CHECK_OK(session->SetFlushMode(client::KuduSession::AUTO_FLUSH_BACKGROUND)); int64_t next_key; - int64_t updated_count = 0; while (to_update_.BlockingGet(&next_key)) { - gscoped_ptr<client::KuduUpdate> update(table_->NewUpdate()); + std::unique_ptr<client::KuduUpdate> update(table_->NewUpdate()); CHECK_OK(update->mutable_row()->SetInt64(kKeyColumnName, next_key)); CHECK_OK(update->mutable_row()->SetBool(kUpdatedColumnName, true)); CHECK_OK(session->Apply(update.release())); - if (++updated_count % 50 == 0) { - FlushSessionOrDie(session); - } } FlushSessionOrDie(session); http://git-wip-us.apache.org/repos/asf/kudu/blob/bc14b2f9/src/kudu/integration-tests/tablet_history_gc-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/tablet_history_gc-itest.cc b/src/kudu/integration-tests/tablet_history_gc-itest.cc index e9503c6..ec1e78a 100644 --- a/src/kudu/integration-tests/tablet_history_gc-itest.cc +++ b/src/kudu/integration-tests/tablet_history_gc-itest.cc @@ -392,7 +392,8 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) { client::sp::shared_ptr<client::KuduSession> session = client_->NewSession(); session->SetTimeoutMillis(20000); - ASSERT_OK_FAST(session->SetFlushMode(client::KuduSession::MANUAL_FLUSH)); + ASSERT_OK_FAST(session->SetFlushMode( + client::KuduSession::AUTO_FLUSH_BACKGROUND)); for (int32_t i = 0; i < num_rows_to_insert; i++) { int32_t row_key = rows_inserted_; @@ -458,7 +459,8 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) { } else { client::sp::shared_ptr<client::KuduSession> session = client_->NewSession(); session->SetTimeoutMillis(20000); - ASSERT_OK_FAST(session->SetFlushMode(client::KuduSession::MANUAL_FLUSH)); + ASSERT_OK_FAST(session->SetFlushMode( + client::KuduSession::AUTO_FLUSH_BACKGROUND)); for (const MaterializedTestRow& test_row : updates) { unique_ptr<client::KuduUpdate> update(table->NewUpdate()); @@ -517,7 +519,8 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) { } else { client::sp::shared_ptr<client::KuduSession> session = client_->NewSession(); session->SetTimeoutMillis(20000); - ASSERT_OK_FAST(session->SetFlushMode(client::KuduSession::MANUAL_FLUSH)); + ASSERT_OK_FAST(session->SetFlushMode( + client::KuduSession::AUTO_FLUSH_BACKGROUND)); for (int32_t row_key : deletes) { unique_ptr<client::KuduDelete> del(table->NewDelete()); http://git-wip-us.apache.org/repos/asf/kudu/blob/bc14b2f9/src/kudu/integration-tests/test_workload.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc index 275804b..3c26ab4 100644 --- a/src/kudu/integration-tests/test_workload.cc +++ b/src/kudu/integration-tests/test_workload.cc @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. - #include "kudu/client/client.h" #include "kudu/client/client-test-util.h" #include "kudu/client/schema-internal.h" @@ -32,15 +31,8 @@ namespace kudu { -using client::FromInternalCompressionType; -using client::FromInternalDataType; -using client::FromInternalEncodingType; -using client::KuduClient; -using client::KuduClientBuilder; -using client::KuduColumnSchema;; using client::KuduInsert; using client::KuduSchema; -using client::KuduSchemaBuilder; using client::KuduSchemaFromSchema; using client::KuduSession; using client::KuduTable; @@ -218,16 +210,15 @@ void TestWorkload::Setup() { if (write_pattern_ == UPDATE_ONE_ROW) { shared_ptr<KuduSession> session = client_->NewSession(); session->SetTimeoutMillis(20000); - CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + CHECK_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC)); shared_ptr<KuduTable> table; CHECK_OK(client_->OpenTable(table_name_, &table)); - gscoped_ptr<KuduInsert> insert(table->NewInsert()); + std::unique_ptr<KuduInsert> insert(table->NewInsert()); KuduPartialRow* row = insert->mutable_row(); CHECK_OK(row->SetInt32(0, 0)); CHECK_OK(row->SetInt32(1, 0)); CHECK_OK(row->SetStringCopy(2, "hello world")); CHECK_OK(session->Apply(insert.release())); - CHECK_OK(session->Flush()); } } http://git-wip-us.apache.org/repos/asf/kudu/blob/bc14b2f9/src/kudu/tools/ksck_remote-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/ksck_remote-test.cc b/src/kudu/tools/ksck_remote-test.cc index 5a38945..6dc792d 100644 --- a/src/kudu/tools/ksck_remote-test.cc +++ b/src/kudu/tools/ksck_remote-test.cc @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include <memory> #include <sstream> #include <gtest/gtest.h> @@ -42,8 +43,8 @@ using client::KuduSession; using client::KuduTable; using client::KuduTableCreator; using client::sp::shared_ptr; -using std::static_pointer_cast; using std::string; +using std::unique_ptr; using std::vector; using strings::Substitute; @@ -173,16 +174,12 @@ class RemoteKsckTest : public KuduTest { RETURN_NOT_OK(client_->OpenTable(kTableName, &table)); shared_ptr<KuduSession> session(client_->NewSession()); session->SetTimeoutMillis(10000); - RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + RETURN_NOT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); for (uint64_t i = 0; i < num_rows; i++) { VLOG(1) << "Generating write for row id " << i; - gscoped_ptr<KuduInsert> insert(table->NewInsert()); + unique_ptr<KuduInsert> insert(table->NewInsert()); GenerateDataForRow(table->schema(), i, &random_, insert->mutable_row()); RETURN_NOT_OK(session->Apply(insert.release())); - - if (i > 0 && i % 1000 == 0) { - RETURN_NOT_OK(session->Flush()); - } } RETURN_NOT_OK(session->Flush()); return Status::OK();
