Repository: kudu Updated Branches: refs/heads/master fec9b8879 -> 88b02349d
[c++client] fixed KUDU-1743 KUDU-1743: GetPendingErrors() not returning all errors after Flush() A race has been discovered during Impala + Kudu testing: if working with tables split among multiple tablet servers, a rare race condition manifests itself as not having all the errors in the error collector upon return from the KuduSession::Flush() method. This patch fixes the race and adds a corresponding test for it. Change-Id: I3054d7f7c00dd937f4307fb01a5a0054b8ae27f7 Reviewed-on: http://gerrit.cloudera.org:8080/5048 Reviewed-by: Adar Dembo <[email protected]> Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/88b02349 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/88b02349 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/88b02349 Branch: refs/heads/master Commit: 88b02349d88d335caac18bf8b930eac6d327ed40 Parents: fec9b88 Author: Alexey Serbin <[email protected]> Authored: Wed Nov 9 17:49:07 2016 -0800 Committer: Alexey Serbin <[email protected]> Committed: Fri Nov 11 06:51:36 2016 +0000 ---------------------------------------------------------------------- src/kudu/client/batcher.cc | 40 +++++++++++----- src/kudu/client/batcher.h | 2 +- src/kudu/client/client-test.cc | 84 +++++++++++++++++++++++++++++++-- src/kudu/client/client.h | 1 + src/kudu/client/error_collector.h | 6 ++- src/kudu/client/session-internal.h | 1 + 6 files changed, 115 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/88b02349/src/kudu/client/batcher.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc index 9bb36dc..9b65984 100644 --- a/src/kudu/client/batcher.cc +++ b/src/kudu/client/batcher.cc @@ -721,19 +721,9 @@ void Batcher::ProcessWriteResponse(const WriteRpc& rpc, MarkHadErrors(); } - // Remove all 
the ops from the "in-flight" list. - { - std::lock_guard<simple_spinlock> l(lock_); - for (InFlightOp* op : rpc.ops()) { - CHECK_EQ(1, ops_.erase(op)) - << "Could not remove op " << op->ToString() - << " from in-flight list"; - } - } - // Check individual row errors. for (const WriteResponsePB_PerRowErrorPB& err_pb : rpc.resp().per_row_errors()) { - // TODO: handle case where we get one of the more specific TS errors + // TODO(todd): handle case where we get one of the more specific TS errors // like the tablet not being hosted? if (err_pb.row_index() >= rpc.ops().size()) { @@ -752,6 +742,34 @@ void Batcher::ProcessWriteResponse(const WriteRpc& rpc, MarkHadErrors(); } + // Remove all the ops from the "in-flight" list. It's essential to do so + // _after_ adding all errors into the collector, otherwise there might be + // a race which manifests itself as described at KUDU-1743. Essentially, + // the race was the following: + // + // * There are two concurrent calls to this method, one from each of RPC + // sent to corresponding tservers. + // + // * T1 removes its Write's ops from ops_, adds its errors, and calls + // CheckForFinishedFlush(). + // + // * T2 does the same. + // + // * T1 is descheduled/delayed after removing from ops_ but before adding + // its errors. + // + // * T2 runs completely through ProcessWriteResponse(), + // calls CheckForFinishedFlush(), and wakes up the client thread + // from which the Flush() is being called. 
+ { + std::lock_guard<simple_spinlock> l(lock_); + for (InFlightOp* op : rpc.ops()) { + CHECK_EQ(1, ops_.erase(op)) + << "Could not remove op " << op->ToString() + << " from in-flight list"; + } + } + CheckForFinishedFlush(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/88b02349/src/kudu/client/batcher.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/batcher.h b/src/kudu/client/batcher.h index 3a52016..f98cf0d 100644 --- a/src/kudu/client/batcher.h +++ b/src/kudu/client/batcher.h @@ -186,7 +186,7 @@ class Batcher : public RefCountedThreadSafe<Batcher> { kudu::client::KuduSession::ExternalConsistencyMode consistency_mode_; // Errors are reported into this error collector. - const scoped_refptr<ErrorCollector> error_collector_; + scoped_refptr<ErrorCollector> error_collector_; // The time when the very first operation was added into the batcher. MonoTime first_op_time_; http://git-wip-us.apache.org/repos/asf/kudu/blob/88b02349/src/kudu/client/client-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc index d7a8a5e..0c419ba 100644 --- a/src/kudu/client/client-test.cc +++ b/src/kudu/client/client-test.cc @@ -16,6 +16,7 @@ // under the License. #include <algorithm> +#include <atomic> #include <functional> #include <map> #include <memory> @@ -33,6 +34,7 @@ #include "kudu/client/client.h" #include "kudu/client/client-internal.h" #include "kudu/client/client-test-util.h" +#include "kudu/client/error_collector.h" #include "kudu/client/meta_cache.h" #include "kudu/client/row_result.h" #include "kudu/client/scanner-internal.h" @@ -267,15 +269,24 @@ class ClientTest : public KuduTest { return ret; } + // Inserts given number of tests rows into the specified table + // in the context of the session. 
+ void InsertTestRows(KuduTable* table, KuduSession* session, + int num_rows, int first_row = 0) { + for (int i = first_row; i < num_rows + first_row; ++i) { + gscoped_ptr<KuduInsert> insert(BuildTestRow(table, i)); + ASSERT_OK(session->Apply(insert.release())); + } + } + // Inserts 'num_rows' test rows using 'client' - void InsertTestRows(KuduClient* client, KuduTable* table, int num_rows, int first_row = 0) { + void InsertTestRows(KuduClient* client, KuduTable* table, + int num_rows, int first_row = 0) { shared_ptr<KuduSession> session = client->NewSession(); ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); session->SetTimeoutMillis(60000); - for (int i = first_row; i < num_rows + first_row; i++) { - gscoped_ptr<KuduInsert> insert(BuildTestRow(table, i)); - ASSERT_OK(session->Apply(insert.release())); - } + ASSERT_NO_FATAL_FAILURE(InsertTestRows(table, session.get(), + num_rows, first_row)); FlushSessionOrDie(session); ASSERT_NO_FATAL_FAILURE(CheckNoRpcOverflow()); } @@ -2464,6 +2475,69 @@ TEST_F(ClientTest, TestAutoFlushBackgroundAndExplicitFlush) { EXPECT_EQ(kIterNum, CountRowsFromClient(client_table_.get())); } +// A test to verify that in case of AUTO_FLUSH_BACKGROUND information on +// _all_ the errors is delivered after Flush() finishes. Basically, it's a test +// to cover KUDU-1743 -- there was a race where Flush() returned before +// the corresponding errors were added to the error collector. +TEST_F(ClientTest, TestAutoFlushBackgroundAndErrorCollector) { + using kudu::client::internal::ErrorCollector; + // The main idea behind this custom error collector is to delay + // adding the very first error: this is to expose the race which is + // the root cause of the KUDU-1743 issue. 
+ class CustomErrorCollector : public ErrorCollector { + public: + CustomErrorCollector(): + ErrorCollector(), + error_cnt_(0) { + } + + void AddError(gscoped_ptr<KuduError> error) override { + //LOG(INFO) << "Hello from: " << Thread::UniqueThreadId(); + if (0 == error_cnt_++) { + const bool prev_allowed = ThreadRestrictions::SetWaitAllowed(true); + SleepFor(MonoDelta::FromSeconds(1)); + ThreadRestrictions::SetWaitAllowed(prev_allowed); + } + ErrorCollector::AddError(std::move(error)); + } + + private: + std::atomic<uint64_t> error_cnt_; + }; + + const size_t kIterNum = AllowSlowTests() ? 32 : 2; + for (size_t i = 0; i < kIterNum; ++i) { + static const size_t kRowNum = 2; + + vector<unique_ptr<KuduPartialRow>> splits; + unique_ptr<KuduPartialRow> split(schema_.NewRow()); + ASSERT_OK(split->SetInt32("key", kRowNum / 2)); + splits.push_back(std::move(split)); + + // Create the test table: it's important the table is split into multiple + // (at least two) tablets and replicated: that helps to get + // RPC completion callbacks from different reactor threads, which is + // the crux of the race condition for KUDU-1743. 
+ const string table_name = Substitute("table.$0", i); + shared_ptr<KuduTable> table; + NO_FATALS(CreateTable(table_name, 3, std::move(splits), {}, &table)); + + shared_ptr<KuduSession> session(client_->NewSession()); + scoped_refptr<ErrorCollector> ec(new CustomErrorCollector); + ec.swap(session->data_->error_collector_); + ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); + NO_FATALS(InsertTestRows(table.get(), session.get(), kRowNum)); + NO_FATALS(InsertTestRows(table.get(), session.get(), kRowNum)); + vector<KuduError*> errors; + ElementDeleter deleter(&errors); + ASSERT_FALSE(session->Flush().ok()); + bool overflowed; + session->GetPendingErrors(&errors, &overflowed); + ASSERT_FALSE(overflowed); + ASSERT_EQ(kRowNum, errors.size()); + } +} + // A test which verifies that a session in AUTO_FLUSH_BACKGROUND mode can // be safely abandoned: its pending data should not be flushed. // This test also checks that the reference to a session stored by the http://git-wip-us.apache.org/repos/asf/kudu/blob/88b02349/src/kudu/client/client.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h index a46322e..728949f 100644 --- a/src/kudu/client/client.h +++ b/src/kudu/client/client.h @@ -1530,6 +1530,7 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession> friend class internal::Batcher; friend class ClientTest; FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks); + FRIEND_TEST(ClientTest, TestAutoFlushBackgroundAndErrorCollector); explicit KuduSession(const sp::shared_ptr<KuduClient>& client); http://git-wip-us.apache.org/repos/asf/kudu/blob/88b02349/src/kudu/client/error_collector.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/error_collector.h b/src/kudu/client/error_collector.h index 42cd0c7..8d4d393 100644 --- a/src/kudu/client/error_collector.h +++ 
b/src/kudu/client/error_collector.h @@ -37,7 +37,7 @@ class ErrorCollector : public RefCountedThreadSafe<ErrorCollector> { public: ErrorCollector() = default; - void AddError(gscoped_ptr<KuduError> error); + virtual void AddError(gscoped_ptr<KuduError> error); // See KuduSession for details. size_t CountErrors() const; @@ -45,9 +45,11 @@ class ErrorCollector : public RefCountedThreadSafe<ErrorCollector> { // See KuduSession for details. void GetErrors(std::vector<KuduError*>* errors, bool* overflowed); + protected: + virtual ~ErrorCollector(); + private: friend class RefCountedThreadSafe<ErrorCollector>; - virtual ~ErrorCollector(); mutable simple_spinlock lock_; std::vector<KuduError*> errors_; http://git-wip-us.apache.org/repos/asf/kudu/blob/88b02349/src/kudu/client/session-internal.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/session-internal.h b/src/kudu/client/session-internal.h index 66a70a6..787eb95 100644 --- a/src/kudu/client/session-internal.h +++ b/src/kudu/client/session-internal.h @@ -231,6 +231,7 @@ class KuduSession::Data { private: FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks); + FRIEND_TEST(ClientTest, TestAutoFlushBackgroundAndErrorCollector); bool buffer_pre_flush_enabled_; // Set to 'false' only in test scenarios.
