This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new eb55189  [client] actionable error message for KuduSession::Flush()
eb55189 is described below

commit eb55189eec3cb30bf10a5f0778ac5bc46164de7b
Author: Alexey Serbin <[email protected]>
AuthorDate: Sun Jul 11 18:53:15 2021 -0700

    [client] actionable error message for KuduSession::Flush()
    
    This patch makes the error message for Status::IOError() returned by
    KuduSession::Flush() more actionable, citing that details are available
    via a call to KuduSession::GetPendingErrors().  Yes, that's documented
    in-line in client.h and in the client C++ API docs [1], but who reads
    those?
    
    Prior to this patch, the error message was:
      Some errors occurred
    
    With this patch, the error message now is:
      failed to flush data: error details are available
      via KuduSession::GetPendingErrors()
    
    In addition, I switched the code in batcher.{h,cc} from the Atomic
    wrappers (util/atomic.h, gutil/atomicops.h, and gutil/atomic_refcount.h)
    to std::atomic and removed the Batcher::MarkHadErrors() utility method.
    
    [1] https://kudu.apache.org/cpp-client-api/classkudu_1_1client_1_1KuduSession.html
    
    Change-Id: I69e1613d703bb689641186c4282fe8b3e821d323
    Reviewed-on: http://gerrit.cloudera.org:8080/17673
    Tested-by: Kudu Jenkins
    Reviewed-by: Bankim Bhavsar <[email protected]>
---
 src/kudu/client/batcher.cc                         | 41 +++++++--------
 src/kudu/client/batcher.h                          | 22 +++-----
 src/kudu/client/client-test.cc                     | 59 ++++++++++++----------
 src/kudu/client/session-internal.cc                |  4 +-
 .../security-unknown-tsk-itest.cc                  |  2 +-
 src/kudu/integration-tests/txn_write_ops-itest.cc  |  6 +--
 6 files changed, 63 insertions(+), 71 deletions(-)

diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index f48a5c8..9fdc872 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -45,7 +45,6 @@
 #include "kudu/common/txn_id.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
