Repository: kudu
Updated Branches:
  refs/heads/master fec9b8879 -> 88b02349d


[c++client] fixed KUDU-1743

KUDU-1743: GetPendingErrors() not returning all errors after Flush()

A race was discovered during Impala + Kudu testing: when working
with tables split among multiple tablet servers, a rare race condition
manifests itself as not all the errors being present in the error
collector upon return from the KuduSession::Flush() method.

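This patch fixes the race by removing a write RPC's operations from the
batcher's in-flight list only after all of its per-row errors have been
added to the error collector, and adds a corresponding test.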

Change-Id: I3054d7f7c00dd937f4307fb01a5a0054b8ae27f7
Reviewed-on: http://gerrit.cloudera.org:8080/5048
Reviewed-by: Adar Dembo <[email protected]>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/88b02349
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/88b02349
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/88b02349

Branch: refs/heads/master
Commit: 88b02349d88d335caac18bf8b930eac6d327ed40
Parents: fec9b88
Author: Alexey Serbin <[email protected]>
Authored: Wed Nov 9 17:49:07 2016 -0800
Committer: Alexey Serbin <[email protected]>
Committed: Fri Nov 11 06:51:36 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/batcher.cc         | 40 +++++++++++-----
 src/kudu/client/batcher.h          |  2 +-
 src/kudu/client/client-test.cc     | 84 +++++++++++++++++++++++++++++++--
 src/kudu/client/client.h           |  1 +
 src/kudu/client/error_collector.h  |  6 ++-
 src/kudu/client/session-internal.h |  1 +
 6 files changed, 115 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/88b02349/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index 9bb36dc..9b65984 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -721,19 +721,9 @@ void Batcher::ProcessWriteResponse(const WriteRpc& rpc,
     MarkHadErrors();
   }
 
-  // Remove all the ops from the "in-flight" list.
-  {
-    std::lock_guard<simple_spinlock> l(lock_);
-    for (InFlightOp* op : rpc.ops()) {
-      CHECK_EQ(1, ops_.erase(op))
-            << "Could not remove op " << op->ToString()
-            << " from in-flight list";
-    }
-  }
-
   // Check individual row errors.
   for (const WriteResponsePB_PerRowErrorPB& err_pb : 
rpc.resp().per_row_errors()) {
-    // TODO: handle case where we get one of the more specific TS errors
+    // TODO(todd): handle case where we get one of the more specific TS errors
     // like the tablet not being hosted?
 
     if (err_pb.row_index() >= rpc.ops().size()) {
@@ -752,6 +742,34 @@ void Batcher::ProcessWriteResponse(const WriteRpc& rpc,
     MarkHadErrors();
   }
 
+  // Remove all the ops from the "in-flight" list. It's essential to do so
+  // _after_ adding all errors into the collector, otherwise there might be
+  // a race which manifests itself as described at KUDU-1743. Essentially,
+  // with the ops removed _before_ adding the errors, the race was:
+  //
+  //   * There are two concurrent calls to this method (threads T1 and T2),
+  //     one for each of the write RPCs sent to the corresponding tservers.
+  //
+  //   * Each thread removes its Write's ops from ops_, then adds its errors
+  //     into the collector, then calls CheckForFinishedFlush().
+  //
+  //   * T1 is descheduled/delayed after removing from ops_ but before adding
+  //     its errors.
+  //
+  //   * T2 runs completely through ProcessWriteResponse(); with no ops left
+  //     in flight, its CheckForFinishedFlush() call wakes up the client
+  //     thread from which Flush() is being called.
+  //
+  //   * Flush() returns before T1 has added its errors, so they are missed.
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    for (InFlightOp* op : rpc.ops()) {
+      CHECK_EQ(1, ops_.erase(op))
+            << "Could not remove op " << op->ToString()
+            << " from in-flight list";
+    }
+  }
+
   CheckForFinishedFlush();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/88b02349/src/kudu/client/batcher.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.h b/src/kudu/client/batcher.h
index 3a52016..f98cf0d 100644
--- a/src/kudu/client/batcher.h
+++ b/src/kudu/client/batcher.h
@@ -186,7 +186,7 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
   kudu::client::KuduSession::ExternalConsistencyMode consistency_mode_;
 
   // Errors are reported into this error collector.
-  const scoped_refptr<ErrorCollector> error_collector_;
+  scoped_refptr<ErrorCollector> error_collector_;
 
   // The time when the very first operation was added into the batcher.
   MonoTime first_op_time_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/88b02349/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index d7a8a5e..0c419ba 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <algorithm>
+#include <atomic>
 #include <functional>
 #include <map>
 #include <memory>
@@ -33,6 +34,7 @@
 #include "kudu/client/client.h"
 #include "kudu/client/client-internal.h"
 #include "kudu/client/client-test-util.h"
+#include "kudu/client/error_collector.h"
 #include "kudu/client/meta_cache.h"
 #include "kudu/client/row_result.h"
 #include "kudu/client/scanner-internal.h"
@@ -267,15 +269,24 @@ class ClientTest : public KuduTest {
     return ret;
   }
 
+  // Applies inserts of test rows [first_row, first_row + num_rows) for
+  // 'table' to 'session'; flushing the session is left to the caller.
+  void InsertTestRows(KuduTable* table, KuduSession* session,
+                      int num_rows, int first_row = 0) {
+    for (int i = first_row; i < num_rows + first_row; ++i) {
+      gscoped_ptr<KuduInsert> insert(BuildTestRow(table, i));
+      ASSERT_OK(session->Apply(insert.release()));
+    }
+  }
+
   // Inserts 'num_rows' test rows using 'client'
-  void InsertTestRows(KuduClient* client, KuduTable* table, int num_rows, int 
first_row = 0) {
+  void InsertTestRows(KuduClient* client, KuduTable* table,
+                      int num_rows, int first_row = 0) {
     shared_ptr<KuduSession> session = client->NewSession();
     ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
     session->SetTimeoutMillis(60000);
-    for (int i = first_row; i < num_rows + first_row; i++) {
-      gscoped_ptr<KuduInsert> insert(BuildTestRow(table, i));
-      ASSERT_OK(session->Apply(insert.release()));
-    }
+    ASSERT_NO_FATAL_FAILURE(InsertTestRows(table, session.get(),
+                                           num_rows, first_row));
     FlushSessionOrDie(session);
     ASSERT_NO_FATAL_FAILURE(CheckNoRpcOverflow());
   }
@@ -2464,6 +2475,69 @@ TEST_F(ClientTest, TestAutoFlushBackgroundAndExplicitFlush) {
   EXPECT_EQ(kIterNum, CountRowsFromClient(client_table_.get()));
 }
 
+// Verify that in AUTO_FLUSH_BACKGROUND mode information on _all_ the errors
+// is delivered once Flush() returns (KUDU-1743: Flush() could return before
+// all errors had been added to the error collector). Every row is inserted
+// twice, and exactly one error per row is expected from the flush.
+TEST_F(ClientTest, TestAutoFlushBackgroundAndErrorCollector) {
+  using kudu::client::internal::ErrorCollector;
+  // This collector sleeps for one second before adding the very first
+  // error, widening the window for the race behind KUDU-1743. Waiting is
+  // temporarily re-allowed via ThreadRestrictions around that sleep.
+  class CustomErrorCollector : public ErrorCollector {
+   public:
+    CustomErrorCollector():
+      ErrorCollector(),
+      error_cnt_(0) {
+    }
+
+    void AddError(gscoped_ptr<KuduError> error) override {
+      // error_cnt_ is atomic, so exactly one caller (the first) sleeps.
+      if (0 == error_cnt_++) {
+        const bool prev_allowed = ThreadRestrictions::SetWaitAllowed(true);
+        SleepFor(MonoDelta::FromSeconds(1));
+        ThreadRestrictions::SetWaitAllowed(prev_allowed);
+      }
+      ErrorCollector::AddError(std::move(error));
+    }
+
+   private:
+    std::atomic<uint64_t> error_cnt_;
+  };
+
+  const size_t kIterNum = AllowSlowTests() ? 32 : 2;
+  for (size_t i = 0; i < kIterNum; ++i) {
+    static const size_t kRowNum = 2;
+
+    vector<unique_ptr<KuduPartialRow>> splits;
+    unique_ptr<KuduPartialRow> split(schema_.NewRow());
+    ASSERT_OK(split->SetInt32("key", kRowNum / 2));
+    splits.push_back(std::move(split));
+
+    // Create the test table. It's important that the table is split into
+    // multiple (at least two) tablets and replicated: that helps to get RPC
+    // completion callbacks on different reactor threads, which is the crux
+    // of the race condition behind KUDU-1743.
+    const string table_name = Substitute("table.$0", i);
+    shared_ptr<KuduTable> table;
+    NO_FATALS(CreateTable(table_name, 3, std::move(splits), {}, &table));
+
+    shared_ptr<KuduSession> session(client_->NewSession());
+    scoped_refptr<ErrorCollector> ec(new CustomErrorCollector);
+    ec.swap(session->data_->error_collector_);
+    ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
+    NO_FATALS(InsertTestRows(table.get(), session.get(), kRowNum));
+    NO_FATALS(InsertTestRows(table.get(), session.get(), kRowNum));
+    vector<KuduError*> errors;
+    ElementDeleter deleter(&errors);
+    ASSERT_FALSE(session->Flush().ok());
+    bool overflowed;
+    session->GetPendingErrors(&errors, &overflowed);
+    ASSERT_FALSE(overflowed);
+    ASSERT_EQ(kRowNum, errors.size());
+  }
+}
+
 // A test which verifies that a session in AUTO_FLUSH_BACKGROUND mode can
 // be safely abandoned: its pending data should not be flushed.
 // This test also checks that the reference to a session stored by the

http://git-wip-us.apache.org/repos/asf/kudu/blob/88b02349/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index a46322e..728949f 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -1530,6 +1530,7 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession>
   friend class internal::Batcher;
   friend class ClientTest;
   FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks);
+  FRIEND_TEST(ClientTest, TestAutoFlushBackgroundAndErrorCollector);
 
   explicit KuduSession(const sp::shared_ptr<KuduClient>& client);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/88b02349/src/kudu/client/error_collector.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/error_collector.h b/src/kudu/client/error_collector.h
index 42cd0c7..8d4d393 100644
--- a/src/kudu/client/error_collector.h
+++ b/src/kudu/client/error_collector.h
@@ -37,7 +37,7 @@ class ErrorCollector : public RefCountedThreadSafe<ErrorCollector> {
  public:
   ErrorCollector() = default;
 
-  void AddError(gscoped_ptr<KuduError> error);
+  virtual void AddError(gscoped_ptr<KuduError> error);
 
   // See KuduSession for details.
   size_t CountErrors() const;
@@ -45,9 +45,11 @@ class ErrorCollector : public RefCountedThreadSafe<ErrorCollector> {
   // See KuduSession for details.
   void GetErrors(std::vector<KuduError*>* errors, bool* overflowed);
 
+ protected:
+  virtual ~ErrorCollector();
+
  private:
   friend class RefCountedThreadSafe<ErrorCollector>;
-  virtual ~ErrorCollector();
 
   mutable simple_spinlock lock_;
   std::vector<KuduError*> errors_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/88b02349/src/kudu/client/session-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/session-internal.h b/src/kudu/client/session-internal.h
index 66a70a6..787eb95 100644
--- a/src/kudu/client/session-internal.h
+++ b/src/kudu/client/session-internal.h
@@ -231,6 +231,7 @@ class KuduSession::Data {
 
  private:
   FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks);
+  FRIEND_TEST(ClientTest, TestAutoFlushBackgroundAndErrorCollector);
 
   bool buffer_pre_flush_enabled_; // Set to 'false' only in test scenarios.
 

Reply via email to