This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new eb55189 [client] actionable error message for KuduSession::Flush()
eb55189 is described below
commit eb55189eec3cb30bf10a5f0778ac5bc46164de7b
Author: Alexey Serbin <[email protected]>
AuthorDate: Sun Jul 11 18:53:15 2021 -0700
[client] actionable error message for KuduSession::Flush()
This patch makes the error message of the Status::IOError() returned by
KuduSession::Flush() more actionable: it now states that error details
are available via a call to KuduSession::GetPendingErrors(). This is
documented in-line in client.h and in the client C++ API docs [1], but
the error message is what users actually see first, so it should point
them in the right direction.
Prior to this patch, the error message was:
Some errors occurred
With this patch, the error message now is:
failed to flush data: error details are available
via KuduSession::GetPendingErrors()
In addition, I switched the code in batcher.{h,cc} from the gutil/util
atomic wrappers (gutil/atomic_refcount.h, gutil/atomicops.h, and
util/atomic.h) to std::atomic, and removed the Batcher::MarkHadErrors()
utility method.
[1]
https://kudu.apache.org/cpp-client-api/classkudu_1_1client_1_1KuduSession.html
Change-Id: I69e1613d703bb689641186c4282fe8b3e821d323
Reviewed-on: http://gerrit.cloudera.org:8080/17673
Tested-by: Kudu Jenkins
Reviewed-by: Bankim Bhavsar <[email protected]>
---
src/kudu/client/batcher.cc | 41 +++++++--------
src/kudu/client/batcher.h | 22 +++-----
src/kudu/client/client-test.cc | 59 ++++++++++++----------
src/kudu/client/session-internal.cc | 4 +-
.../security-unknown-tsk-itest.cc | 2 +-
src/kudu/integration-tests/txn_write_ops-itest.cc | 6 +--
6 files changed, 63 insertions(+), 71 deletions(-)
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index f48a5c8..9fdc872 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -45,7 +45,6 @@
#include "kudu/common/txn_id.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
-#include "kudu/gutil/atomic_refcount.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
@@ -91,10 +90,10 @@ using strings::Substitute;
namespace kudu {
+class RowOperationsPB;
class Schema;
namespace client {
-
namespace internal {
// About lock ordering in this file:
@@ -610,12 +609,12 @@ Batcher::Batcher(KuduClient* client,
consistency_mode_(consistency_mode),
txn_id_(txn_id),
error_collector_(std::move(error_collector)),
- had_errors_(false),
flush_callback_(nullptr),
next_op_sequence_number_(0),
timeout_(client->default_rpc_timeout()),
outstanding_lookups_(0),
buffer_bytes_used_(0),
+ had_errors_(false),
arena_(1024) {
ops_.set_empty_key(nullptr);
ops_.set_deleted_key(reinterpret_cast<InFlightOp*>(-1));
@@ -694,12 +693,14 @@ void Batcher::CheckForFinishedFlush() {
// a lock inversion deadlock -- the session lock should always
// come before the batcher lock.
session->data_->FlushFinished(this);
-
}
if (flush_callback_) {
- // User is responsible for fetching errors from the error collector.
- Status s = had_errors_ ? Status::IOError("Some errors occurred")
- : Status::OK();
+ // On failure, per-row error details are available via GetPendingErrors().
+ // NOTE: keep this message in sync with KuduSession::Data::Flush().
+ auto s = had_errors_
+ ? Status::IOError("failed to flush data: error details are available "
+ "via KuduSession::GetPendingErrors()")
+ : Status::OK();
flush_callback_->Run(s);
}
}
@@ -747,7 +748,7 @@ Status Batcher::Add(KuduWriteOperation* write_op) {
// deadline_ is set in FlushAsync(), after all Add() calls are done, so
// here we're forced to create a new deadline.
MonoTime deadline = ComputeDeadlineUnlocked();
- base::RefCountInc(&outstanding_lookups_);
+ ++outstanding_lookups_;
scoped_refptr<Batcher> self(this);
client_->data_->meta_cache_->LookupTabletByKey(
op->write_op->table(),
@@ -756,8 +757,7 @@ Status Batcher::Add(KuduWriteOperation* write_op) {
MetaCache::LookupType::kPoint,
&op->tablet,
[self, op](const Status& s) { self->TabletLookupFinished(op, s); });
-
- buffer_bytes_used_.IncrementBy(write_op->SizeInBuffer());
+ buffer_bytes_used_ += write_op->SizeInBuffer();
return Status::OK();
}
@@ -780,17 +780,13 @@ bool Batcher::IsAbortedUnlocked() const {
return state_ == kAborted;
}
-void Batcher::MarkHadErrors() {
- std::lock_guard<simple_spinlock> l(lock_);
- had_errors_ = true;
-}
-
void Batcher::MarkInFlightOpFailed(InFlightOp* op, const Status& s) {
std::lock_guard<simple_spinlock> l(lock_);
MarkInFlightOpFailedUnlocked(op, s);
}
void Batcher::MarkInFlightOpFailedUnlocked(InFlightOp* op, const Status& s) {
+ DCHECK(lock_.is_locked());
CHECK_EQ(1, ops_.erase(op))
<< "Could not remove op " << op->ToString() << " from in-flight list";
error_collector_->AddError(unique_ptr<KuduError>(new
KuduError(op->write_op.release(), s)));
@@ -799,7 +795,7 @@ void Batcher::MarkInFlightOpFailedUnlocked(InFlightOp* op,
const Status& s) {
}
void Batcher::TabletLookupFinished(InFlightOp* op, const Status& s) {
- base::RefCountDec(&outstanding_lookups_);
+ --outstanding_lookups_;
// Acquire the batcher lock early to atomically:
// 1. Test if the batcher was aborted, and
@@ -880,10 +876,10 @@ void Batcher::FlushBuffersIfReady() {
VLOG(3) << "FlushBuffersIfReady: batcher not yet in flushing state";
return;
}
- if (!base::RefCountIsZero(&outstanding_lookups_)) {
- VLOG(3) << "FlushBuffersIfReady: "
- << base::subtle::NoBarrier_Load(&outstanding_lookups_)
- << " ops still in lookup";
+ const int32_t pending_lookups_num = outstanding_lookups_;
+ if (pending_lookups_num != 0) {
+ VLOG(3) << Substitute("FlushBuffersIfReady: $0 ops still in lookup",
+ pending_lookups_num);
return;
}
// Take ownership of the ops while we're under the lock.
@@ -944,9 +940,8 @@ void Batcher::ProcessWriteResponse(const WriteRpc& rpc,
for (InFlightOp* op : rpc.ops()) {
unique_ptr<KuduError> error(new KuduError(op->write_op.release(), s));
error_collector_->AddError(std::move(error));
+ had_errors_ = true;
}
-
- MarkHadErrors();
}
// Check individual row errors.
@@ -968,7 +963,7 @@ void Batcher::ProcessWriteResponse(const WriteRpc& rpc,
Status op_status = StatusFromPB(err_pb.error());
unique_ptr<KuduError> error(new KuduError(op.release(), op_status));
error_collector_->AddError(std::move(error));
- MarkHadErrors();
+ had_errors_ = true;
}
// Remove all the ops from the "in-flight" list. It's essential to do so
diff --git a/src/kudu/client/batcher.h b/src/kudu/client/batcher.h
index 3874705..aa62d45 100644
--- a/src/kudu/client/batcher.h
+++ b/src/kudu/client/batcher.h
@@ -16,6 +16,7 @@
// under the License.
#pragma once
+#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
@@ -28,11 +29,9 @@
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/write_op.h"
#include "kudu/common/txn_id.h"
-#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
-#include "kudu/util/atomic.h"
#include "kudu/util/locks.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/monotime.h"
@@ -124,7 +123,7 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
// Return the total size (number of bytes) of all pending write operations
// accumulated by the batcher.
int64_t buffer_bytes_used() const {
- return buffer_bytes_used_.Load();
+ return buffer_bytes_used_;
}
// Return the identifier of a multi-row transaction (if any) that all the
@@ -153,10 +152,6 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
// processing wherever they are.
bool IsAbortedUnlocked() const;
- // Mark the fact that errors have occurred with this batch. This ensures that
- // the flush callback will get a bad Status.
- void MarkHadErrors();
-
// Remove an op from the in-flight op list, and delete the op itself.
// The operation is reported to the ErrorReporter as having failed with the
// given status.
@@ -208,10 +203,6 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
// The time when the very first operation was added into the batcher.
MonoTime first_op_time_;
- // Set to true if there was at least one error from this Batcher.
- // Protected by lock_
- bool had_errors_;
-
// If state is kFlushing, this member will be set to the user-provided
// callback. Once there are no more in-flight operations, the callback
// will be called exactly once (and the state changed to kFlushed).
@@ -239,12 +230,13 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
MonoTime deadline_;
// Number of outstanding lookups across all in-flight ops.
- //
- // Note: _not_ protected by lock_!
- Atomic32 outstanding_lookups_;
+ std::atomic<int32_t> outstanding_lookups_;
// The number of bytes used in the buffer for pending operations.
- AtomicInt<int64_t> buffer_bytes_used_;
+ std::atomic<int64_t> buffer_bytes_used_;
+
+ // Set to true if there was at least one error from this Batcher.
+ std::atomic<bool> had_errors_;
Arena arena_;
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index f6ad605..51bf76b 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -3342,9 +3342,10 @@ TEST_F(ClientTest,
TestBatchWithPartialErrorOfDuplicateKeys) {
ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, 1, 1,
"Attempted dup"));
ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, 2, 1, "Should
succeed"));
Status s = session->Flush();
- ASSERT_FALSE(s.ok());
- ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
-
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "failed to flush data: error details are available "
+ "via KuduSession::GetPendingErrors()");
// Fetch and verify the reported error.
unique_ptr<KuduError> error;
NO_FATALS(error = GetSingleErrorFromSession(session.get()));
@@ -3379,9 +3380,10 @@ TEST_F(ClientTest,
TestBatchWithPartialErrorOfMissingRequiredColumn) {
// Insert a row missing a required column, which will fail.
ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, 2, 2, "Missing
required column"));
Status s = session->Flush();
- ASSERT_FALSE(s.ok());
- ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
-
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "failed to flush data: error details are available "
+ "via KuduSession::GetPendingErrors()");
// Fetch and verify the reported error.
unique_ptr<KuduError> error;
NO_FATALS(error = GetSingleErrorFromSession(session.get()));
@@ -3418,9 +3420,10 @@ TEST_F(ClientTest,
TestBatchWithPartialErrorOfNoFieldsUpdated) {
// Update a row with some non-key fields updated, which will success.
ASSERT_OK(ApplyUpdateToSession(session.get(), client_table_, 2, 22));
Status s = session->Flush();
- ASSERT_FALSE(s.ok());
- ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
-
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "failed to flush data: error details are available "
+ "via KuduSession::GetPendingErrors()");
// Fetch and verify the reported error.
unique_ptr<KuduError> error;
NO_FATALS(error = GetSingleErrorFromSession(session.get()));
@@ -3454,9 +3457,10 @@ TEST_F(ClientTest,
TestBatchWithPartialErrorOfNonKeyColumnSpecifiedDelete) {
// Delete a row with some non-key fields, which will fail.
ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 2, 2));
Status s = session->Flush();
- ASSERT_FALSE(s.ok());
- ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
-
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "failed to flush data: error details are available "
+ "via KuduSession::GetPendingErrors()");
// Fetch and verify the reported error.
unique_ptr<KuduError> error;
NO_FATALS(error = GetSingleErrorFromSession(session.get()));
@@ -3488,9 +3492,10 @@ TEST_F(ClientTest,
TestBatchWithPartialErrorOfAllRowsFailed) {
ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 1, 1));
ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 2, 2));
Status s = session->Flush();
- ASSERT_FALSE(s.ok());
- ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
-
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "failed to flush data: error details are available "
+ "via KuduSession::GetPendingErrors()");
// Fetch and verify the reported error.
vector<KuduError*> errors;
ElementDeleter d(&errors);
@@ -4331,8 +4336,8 @@ TEST_F(ClientTest, TestMutateDeletedRow) {
// Attempt update deleted row
ASSERT_OK(ApplyUpdateToSession(session.get(), client_table_, 1, 2));
Status s = session->Flush();
- ASSERT_FALSE(s.ok());
- ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
// Verify error
unique_ptr<KuduError> error = GetSingleErrorFromSession(session.get());
ASSERT_EQ(error->failed_op().ToString(),
@@ -4343,8 +4348,8 @@ TEST_F(ClientTest, TestMutateDeletedRow) {
// Attempt delete deleted row
ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 1));
s = session->Flush();
- ASSERT_FALSE(s.ok());
- ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
// Verify error
error = GetSingleErrorFromSession(session.get());
ASSERT_EQ(error->failed_op().ToString(),
@@ -4361,8 +4366,8 @@ TEST_F(ClientTest, TestMutateNonexistentRow) {
// Attempt update nonexistent row
ASSERT_OK(ApplyUpdateToSession(session.get(), client_table_, 1, 2));
Status s = session->Flush();
- ASSERT_FALSE(s.ok());
- ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
// Verify error
unique_ptr<KuduError> error = GetSingleErrorFromSession(session.get());
ASSERT_EQ(error->failed_op().ToString(),
@@ -4373,8 +4378,8 @@ TEST_F(ClientTest, TestMutateNonexistentRow) {
// Attempt delete nonexistent row
ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 1));
s = session->Flush();
- ASSERT_FALSE(s.ok());
- ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
// Verify error
error = GetSingleErrorFromSession(session.get());
ASSERT_EQ(error->failed_op().ToString(),
@@ -7122,8 +7127,7 @@ TEST_F(ClientTest,
WritingRowsWithUnsetNonNullableColumns) {
// Flush() should return an error.
const auto flush_status = session->Flush();
ASSERT_TRUE(flush_status.IsIOError()) << flush_status.ToString();
- ASSERT_STR_CONTAINS(flush_status.ToString(),
- "IO error: Some errors occurred");
+ ASSERT_STR_CONTAINS(flush_status.ToString(), "failed to flush data");
}
// Make sure if a non-nullable column (without defaults) is not set for an
@@ -7143,8 +7147,7 @@ TEST_F(ClientTest,
WritingRowsWithUnsetNonNullableColumns) {
// Of course, Flush() should fail as well.
const auto flush_status = session->Flush();
ASSERT_TRUE(flush_status.IsIOError()) << flush_status.ToString();
- ASSERT_STR_CONTAINS(flush_status.ToString(),
- "IO error: Some errors occurred");
+ ASSERT_STR_CONTAINS(flush_status.ToString(), "failed to flush data");
}
// Do delete a row, only the key is necessary.
@@ -7443,7 +7446,7 @@ TEST_F(ClientTest, TxnRetryCommitAfterSessionFlushErrors)
{
const auto s = txn->Commit();
const auto errmsg = s.ToString();
ASSERT_TRUE(s.IsIOError()) << errmsg;
- ASSERT_STR_MATCHES(errmsg, "Some errors occurred");
+ ASSERT_STR_MATCHES(errmsg, "failed to flush data");
ASSERT_FALSE(session->HasPendingOperations());
ASSERT_EQ(1, session->CountPendingErrors());
diff --git a/src/kudu/client/session-internal.cc
b/src/kudu/client/session-internal.cc
index 5baa4ed..320ccac 100644
--- a/src/kudu/client/session-internal.cc
+++ b/src/kudu/client/session-internal.cc
@@ -249,7 +249,9 @@ Status KuduSession::Data::Flush() {
}
}
return error_collector_->CountErrors()
- ? Status::IOError("Some errors occurred") : Status::OK();
+ ? Status::IOError("failed to flush data: error details are available "
+ "via KuduSession::GetPendingErrors()")
+ : Status::OK();
}
bool KuduSession::Data::HasPendingOperations() const {
diff --git a/src/kudu/integration-tests/security-unknown-tsk-itest.cc
b/src/kudu/integration-tests/security-unknown-tsk-itest.cc
index c7d37ae..7811773 100644
--- a/src/kudu/integration-tests/security-unknown-tsk-itest.cc
+++ b/src/kudu/integration-tests/security-unknown-tsk-itest.cc
@@ -286,7 +286,7 @@ TEST_F(SecurityUnknownTskTest,
ErrorUnavailableCommonOperations) {
// The error returned is a generic IOError, and the details are provided
// by the KuduSession::GetPendingErrors() method.
ASSERT_TRUE(s_apply.IsIOError()) << s_apply.ToString();
- ASSERT_STR_CONTAINS(s_apply.ToString(), "Some errors occurred");
+ ASSERT_STR_CONTAINS(s_apply.ToString(), "failed to flush data");
std::vector<KuduError*> errors;
ElementDeleter cleanup(&errors);
diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc
b/src/kudu/integration-tests/txn_write_ops-itest.cc
index d49064c..f460f4e 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -882,7 +882,7 @@ TEST_F(TxnWriteOpsITest, WriteOpForNonExistentTxn) {
const auto s = session->Apply(insert.release());
ASSERT_TRUE(s.IsIOError()) << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+ ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
const auto err_status = GetSingleRowError(session.get());
ASSERT_TRUE(err_status.IsInvalidArgument()) << err_status.ToString();
ASSERT_STR_CONTAINS(err_status.ToString(),
@@ -921,7 +921,7 @@ TEST_F(TxnWriteOpsITest, TxnWriteAfterCommit) {
unique_ptr<KuduInsert> insert(BuildInsert(table_.get(), idx++));
auto s = session->Apply(insert.release());
ASSERT_TRUE(s.IsIOError()) << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+ ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
const auto err_status = GetSingleRowError(session.get());
ASSERT_TRUE(err_status.IsIllegalState()) << err_status.ToString();
ASSERT_STR_CONTAINS(err_status.ToString(),
@@ -958,7 +958,7 @@ TEST_F(TxnWriteOpsITest, TxnWriteAfterCommit) {
unique_ptr<KuduInsert> insert(BuildInsert(table_.get(), idx++));
auto s = session->Apply(insert.release());
ASSERT_TRUE(s.IsIOError()) << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+ ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
const auto err_status = GetSingleRowError(session.get());
ASSERT_TRUE(err_status.IsIllegalState()) << err_status.ToString();
ASSERT_STR_CONTAINS(err_status.ToString(),