-#include "kudu/gutil/atomic_refcount.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -91,10 +90,10 @@ using strings::Substitute;
 
 namespace kudu {
 
+class RowOperationsPB;
 class Schema;
 
 namespace client {
-
 namespace internal {
 
 // About lock ordering in this file:
@@ -610,12 +609,12 @@ Batcher::Batcher(KuduClient* client,
     consistency_mode_(consistency_mode),
     txn_id_(txn_id),
     error_collector_(std::move(error_collector)),
-    had_errors_(false),
     flush_callback_(nullptr),
     next_op_sequence_number_(0),
     timeout_(client->default_rpc_timeout()),
     outstanding_lookups_(0),
     buffer_bytes_used_(0),
+    had_errors_(false),
     arena_(1024) {
   ops_.set_empty_key(nullptr);
   ops_.set_deleted_key(reinterpret_cast<InFlightOp*>(-1));
@@ -694,12 +693,14 @@ void Batcher::CheckForFinishedFlush() {
     // a lock inversion deadlock -- the session lock should always
     // come before the batcher lock.
     session->data_->FlushFinished(this);
-
   }
   if (flush_callback_) {
-    // User is responsible for fetching errors from the error collector.
-    Status s = had_errors_ ? Status::IOError("Some errors occurred")
-                           : Status::OK();
+    // Per-row error details are available via KuduSession::GetPendingErrors().
+    // Keep this message in sync with the one in KuduSession::Data::Flush().
+    auto s = had_errors_
+        ? Status::IOError("failed to flush data: error details are available "
+                          "via KuduSession::GetPendingErrors()")
+        : Status::OK();
     flush_callback_->Run(s);
   }
 }
@@ -747,7 +748,7 @@ Status Batcher::Add(KuduWriteOperation* write_op) {
   // deadline_ is set in FlushAsync(), after all Add() calls are done, so
   // here we're forced to create a new deadline.
   MonoTime deadline = ComputeDeadlineUnlocked();
-  base::RefCountInc(&outstanding_lookups_);
+  ++outstanding_lookups_;
   scoped_refptr<Batcher> self(this);
   client_->data_->meta_cache_->LookupTabletByKey(
       op->write_op->table(),
@@ -756,8 +757,7 @@ Status Batcher::Add(KuduWriteOperation* write_op) {
       MetaCache::LookupType::kPoint,
       &op->tablet,
       [self, op](const Status& s) { self->TabletLookupFinished(op, s); });
-
-  buffer_bytes_used_.IncrementBy(write_op->SizeInBuffer());
+  buffer_bytes_used_ += write_op->SizeInBuffer();
 
   return Status::OK();
 }
@@ -780,17 +780,13 @@ bool Batcher::IsAbortedUnlocked() const {
   return state_ == kAborted;
 }
 
-void Batcher::MarkHadErrors() {
-  std::lock_guard<simple_spinlock> l(lock_);
-  had_errors_ = true;
-}
-
 void Batcher::MarkInFlightOpFailed(InFlightOp* op, const Status& s) {
   std::lock_guard<simple_spinlock> l(lock_);
   MarkInFlightOpFailedUnlocked(op, s);
 }
 
 void Batcher::MarkInFlightOpFailedUnlocked(InFlightOp* op, const Status& s) {
+  DCHECK(lock_.is_locked());
   CHECK_EQ(1, ops_.erase(op))
     << "Could not remove op " << op->ToString() << " from in-flight list";
   error_collector_->AddError(unique_ptr<KuduError>(new 
KuduError(op->write_op.release(), s)));
@@ -799,7 +795,7 @@ void Batcher::MarkInFlightOpFailedUnlocked(InFlightOp* op, 
const Status& s) {
 }
 
 void Batcher::TabletLookupFinished(InFlightOp* op, const Status& s) {
-  base::RefCountDec(&outstanding_lookups_);
+  --outstanding_lookups_;
 
   // Acquire the batcher lock early to atomically:
   // 1. Test if the batcher was aborted, and
@@ -880,10 +876,10 @@ void Batcher::FlushBuffersIfReady() {
       VLOG(3) << "FlushBuffersIfReady: batcher not yet in flushing state";
       return;
     }
-    if (!base::RefCountIsZero(&outstanding_lookups_)) {
-      VLOG(3) << "FlushBuffersIfReady: "
-              << base::subtle::NoBarrier_Load(&outstanding_lookups_)
-              << " ops still in lookup";
+    const int32_t pending_lookups_num = outstanding_lookups_;
+    if (pending_lookups_num != 0) {
+      VLOG(3) << Substitute("FlushBuffersIfReady: $0 ops still in lookup",
+                            pending_lookups_num);
       return;
     }
     // Take ownership of the ops while we're under the lock.
@@ -944,9 +940,8 @@ void Batcher::ProcessWriteResponse(const WriteRpc& rpc,
     for (InFlightOp* op : rpc.ops()) {
       unique_ptr<KuduError> error(new KuduError(op->write_op.release(), s));
       error_collector_->AddError(std::move(error));
     }
-
-    MarkHadErrors();
+    had_errors_ = true;
   }
 
   // Check individual row errors.
@@ -968,7 +963,7 @@ void Batcher::ProcessWriteResponse(const WriteRpc& rpc,
     Status op_status = StatusFromPB(err_pb.error());
     unique_ptr<KuduError> error(new KuduError(op.release(), op_status));
     error_collector_->AddError(std::move(error));
-    MarkHadErrors();
+    had_errors_ = true;
   }
 
   // Remove all the ops from the "in-flight" list. It's essential to do so
diff --git a/src/kudu/client/batcher.h b/src/kudu/client/batcher.h
index 3874705..aa62d45 100644
--- a/src/kudu/client/batcher.h
+++ b/src/kudu/client/batcher.h
@@ -16,6 +16,7 @@
 // under the License.
 #pragma once
 
+#include <atomic>
 #include <cstdint>
 #include <memory>
 #include <mutex>
@@ -28,11 +29,9 @@
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
 #include "kudu/client/write_op.h"
 #include "kudu/common/txn_id.h"
-#include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/util/atomic.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/monotime.h"
@@ -124,7 +123,7 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
   // Return the total size (number of bytes) of all pending write operations
   // accumulated by the batcher.
   int64_t buffer_bytes_used() const {
-    return buffer_bytes_used_.Load();
+    return buffer_bytes_used_;
   }
 
   // Return the identifier of a multi-row transaction (if any) that all the
@@ -153,10 +152,6 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
   // processing wherever they are.
   bool IsAbortedUnlocked() const;
 
-  // Mark the fact that errors have occurred with this batch. This ensures that
-  // the flush callback will get a bad Status.
-  void MarkHadErrors();
-
   // Remove an op from the in-flight op list, and delete the op itself.
   // The operation is reported to the ErrorReporter as having failed with the
   // given status.
@@ -208,10 +203,6 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
   // The time when the very first operation was added into the batcher.
   MonoTime first_op_time_;
 
-  // Set to true if there was at least one error from this Batcher.
-  // Protected by lock_
-  bool had_errors_;
-
   // If state is kFlushing, this member will be set to the user-provided
   // callback. Once there are no more in-flight operations, the callback
   // will be called exactly once (and the state changed to kFlushed).
@@ -239,12 +230,13 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
   MonoTime deadline_;
 
   // Number of outstanding lookups across all in-flight ops.
-  //
-  // Note: _not_ protected by lock_!
-  Atomic32 outstanding_lookups_;
+  std::atomic<int32_t> outstanding_lookups_;
 
   // The number of bytes used in the buffer for pending operations.
-  AtomicInt<int64_t> buffer_bytes_used_;
+  std::atomic<int64_t> buffer_bytes_used_;
+
+  // Set to true if there was at least one error from this Batcher.
+  std::atomic<bool> had_errors_;
 
   Arena arena_;
 
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index f6ad605..51bf76b 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -3342,9 +3342,10 @@ TEST_F(ClientTest, 
TestBatchWithPartialErrorOfDuplicateKeys) {
   ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, 1, 1, 
"Attempted dup"));
   ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, 2, 1, "Should 
succeed"));
   Status s = session->Flush();
-  ASSERT_FALSE(s.ok());
-  ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
-
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(),
+                      "failed to flush data: error details are available "
+                      "via KuduSession::GetPendingErrors()");
   // Fetch and verify the reported error.
   unique_ptr<KuduError> error;
   NO_FATALS(error = GetSingleErrorFromSession(session.get()));
@@ -3379,9 +3380,10 @@ TEST_F(ClientTest, 
TestBatchWithPartialErrorOfMissingRequiredColumn) {
   // Insert a row missing a required column, which will fail.
   ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, 2, 2, "Missing 
required column"));
   Status s = session->Flush();
-  ASSERT_FALSE(s.ok());
-  ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
-
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(),
+                      "failed to flush data: error details are available "
+                      "via KuduSession::GetPendingErrors()");
   // Fetch and verify the reported error.
   unique_ptr<KuduError> error;
   NO_FATALS(error = GetSingleErrorFromSession(session.get()));
@@ -3418,9 +3420,10 @@ TEST_F(ClientTest, 
TestBatchWithPartialErrorOfNoFieldsUpdated) {
   // Update a row with some non-key fields updated, which will success.
   ASSERT_OK(ApplyUpdateToSession(session.get(), client_table_, 2, 22));
   Status s = session->Flush();
-  ASSERT_FALSE(s.ok());
-  ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
-
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(),
+                      "failed to flush data: error details are available "
+                      "via KuduSession::GetPendingErrors()");
   // Fetch and verify the reported error.
   unique_ptr<KuduError> error;
   NO_FATALS(error = GetSingleErrorFromSession(session.get()));
@@ -3454,9 +3457,10 @@ TEST_F(ClientTest, 
TestBatchWithPartialErrorOfNonKeyColumnSpecifiedDelete) {
   // Delete a row with some non-key fields, which will fail.
   ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 2, 2));
   Status s = session->Flush();
-  ASSERT_FALSE(s.ok());
-  ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
-
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(),
+                      "failed to flush data: error details are available "
+                      "via KuduSession::GetPendingErrors()");
   // Fetch and verify the reported error.
   unique_ptr<KuduError> error;
   NO_FATALS(error = GetSingleErrorFromSession(session.get()));
@@ -3488,9 +3492,10 @@ TEST_F(ClientTest, 
TestBatchWithPartialErrorOfAllRowsFailed) {
   ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 1, 1));
   ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 2, 2));
   Status s = session->Flush();
-  ASSERT_FALSE(s.ok());
-  ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
-
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(),
+                      "failed to flush data: error details are available "
+                      "via KuduSession::GetPendingErrors()");
   // Fetch and verify the reported error.
   vector<KuduError*> errors;
   ElementDeleter d(&errors);
@@ -4331,8 +4336,8 @@ TEST_F(ClientTest, TestMutateDeletedRow) {
   // Attempt update deleted row
   ASSERT_OK(ApplyUpdateToSession(session.get(), client_table_, 1, 2));
   Status s = session->Flush();
-  ASSERT_FALSE(s.ok());
-  ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
   // Verify error
   unique_ptr<KuduError> error = GetSingleErrorFromSession(session.get());
   ASSERT_EQ(error->failed_op().ToString(),
@@ -4343,8 +4348,8 @@ TEST_F(ClientTest, TestMutateDeletedRow) {
   // Attempt delete deleted row
   ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 1));
   s = session->Flush();
-  ASSERT_FALSE(s.ok());
-  ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
   // Verify error
   error = GetSingleErrorFromSession(session.get());
   ASSERT_EQ(error->failed_op().ToString(),
@@ -4361,8 +4366,8 @@ TEST_F(ClientTest, TestMutateNonexistentRow) {
   // Attempt update nonexistent row
   ASSERT_OK(ApplyUpdateToSession(session.get(), client_table_, 1, 2));
   Status s = session->Flush();
-  ASSERT_FALSE(s.ok());
-  ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
   // Verify error
   unique_ptr<KuduError> error = GetSingleErrorFromSession(session.get());
   ASSERT_EQ(error->failed_op().ToString(),
@@ -4373,8 +4378,8 @@ TEST_F(ClientTest, TestMutateNonexistentRow) {
   // Attempt delete nonexistent row
   ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 1));
   s = session->Flush();
-  ASSERT_FALSE(s.ok());
-  ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
   // Verify error
   error = GetSingleErrorFromSession(session.get());
   ASSERT_EQ(error->failed_op().ToString(),
@@ -7122,8 +7127,7 @@ TEST_F(ClientTest, 
WritingRowsWithUnsetNonNullableColumns) {
     // Flush() should return an error.
     const auto flush_status = session->Flush();
     ASSERT_TRUE(flush_status.IsIOError()) << flush_status.ToString();
-    ASSERT_STR_CONTAINS(flush_status.ToString(),
-                        "IO error: Some errors occurred");
+    ASSERT_STR_CONTAINS(flush_status.ToString(), "failed to flush data");
   }
 
   // Make sure if a non-nullable column (without defaults) is not set for an
@@ -7143,8 +7147,7 @@ TEST_F(ClientTest, 
WritingRowsWithUnsetNonNullableColumns) {
     // Of course, Flush() should fail as well.
     const auto flush_status = session->Flush();
     ASSERT_TRUE(flush_status.IsIOError()) << flush_status.ToString();
-    ASSERT_STR_CONTAINS(flush_status.ToString(),
-                        "IO error: Some errors occurred");
+    ASSERT_STR_CONTAINS(flush_status.ToString(), "failed to flush data");
   }
 
   // Do delete a row, only the key is necessary.
@@ -7443,7 +7446,7 @@ TEST_F(ClientTest, TxnRetryCommitAfterSessionFlushErrors) 
{
   const auto s = txn->Commit();
   const auto errmsg = s.ToString();
   ASSERT_TRUE(s.IsIOError()) << errmsg;
-  ASSERT_STR_MATCHES(errmsg, "Some errors occurred");
+  ASSERT_STR_MATCHES(errmsg, "failed to flush data");
 
   ASSERT_FALSE(session->HasPendingOperations());
   ASSERT_EQ(1, session->CountPendingErrors());
diff --git a/src/kudu/client/session-internal.cc 
b/src/kudu/client/session-internal.cc
index 5baa4ed..320ccac 100644
--- a/src/kudu/client/session-internal.cc
+++ b/src/kudu/client/session-internal.cc
@@ -249,7 +249,9 @@ Status KuduSession::Data::Flush() {
     }
   }
   return error_collector_->CountErrors()
-      ? Status::IOError("Some errors occurred") : Status::OK();
+      ? Status::IOError("failed to flush data: error details are available "
+                        "via KuduSession::GetPendingErrors()")
+      : Status::OK();
 }
 
 bool KuduSession::Data::HasPendingOperations() const {
diff --git a/src/kudu/integration-tests/security-unknown-tsk-itest.cc 
b/src/kudu/integration-tests/security-unknown-tsk-itest.cc
index c7d37ae..7811773 100644
--- a/src/kudu/integration-tests/security-unknown-tsk-itest.cc
+++ b/src/kudu/integration-tests/security-unknown-tsk-itest.cc
@@ -286,7 +286,7 @@ TEST_F(SecurityUnknownTskTest, 
ErrorUnavailableCommonOperations) {
     // The error returned is a generic IOError, and the details are provided
     // by the KuduSession::GetPendingErrors() method.
     ASSERT_TRUE(s_apply.IsIOError()) << s_apply.ToString();
-    ASSERT_STR_CONTAINS(s_apply.ToString(), "Some errors occurred");
+    ASSERT_STR_CONTAINS(s_apply.ToString(), "failed to flush data");
 
     std::vector<KuduError*> errors;
     ElementDeleter cleanup(&errors);
diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc 
b/src/kudu/integration-tests/txn_write_ops-itest.cc
index d49064c..f460f4e 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -882,7 +882,7 @@ TEST_F(TxnWriteOpsITest, WriteOpForNonExistentTxn) {
 
     const auto s = session->Apply(insert.release());
     ASSERT_TRUE(s.IsIOError()) << s.ToString();
-    ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+    ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
     const auto err_status = GetSingleRowError(session.get());
     ASSERT_TRUE(err_status.IsInvalidArgument()) << err_status.ToString();
     ASSERT_STR_CONTAINS(err_status.ToString(),
@@ -921,7 +921,7 @@ TEST_F(TxnWriteOpsITest, TxnWriteAfterCommit) {
       unique_ptr<KuduInsert> insert(BuildInsert(table_.get(), idx++));
       auto s = session->Apply(insert.release());
       ASSERT_TRUE(s.IsIOError()) << s.ToString();
-      ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+      ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
       const auto err_status = GetSingleRowError(session.get());
       ASSERT_TRUE(err_status.IsIllegalState()) << err_status.ToString();
       ASSERT_STR_CONTAINS(err_status.ToString(),
@@ -958,7 +958,7 @@ TEST_F(TxnWriteOpsITest, TxnWriteAfterCommit) {
       unique_ptr<KuduInsert> insert(BuildInsert(table_.get(), idx++));
       auto s = session->Apply(insert.release());
       ASSERT_TRUE(s.IsIOError()) << s.ToString();
-      ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+      ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
       const auto err_status = GetSingleRowError(session.get());
       ASSERT_TRUE(err_status.IsIllegalState()) << err_status.ToString();
       ASSERT_STR_CONTAINS(err_status.ToString(),

Reply via email to