This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new ec3a9f75b KUDU-3353 [schema] Add an immutable attribute on column
schema (part 2)
ec3a9f75b is described below
commit ec3a9f75b6924a70ecbf08e3805228ad9b92b9f0
Author: Yingchun Lai <[email protected]>
AuthorDate: Thu Aug 11 15:23:08 2022 +0800
KUDU-3353 [schema] Add an immutable attribute on column schema (part 2)
This is a follow-up to b6eedb224f715ad86378a92d25f09c2084b0e2b7.
This patch includes the C++ client-side changes of the "new column
attribute IMMUTABLE" feature, including:
1. Adds a new KuduColumnSpec::Immutable() method to add IMMUTABLE
attribute to a column, and adds a new KuduColumnSpec::Mutable()
method to remove IMMUTABLE attribute from a column.
2. Adds a new KuduColumnSchema::is_immutable() to check if the
attribute is set for a column schema.
3. Adds a new KuduUpsertIgnore operation in the client API: use the
newly added KuduTable::NewUpsertIgnore() to create a new instance
of such operation.
4. Adds unit tests to cover the newly introduced functionality.
5. Small refactoring on the server side.
Change-Id: Id301e313eedd9cc3a494d6b4f3c14fbabe63b436
Reviewed-on: http://gerrit.cloudera.org:8080/18835
Tested-by: Yingchun Lai <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/client/client-internal.cc | 13 +-
src/kudu/client/client-internal.h | 6 +-
src/kudu/client/client-test.cc | 560 ++++++++++++++++++++++---
src/kudu/client/client-unittest.cc | 2 +-
src/kudu/client/client.cc | 28 +-
src/kudu/client/client.h | 6 +
src/kudu/client/schema-internal.h | 1 +
src/kudu/client/schema.cc | 22 +-
src/kudu/client/schema.h | 14 +
src/kudu/client/table_alterer-internal.cc | 6 +-
src/kudu/client/write_op.cc | 13 +-
src/kudu/client/write_op.h | 33 +-
src/kudu/common/row_operations.cc | 12 +-
src/kudu/integration-tests/alter_table-test.cc | 42 ++
src/kudu/master/master.proto | 2 +
src/kudu/master/master_service.cc | 7 +
src/kudu/tablet/ops/write_op.cc | 12 +-
src/kudu/tablet/row_op.cc | 2 +
src/kudu/tablet/row_op.h | 3 +
src/kudu/tablet/tablet.cc | 42 +-
src/kudu/tablet/tablet_metrics.cc | 11 +-
21 files changed, 725 insertions(+), 112 deletions(-)
diff --git a/src/kudu/client/client-internal.cc
b/src/kudu/client/client-internal.cc
index 321576ad2..63b5be4d3 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -313,13 +313,13 @@ Status KuduClient::Data::GetTabletServer(KuduClient*
client,
return Status::OK();
}
-// TODO(yingchun): Add has_immutable_column_schema
Status KuduClient::Data::CreateTable(KuduClient* client,
const CreateTableRequestPB& req,
CreateTableResponsePB* resp,
const MonoTime& deadline,
bool has_range_partition_bounds,
- bool has_range_specific_hash_schema) {
+ bool has_range_specific_hash_schema,
+ bool has_immutable_column_schema) {
vector<uint32_t> features;
if (has_range_partition_bounds) {
features.push_back(MasterFeatures::RANGE_PARTITION_BOUNDS);
@@ -327,6 +327,9 @@ Status KuduClient::Data::CreateTable(KuduClient* client,
if (has_range_specific_hash_schema) {
features.push_back(MasterFeatures::RANGE_SPECIFIC_HASH_SCHEMA);
}
+ if (has_immutable_column_schema) {
+ features.push_back(MasterFeatures::IMMUTABLE_COLUMN_ATTRIBUTE);
+ }
Synchronizer sync;
AsyncLeaderMasterRpc<CreateTableRequestPB, CreateTableResponsePB> rpc(
deadline, client, BackoffType::EXPONENTIAL, req, resp,
@@ -416,7 +419,8 @@ Status KuduClient::Data::AlterTable(KuduClient* client,
AlterTableResponsePB* resp,
const MonoTime& deadline,
bool has_add_drop_partition,
- bool adding_range_with_custom_hash_schema)
{
+ bool adding_range_with_custom_hash_schema,
+ bool has_immutable_column_schema) {
vector<uint32_t> required_feature_flags;
if (has_add_drop_partition) {
required_feature_flags.push_back(MasterFeatures::ADD_DROP_RANGE_PARTITIONS);
@@ -424,6 +428,9 @@ Status KuduClient::Data::AlterTable(KuduClient* client,
if (adding_range_with_custom_hash_schema) {
required_feature_flags.push_back(MasterFeatures::RANGE_SPECIFIC_HASH_SCHEMA);
}
+ if (has_immutable_column_schema) {
+
required_feature_flags.push_back(MasterFeatures::IMMUTABLE_COLUMN_ATTRIBUTE);
+ }
Synchronizer sync;
AsyncLeaderMasterRpc<AlterTableRequestPB, AlterTableResponsePB> rpc(
deadline, client, BackoffType::EXPONENTIAL, req, resp,
diff --git a/src/kudu/client/client-internal.h
b/src/kudu/client/client-internal.h
index 3e820f91f..7d70b5056 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -108,7 +108,8 @@ class KuduClient::Data {
master::CreateTableResponsePB* resp,
const MonoTime& deadline,
bool has_range_partition_bounds,
- bool has_range_specific_hash_schema);
+ bool has_range_specific_hash_schema,
+ bool has_immutable_column_schema);
static Status IsCreateTableInProgress(KuduClient* client,
master::TableIdentifierPB table,
@@ -135,7 +136,8 @@ class KuduClient::Data {
master::AlterTableResponsePB* resp,
const MonoTime& deadline,
bool has_add_drop_partition,
- bool adding_range_with_custom_hash_schema);
+ bool adding_range_with_custom_hash_schema,
+ bool has_immutable_column_schema);
static Status IsAlterTableInProgress(KuduClient* client,
master::TableIdentifierPB table,
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 7b59cbe57..d97d87ebe 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -34,6 +34,7 @@
#include <set>
#include <string>
#include <thread>
+#include <tuple>
#include <type_traits>
#include <unordered_set>
#include <utility>
@@ -146,6 +147,7 @@ DECLARE_bool(location_mapping_by_uuid);
DECLARE_bool(log_inject_latency);
DECLARE_bool(master_client_location_assignment_enabled);
DECLARE_bool(master_support_connect_to_master_rpc);
+DECLARE_bool(master_support_immutable_column_attribute);
DECLARE_bool(mock_table_metrics_for_testing);
DECLARE_bool(rpc_listen_on_unix_domain_socket);
DECLARE_bool(rpc_trace_negotiation);
@@ -235,18 +237,19 @@ namespace client {
class ClientTest : public KuduTest {
public:
- ClientTest() {
+ virtual Status BuildSchema() {
KuduSchemaBuilder b;
b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
b.AddColumn("string_val")->Type(KuduColumnSchema::STRING)->Nullable();
b.AddColumn("non_null_with_default")->Type(KuduColumnSchema::INT32)->NotNull()
- ->Default(KuduValue::FromInt(12345));
- CHECK_OK(b.Build(&schema_));
+ ->Default(KuduValue::FromInt(12345));
+ return b.Build(&schema_);
}
void SetUp() override {
KuduTest::SetUp();
+ ASSERT_OK(BuildSchema());
// Reduce the TS<->Master heartbeat interval
FLAGS_heartbeat_interval_ms = 10;
@@ -285,7 +288,7 @@ class ClientTest : public KuduTest {
// Looks up the remote tablet entry for a given partition key in the meta
cache.
scoped_refptr<internal::RemoteTablet> MetaCacheLookup(
- KuduTable* table, const PartitionKey& partition_key) {
+ KuduTable* table, const PartitionKey& partition_key) const {
scoped_refptr<internal::RemoteTablet> rt;
Synchronizer sync;
client_->data_->meta_cache_->LookupTabletByKey(
@@ -298,7 +301,7 @@ class ClientTest : public KuduTest {
}
Status MetaCacheLookupById(
- const string& tablet_id, scoped_refptr<internal::RemoteTablet>*
remote_tablet) {
+ const string& tablet_id, scoped_refptr<internal::RemoteTablet>*
remote_tablet) const {
remote_tablet->reset();
scoped_refptr<internal::RemoteTablet> rt;
Synchronizer sync;
@@ -311,7 +314,7 @@ class ClientTest : public KuduTest {
}
// Generate a set of split rows for tablets used in this test.
- vector<unique_ptr<KuduPartialRow>> GenerateSplitRows() {
+ vector<unique_ptr<KuduPartialRow>> GenerateSplitRows() const {
vector<unique_ptr<KuduPartialRow>> rows;
unique_ptr<KuduPartialRow> row(schema_.NewRow());
CHECK_OK(row->SetInt32(0, 9));
@@ -322,7 +325,7 @@ class ClientTest : public KuduTest {
// Count the rows of a table, checking that the operation succeeds.
//
// Must be public to use as a thread closure.
- void CheckRowCount(KuduTable* table, int expected) {
+ void CheckRowCount(KuduTable* table, int expected) const {
CHECK_EQ(CountRowsFromClient(table), expected);
}
@@ -372,7 +375,7 @@ class ClientTest : public KuduTest {
// derived classes to test client location assignment.
virtual void SetLocationMappingCmd() {}
- string GetFirstTabletId(KuduTable* table) {
+ string GetFirstTabletId(KuduTable* table) const {
GetTableLocationsRequestPB req;
GetTableLocationsResponsePB resp;
req.mutable_table()->set_table_name(table->name());
@@ -385,7 +388,7 @@ class ClientTest : public KuduTest {
return resp.tablet_locations(0).tablet_id();
}
- void CheckNoRpcOverflow() {
+ void CheckNoRpcOverflow() const {
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
MiniTabletServer* server = cluster_->mini_tablet_server(i);
if (server->is_started()) {
@@ -411,8 +414,8 @@ class ClientTest : public KuduTest {
// Inserts given number of tests rows into the specified table
// in the context of the session.
- static void InsertTestRows(KuduTable* table, KuduSession* session,
- int num_rows, int first_row = 0) {
+ void InsertTestRows(KuduTable* table, KuduSession* session,
+ int num_rows, int first_row = 0) const {
for (int i = first_row; i < num_rows + first_row; ++i) {
unique_ptr<KuduInsert> insert(BuildTestInsert(table, i));
ASSERT_OK(session->Apply(insert.release()));
@@ -421,7 +424,7 @@ class ClientTest : public KuduTest {
// Inserts 'num_rows' test rows using 'client'
void InsertTestRows(KuduClient* client, KuduTable* table,
- int num_rows, int first_row = 0) {
+ int num_rows, int first_row = 0) const {
shared_ptr<KuduSession> session = client->NewSession();
ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
session->SetTimeoutMillis(60000);
@@ -431,11 +434,11 @@ class ClientTest : public KuduTest {
}
// Inserts 'num_rows' using the default client.
- void InsertTestRows(KuduTable* table, int num_rows, int first_row = 0) {
+ void InsertTestRows(KuduTable* table, int num_rows, int first_row = 0) const
{
InsertTestRows(client_.get(), table, num_rows, first_row);
}
- void UpdateTestRows(KuduTable* table, int lo, int hi) {
+ void UpdateTestRows(KuduTable* table, int lo, int hi) const {
shared_ptr<KuduSession> session = client_->NewSession();
ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
session->SetTimeoutMillis(10000);
@@ -447,7 +450,7 @@ class ClientTest : public KuduTest {
NO_FATALS(CheckNoRpcOverflow());
}
- void DeleteTestRows(KuduTable* table, int lo, int hi) {
+ void DeleteTestRows(KuduTable* table, int lo, int hi) const {
shared_ptr<KuduSession> session = client_->NewSession();
ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
session->SetTimeoutMillis(10000);
@@ -459,27 +462,41 @@ class ClientTest : public KuduTest {
NO_FATALS(CheckNoRpcOverflow());
}
- static unique_ptr<KuduInsert> BuildTestInsert(KuduTable* table, int index) {
+ unique_ptr<KuduInsert> BuildTestInsert(KuduTable* table, int index) const {
unique_ptr<KuduInsert> insert(table->NewInsert());
KuduPartialRow* row = insert->mutable_row();
PopulateDefaultRow(row, index);
return insert;
}
- static unique_ptr<KuduInsertIgnore> BuildTestInsertIgnore(KuduTable* table,
int index) {
+ unique_ptr<KuduInsertIgnore> BuildTestInsertIgnore(KuduTable* table, int
index) const {
unique_ptr<KuduInsertIgnore> insert_ignore(table->NewInsertIgnore());
KuduPartialRow* row = insert_ignore->mutable_row();
PopulateDefaultRow(row, index);
return insert_ignore;
}
- static unique_ptr<KuduUpdateIgnore> BuildTestUpdateIgnore(KuduTable* table,
int index) {
+ unique_ptr<KuduUpdate> BuildTestUpdate(KuduTable* table, int index) const {
+ unique_ptr<KuduUpdate> update(table->NewUpdate());
+ KuduPartialRow* row = update->mutable_row();
+ PopulateDefaultRow(row, index);
+ return update;
+ }
+
+ unique_ptr<KuduUpdateIgnore> BuildTestUpdateIgnore(KuduTable* table, int
index) const {
unique_ptr<KuduUpdateIgnore> update_ignore(table->NewUpdateIgnore());
KuduPartialRow* row = update_ignore->mutable_row();
PopulateDefaultRow(row, index);
return update_ignore;
}
+ unique_ptr<KuduUpsertIgnore> BuildTestUpsertIgnore(KuduTable* table, int
index) const {
+ unique_ptr<KuduUpsertIgnore> upsert_ignore(table->NewUpsertIgnore());
+ KuduPartialRow* row = upsert_ignore->mutable_row();
+ PopulateDefaultRow(row, index);
+ return upsert_ignore;
+ }
+
static unique_ptr<KuduDeleteIgnore> BuildTestDeleteIgnore(KuduTable* table,
int index) {
unique_ptr<KuduDeleteIgnore> delete_ignore(table->NewDeleteIgnore());
KuduPartialRow* row = delete_ignore->mutable_row();
@@ -487,13 +504,26 @@ class ClientTest : public KuduTest {
return delete_ignore;
}
- static void PopulateDefaultRow(KuduPartialRow* row, int index) {
+ virtual void PopulateDefaultRow(KuduPartialRow* row, int index) const {
CHECK_OK(row->SetInt32(0, index));
CHECK_OK(row->SetInt32(1, index * 2));
CHECK_OK(row->SetStringCopy(2, Slice(StringPrintf("hello %d", index))));
CHECK_OK(row->SetInt32(3, index * 3));
}
+ virtual void DoTestVerifyRows(const shared_ptr<KuduTable>& tbl, int
num_rows) const {
+ vector<string> rows;
+ KuduScanner scanner(tbl.get());
+ ASSERT_OK(ScanToStrings(&scanner, &rows));
+ ASSERT_EQ(num_rows, rows.size());
+ for (int i = 0; i < num_rows; i++) {
+ int key = i + 1;
+ ASSERT_EQ(StringPrintf("(int32 key=%d, int32 int_val=%d, string
string_val=\"hello %d\", "
+ "int32 non_null_with_default=%d)", key, key*2,
key, key*3),
+ rows[i]);
+ }
+ }
+
static unique_ptr<KuduUpdate> UpdateTestRow(KuduTable* table, int index) {
unique_ptr<KuduUpdate> update(table->NewUpdate());
KuduPartialRow* row = update->mutable_row();
@@ -668,14 +698,14 @@ class ClientTest : public KuduTest {
}
}
- int CountRowsFromClient(KuduTable* table) {
+ int CountRowsFromClient(KuduTable* table) const {
return CountRowsFromClient(table, KuduScanner::READ_LATEST);
}
int CountRowsFromClient(KuduTable* table,
KuduScanner::ReadMode scan_mode,
int32_t lower_bound = kNoBound,
- int32_t upper_bound = kNoBound) {
+ int32_t upper_bound = kNoBound) const {
return CountRowsFromClient(table, KuduClient::LEADER_ONLY, scan_mode,
lower_bound, upper_bound);
}
@@ -684,7 +714,7 @@ class ClientTest : public KuduTest {
KuduClient::ReplicaSelection selection,
KuduScanner::ReadMode scan_mode,
int32_t lower_bound = kNoBound,
- int32_t upper_bound = kNoBound) {
+ int32_t upper_bound = kNoBound) const {
KuduScanner scanner(table);
CHECK_OK(scanner.SetSelection(selection));
CHECK_OK(scanner.SetProjectedColumnNames({}));
@@ -2933,22 +2963,11 @@ TEST_F(ClientTest, TestInsertSingleRowManualBatch) {
FlushSessionOrDie(session);
}
-static void DoTestVerifyRows(const shared_ptr<KuduTable>& tbl, int num_rows) {
- vector<string> rows;
- KuduScanner scanner(tbl.get());
- ASSERT_OK(ScanToStrings(&scanner, &rows));
- ASSERT_EQ(num_rows, rows.size());
- for (int i = 0; i < num_rows; i++) {
- int key = i + 1;
- ASSERT_EQ(StringPrintf("(int32 key=%d, int32 int_val=%d, string
string_val=\"hello %d\", "
- "int32 non_null_with_default=%d)", key, key*2, key, key*3), rows[i]);
- }
-}
-
static void DoVerifyMetrics(const KuduSession* session,
int64_t successful_inserts,
int64_t insert_ignore_errors,
int64_t successful_upserts,
+ int64_t upsert_ignore_errors,
int64_t successful_updates,
int64_t update_ignore_errors,
int64_t successful_deletes,
@@ -2957,7 +2976,7 @@ static void DoVerifyMetrics(const KuduSession* session,
ASSERT_EQ(successful_inserts, metrics["successful_inserts"]);
ASSERT_EQ(insert_ignore_errors, metrics["insert_ignore_errors"]);
ASSERT_EQ(successful_upserts, metrics["successful_upserts"]);
- // TODO(yingchun): should test upsert_ignore_errors
+ ASSERT_EQ(upsert_ignore_errors, metrics["upsert_ignore_errors"]);
ASSERT_EQ(successful_updates, metrics["successful_updates"]);
ASSERT_EQ(update_ignore_errors, metrics["update_ignore_errors"]);
ASSERT_EQ(successful_deletes, metrics["successful_deletes"]);
@@ -2972,16 +2991,16 @@ TEST_F(ClientTest, TestInsertIgnore) {
{
unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
ASSERT_OK(session->Apply(insert.release()));
- DoTestVerifyRows(client_table_, 1);
- DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0);
+ NO_FATALS(DoTestVerifyRows(client_table_, 1));
+ NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0, 0));
}
{
// INSERT IGNORE results in no error on duplicate primary key.
unique_ptr<KuduInsertIgnore>
insert_ignore(BuildTestInsertIgnore(client_table_.get(), 1));
ASSERT_OK(session->Apply(insert_ignore.release()));
- DoTestVerifyRows(client_table_, 1);
- DoVerifyMetrics(session.get(), 1, 1, 0, 0, 0, 0, 0);
+ NO_FATALS(DoTestVerifyRows(client_table_, 1));
+ NO_FATALS(DoVerifyMetrics(session.get(), 1, 1, 0, 0, 0, 0, 0, 0));
}
{
@@ -2992,16 +3011,63 @@ TEST_F(ClientTest, TestInsertIgnore) {
ASSERT_OK(insert_ignore->mutable_row()->SetStringCopy("string_val", "hello
world"));
ASSERT_OK(insert_ignore->mutable_row()->SetInt32("non_null_with_default",
999));
ASSERT_OK(session->Apply(insert_ignore.release())); // returns ok but
results in no change
- DoTestVerifyRows(client_table_, 1);
- DoVerifyMetrics(session.get(), 1, 2, 0, 0, 0, 0, 0);
+ NO_FATALS(DoTestVerifyRows(client_table_, 1));
+ NO_FATALS(DoVerifyMetrics(session.get(), 1, 2, 0, 0, 0, 0, 0, 0));
}
{
// INSERT IGNORE can insert new row.
unique_ptr<KuduInsertIgnore>
insert_ignore(BuildTestInsertIgnore(client_table_.get(), 2));
ASSERT_OK(session->Apply(insert_ignore.release()));
- DoTestVerifyRows(client_table_, 2);
- DoVerifyMetrics(session.get(), 2, 2, 0, 0, 0, 0, 0);
+ NO_FATALS(DoTestVerifyRows(client_table_, 2));
+ NO_FATALS(DoVerifyMetrics(session.get(), 2, 2, 0, 0, 0, 0, 0, 0));
+ }
+}
+
+TEST_F(ClientTest, TestUpdate) {
+ shared_ptr<KuduSession> session = client_->NewSession();
+ session->SetTimeoutMillis(10000);
+ ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+ {
+ // UPDATE results in an error on missing primary key.
+ unique_ptr<KuduUpdate> update(BuildTestUpdate(client_table_.get(), 1));
+ Status s = session->Apply(update.release());
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "failed to flush data: error details are available "
+ "via KuduSession::GetPendingErrors()");
+ vector<KuduError*> errors;
+ ElementDeleter d(&errors);
+ bool overflow;
+ session->GetPendingErrors(&errors, &overflow);
+ NO_FATALS(DoTestVerifyRows(client_table_, 0));
+ NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 0, 0, 0, 0, 0, 0));
+ }
+
+ {
+ unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
+ ASSERT_OK(session->Apply(insert.release()));
+ NO_FATALS(DoTestVerifyRows(client_table_, 1));
+ NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0, 0)); //
successful_inserts++
+ }
+
+ {
+ // UPDATE can update row.
+ unique_ptr<KuduUpdate> update(client_table_->NewUpdate());
+ ASSERT_OK(update->mutable_row()->SetInt32("key", 1));
+ ASSERT_OK(update->mutable_row()->SetInt32("int_val", 999));
+ ASSERT_OK(update->mutable_row()->SetStringCopy("string_val", "hello
world"));
+ ASSERT_OK(update->mutable_row()->SetInt32("non_null_with_default", 999));
+ ASSERT_OK(session->Apply(update.release()));
+ NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 1, 0, 0, 0)); //
successful_updates++
+
+ vector<string> rows;
+ KuduScanner scanner(client_table_.get());
+ ASSERT_OK(ScanToStrings(&scanner, &rows));
+ ASSERT_EQ(1, rows.size());
+ ASSERT_EQ("(int32 key=1, int32 int_val=999, string string_val=\"hello
world\", "
+ "int32 non_null_with_default=999)", rows[0]);
}
}
@@ -3014,15 +3080,15 @@ TEST_F(ClientTest, TestUpdateIgnore) {
// UPDATE IGNORE results in no error on missing primary key.
unique_ptr<KuduUpdateIgnore>
update_ignore(BuildTestUpdateIgnore(client_table_.get(), 1));
ASSERT_OK(session->Apply(update_ignore.release()));
- DoTestVerifyRows(client_table_, 0);
- DoVerifyMetrics(session.get(), 0, 0, 0, 0, 1, 0, 0);
+ NO_FATALS(DoTestVerifyRows(client_table_, 0));
+ NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 0, 0, 0, 1, 0, 0));
}
{
unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
ASSERT_OK(session->Apply(insert.release()));
- DoTestVerifyRows(client_table_, 1);
- DoVerifyMetrics(session.get(), 1, 0, 0, 0, 1, 0, 0);
+ NO_FATALS(DoTestVerifyRows(client_table_, 1));
+ NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 1, 0, 0));
}
{
@@ -3033,7 +3099,7 @@ TEST_F(ClientTest, TestUpdateIgnore) {
ASSERT_OK(update_ignore->mutable_row()->SetStringCopy("string_val", "hello
world"));
ASSERT_OK(update_ignore->mutable_row()->SetInt32("non_null_with_default",
999));
ASSERT_OK(session->Apply(update_ignore.release()));
- DoVerifyMetrics(session.get(), 1, 0, 0, 1, 1, 0, 0);
+ NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 1, 1, 0, 0));
vector<string> rows;
KuduScanner scanner(client_table_.get());
@@ -3053,7 +3119,7 @@ TEST_F(ClientTest, TestDeleteIgnore) {
unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
ASSERT_OK(session->Apply(insert.release()));
DoTestVerifyRows(client_table_, 1);
- DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0);
+ NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0, 0));
}
{
@@ -3061,7 +3127,7 @@ TEST_F(ClientTest, TestDeleteIgnore) {
unique_ptr<KuduDeleteIgnore>
delete_ignore(BuildTestDeleteIgnore(client_table_.get(), 1));
ASSERT_OK(session->Apply(delete_ignore.release()));
DoTestVerifyRows(client_table_, 0);
- DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 1, 0);
+ NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 1, 0));
}
{
@@ -3069,7 +3135,7 @@ TEST_F(ClientTest, TestDeleteIgnore) {
unique_ptr<KuduDeleteIgnore>
delete_ignore(BuildTestDeleteIgnore(client_table_.get(), 1));
ASSERT_OK(session->Apply(delete_ignore.release()));
DoTestVerifyRows(client_table_, 0);
- DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 1, 1);
+ NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 1, 1));
}
}
@@ -3122,11 +3188,22 @@ static Status ApplyUpsertToSession(KuduSession* session,
const shared_ptr<KuduTable>& table,
int row_key,
int int_val,
- const char* string_val) {
+ const char* string_val,
+ optional<int> imm_val = nullopt) {
unique_ptr<KuduUpsert> upsert(table->NewUpsert());
RETURN_NOT_OK(upsert->mutable_row()->SetInt32("key", row_key));
RETURN_NOT_OK(upsert->mutable_row()->SetInt32("int_val", int_val));
RETURN_NOT_OK(upsert->mutable_row()->SetStringCopy("string_val",
string_val));
+ // Here we use the first column to initialize an object of KuduColumnSchema
+ // for there is no default constructor for it.
+ KuduColumnSchema col_schema = table->schema().Column(0);
+ if (table->schema().HasColumn("imm_val", &col_schema)) {
+ if (imm_val) {
+ RETURN_NOT_OK(upsert->mutable_row()->SetInt32("imm_val", *imm_val));
+ } else {
+ RETURN_NOT_OK(upsert->mutable_row()->SetNull("imm_val"));
+ }
+ }
return session->Apply(upsert.release());
}
@@ -4477,7 +4554,7 @@ TEST_F(ClientTest, TestUpsert) {
// Perform and verify UPSERT which acts as an INSERT.
ASSERT_OK(ApplyUpsertToSession(session.get(), client_table_, 1, 1, "original
row"));
FlushSessionOrDie(session);
- DoVerifyMetrics(session.get(), 0, 0, 1, 0, 0, 0, 0);
+ NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 1, 0, 0, 0, 0, 0));
{
vector<string> rows;
@@ -4490,7 +4567,7 @@ TEST_F(ClientTest, TestUpsert) {
// Perform and verify UPSERT which acts as an UPDATE.
ASSERT_OK(ApplyUpsertToSession(session.get(), client_table_, 1, 2, "upserted
row"));
FlushSessionOrDie(session);
- DoVerifyMetrics(session.get(), 0, 0, 2, 0, 0, 0, 0);
+ NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 2, 0, 0, 0, 0, 0));
{
vector<string> rows;
@@ -4509,7 +4586,7 @@ TEST_F(ClientTest, TestUpsert) {
ASSERT_OK(row->SetInt32("non_null_with_default", 999));
ASSERT_OK(session->Apply(update.release()));
FlushSessionOrDie(session);
- DoVerifyMetrics(session.get(), 0, 0, 2, 1, 0, 0, 0);
+ NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 2, 0, 1, 0, 0, 0));
}
{
vector<string> rows;
@@ -4523,7 +4600,7 @@ TEST_F(ClientTest, TestUpsert) {
// column, and therefore should not revert it back to its default.
ASSERT_OK(ApplyUpsertToSession(session.get(), client_table_, 1, 3, "upserted
row 2"));
FlushSessionOrDie(session);
- DoVerifyMetrics(session.get(), 0, 0, 3, 1, 0, 0, 0);
+ NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 3, 0, 1, 0, 0, 0));
{
vector<string> rows;
ASSERT_OK(ScanTableToStrings(client_table_.get(), &rows));
@@ -4535,7 +4612,7 @@ TEST_F(ClientTest, TestUpsert) {
// Delete the row.
ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 1));
FlushSessionOrDie(session);
- DoVerifyMetrics(session.get(), 0, 0, 3, 1, 0, 1, 0);
+ NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 3, 0, 1, 0, 1, 0));
{
vector<string> rows;
ASSERT_OK(ScanTableToStrings(client_table_.get(), &rows));
@@ -8717,12 +8794,379 @@ TEST_F(ClientTest, TestProjectionPredicatesFuzz) {
unordered_set<string>(rows.begin(), rows.end())) << rows;
}
+class ClientTestImmutableColumn : public ClientTest,
+ public
::testing::WithParamInterface<std::tuple<bool, bool>> {
+ public:
+ ClientTestImmutableColumn() {
+ init_immu_col_to_null_ = std::get<0>(GetParam());
+ update_immu_col_to_null_ = std::get<1>(GetParam());
+ }
+
+ Status BuildSchema() override {
+ KuduSchemaBuilder b;
+ b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+ b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
+ b.AddColumn("string_val")->Type(KuduColumnSchema::STRING)->Nullable();
+
b.AddColumn("non_null_with_default")->Type(KuduColumnSchema::INT32)->NotNull()
+ ->Default(KuduValue::FromInt(12345));
+
b.AddColumn("imm_val")->Type(KuduColumnSchema::INT32)->Immutable()->Nullable();
+ return b.Build(&schema_);
+ }
+
+ void PopulateImmutableCell(KuduPartialRow* row, int index) const {
+ if (init_immu_col_to_null_) {
+ ASSERT_OK(row->SetNull(4));
+ } else {
+ ASSERT_OK(row->SetInt32(4, index * 4));
+ }
+ }
+
+ void PopulateDefaultRow(KuduPartialRow* row, int index) const override {
+ ClientTest::PopulateDefaultRow(row, index);
+ NO_FATALS(PopulateImmutableCell(row, index));
+ }
+
+ string ExpectImmuColCell(int index) const {
+ return init_immu_col_to_null_ ? "NULL" : std::to_string(index*4);
+ }
+
+ void DoTestVerifyRows(const shared_ptr<KuduTable>& tbl, int num_rows) const
override {
+ vector<string> rows;
+ KuduScanner scanner(tbl.get());
+ ASSERT_OK(ScanToStrings(&scanner, &rows));
+ ASSERT_EQ(num_rows, rows.size());
+ for (int i = 0; i < num_rows; i++) {
+ int key = i + 1;
+ ASSERT_EQ(
+ Substitute("(int32 key=$0, int32 int_val=$1, string
string_val=\"hello $2\", "
+ "int32 non_null_with_default=$3, int32 imm_val=$4)",
+ key, key*2, key, key*3, ExpectImmuColCell(key)),
+ rows[i]);
+ }
+ }
+
+ protected:
+ bool init_immu_col_to_null_;
+ bool update_immu_col_to_null_;
+};
+
+INSTANTIATE_TEST_SUITE_P(Params, ClientTestImmutableColumn,
+ ::testing::Combine(::testing::Bool(),
+ ::testing::Bool()));
+
+TEST_P(ClientTestImmutableColumn, TestUpdate) {
+ shared_ptr<KuduSession> session = client_->NewSession();
+ session->SetTimeoutMillis(10000);
+ ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+ {
+ unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
+ ASSERT_OK(session->Apply(insert.release()));
+ NO_FATALS(DoTestVerifyRows(client_table_, 1));
+ NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0, 0)); //
successful_inserts++
+ }
+
+ const string expect_row = Substitute(
+ "(int32 key=1, int32 int_val=999, string string_val=\"hello world\", "
+ "int32 non_null_with_default=999, int32 imm_val=$0)",
+ ExpectImmuColCell(1));
+ {
+ // UPDATE can update row without immutable column set.
+ unique_ptr<KuduUpdate> update(client_table_->NewUpdate());
+ ASSERT_OK(update->mutable_row()->SetInt32("key", 1));
+ ASSERT_OK(update->mutable_row()->SetInt32("int_val", 999));
+ ASSERT_OK(update->mutable_row()->SetStringCopy("string_val", "hello
world"));
+ ASSERT_OK(update->mutable_row()->SetInt32("non_null_with_default", 999));
+ ASSERT_OK(session->Apply(update.release()));
+ NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 1, 0, 0, 0)); //
successful_updates++
+
+ vector<string> rows;
+ KuduScanner scanner(client_table_.get());
+ ASSERT_OK(ScanToStrings(&scanner, &rows));
+ ASSERT_EQ(1, rows.size());
+ ASSERT_EQ(expect_row, rows[0]);
+ }
+
+ {
+ // UPDATE results in an error when attempting to update row having at
least one column with the
+ // immutable attribute set.
+ unique_ptr<KuduUpdate> update(client_table_->NewUpdate());
+ ASSERT_OK(update->mutable_row()->SetInt32("key", 1));
+ ASSERT_OK(update->mutable_row()->SetInt32("int_val", 888));
+ ASSERT_OK(update->mutable_row()->SetStringCopy("string_val", "world
hello"));
+ ASSERT_OK(update->mutable_row()->SetInt32("non_null_with_default", 888));
+ if (update_immu_col_to_null_) {
+ ASSERT_OK(update->mutable_row()->SetNull("imm_val"));
+ } else {
+ ASSERT_OK(update->mutable_row()->SetInt32("imm_val", 888));
+ }
+ Status s = session->Apply(update.release());
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "failed to flush data: error details are available "
+ "via KuduSession::GetPendingErrors()");
+ vector<KuduError*> errors;
+ ElementDeleter d(&errors);
+ bool overflow;
+ session->GetPendingErrors(&errors, &overflow);
+ ASSERT_FALSE(overflow);
+ ASSERT_EQ(1, errors.size());
+ ASSERT_STR_CONTAINS(
+ errors[0]->status().ToString(),
+ "Immutable: UPDATE not allowed for immutable column: imm_val INT32
NULLABLE IMMUTABLE");
+ NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 1, 0, 0, 0)); //
nothing changed
+
+ vector<string> rows;
+ KuduScanner scanner(client_table_.get());
+ ASSERT_OK(ScanToStrings(&scanner, &rows));
+ ASSERT_EQ(1, rows.size());
+ ASSERT_EQ(expect_row, rows[0]);
+ }
+}
+
+TEST_P(ClientTestImmutableColumn, TestUpdateIgnore) {
+ shared_ptr<KuduSession> session = client_->NewSession();
+ session->SetTimeoutMillis(10000);
+ ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+ {
+ unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
+ ASSERT_OK(session->Apply(insert.release()));
+ NO_FATALS(DoTestVerifyRows(client_table_, 1));
+ NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0, 0)); //
successful_inserts++
+ }
+
+ const string expect_row = Substitute(
+ "(int32 key=1, int32 int_val=888, string string_val=\"world hello\", "
+ "int32 non_null_with_default=888, int32 imm_val=$0)",
+ ExpectImmuColCell(1));
+ {
+ // UPDATE IGNORE can update a row without changing the immutable column
cell, the error of
+ // updating the immutable column will be ignored.
+ unique_ptr<KuduUpdateIgnore>
update_ignore(client_table_->NewUpdateIgnore());
+ ASSERT_OK(update_ignore->mutable_row()->SetInt32("key", 1));
+ ASSERT_OK(update_ignore->mutable_row()->SetInt32("int_val", 888));
+ ASSERT_OK(update_ignore->mutable_row()->SetStringCopy("string_val", "world
hello"));
+ ASSERT_OK(update_ignore->mutable_row()->SetInt32("non_null_with_default",
888));
+ if (update_immu_col_to_null_) {
+ ASSERT_OK(update_ignore->mutable_row()->SetNull("imm_val"));
+ } else {
+ ASSERT_OK(update_ignore->mutable_row()->SetInt32("imm_val", 888));
+ }
+ ASSERT_OK(session->Apply(update_ignore.release()));
+ // successful_updates++, update_ignore_errors++,
+ NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 1, 1, 0, 0));
+
+ vector<string> rows;
+ KuduScanner scanner(client_table_.get());
+ ASSERT_OK(ScanToStrings(&scanner, &rows));
+ ASSERT_EQ(1, rows.size());
+ ASSERT_EQ(expect_row, rows[0]);
+ }
+
+ {
+ // UPDATE IGNORE only on immutable column. Note that this will result in
+ // a 'Invalid argument: No fields updated' error.
+ unique_ptr<KuduUpdateIgnore>
update_ignore(client_table_->NewUpdateIgnore());
+ ASSERT_OK(update_ignore->mutable_row()->SetInt32("key", 1));
+ if (update_immu_col_to_null_) {
+ ASSERT_OK(update_ignore->mutable_row()->SetNull("imm_val"));
+ } else {
+ ASSERT_OK(update_ignore->mutable_row()->SetInt32("imm_val", 888));
+ }
+ Status s = session->Apply(update_ignore.release());
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "failed to flush data: error details are available "
+ "via KuduSession::GetPendingErrors()");
+ vector<KuduError*> errors;
+ ElementDeleter d(&errors);
+ bool overflow;
+ session->GetPendingErrors(&errors, &overflow);
+ ASSERT_FALSE(overflow);
+ ASSERT_EQ(1, errors.size());
+ ASSERT_STR_CONTAINS(errors[0]->status().ToString(),
+ "Invalid argument: No fields updated, key is: (int32
key=1)");
+ NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 1, 1, 0, 0)); //
nothing changed
+
+ vector<string> rows;
+ KuduScanner scanner(client_table_.get());
+ ASSERT_OK(ScanToStrings(&scanner, &rows));
+ ASSERT_EQ(1, rows.size());
+ ASSERT_EQ(expect_row, rows[0]);
+ }
+}
+
+TEST_P(ClientTestImmutableColumn, TestUpsertIgnore) {
+ shared_ptr<KuduSession> session = client_->NewSession();
+ session->SetTimeoutMillis(10000);
+ ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+ {
+ // UPSERT IGNORE can insert row.
+ unique_ptr<KuduUpsertIgnore>
upsert_ignore(BuildTestUpsertIgnore(client_table_.get(), 1));
+ ASSERT_OK(session->Apply(upsert_ignore.release()));
+ NO_FATALS(DoTestVerifyRows(client_table_, 1));
+ NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 1, 0, 0, 0, 0, 0)); //
successful_upserts++
+ }
+
+ {
+ // UPSERT IGNORE can update row without immutable column set.
+ unique_ptr<KuduUpsertIgnore>
upsert_ignore(client_table_->NewUpsertIgnore());
+ ASSERT_OK(upsert_ignore->mutable_row()->SetInt32("key", 1));
+ ASSERT_OK(upsert_ignore->mutable_row()->SetInt32("int_val", 999));
+ ASSERT_OK(upsert_ignore->mutable_row()->SetStringCopy("string_val", "hello
world"));
+ ASSERT_OK(upsert_ignore->mutable_row()->SetInt32("non_null_with_default",
999));
+ ASSERT_OK(session->Apply(upsert_ignore.release()));
+ NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 2, 0, 0, 0, 0, 0)); //
successful_upserts++
+
+ vector<string> rows;
+ KuduScanner scanner(client_table_.get());
+ ASSERT_OK(ScanToStrings(&scanner, &rows));
+ ASSERT_EQ(1, rows.size());
+ ASSERT_EQ(Substitute("(int32 key=1, int32 int_val=999, string
string_val=\"hello world\", "
+ "int32 non_null_with_default=999, int32 imm_val=$0)",
+ ExpectImmuColCell(1)),
+ rows[0]);
+ }
+
+ {
+ // UPSERT IGNORE can update row with immutable column set.
+ unique_ptr<KuduUpsertIgnore>
upsert_ignore(client_table_->NewUpsertIgnore());
+ ASSERT_OK(upsert_ignore->mutable_row()->SetInt32("key", 1));
+ ASSERT_OK(upsert_ignore->mutable_row()->SetInt32("int_val", 888));
+ ASSERT_OK(upsert_ignore->mutable_row()->SetStringCopy("string_val", "world
hello"));
+ ASSERT_OK(upsert_ignore->mutable_row()->SetInt32("non_null_with_default",
888));
+ if (update_immu_col_to_null_) {
+ ASSERT_OK(upsert_ignore->mutable_row()->SetNull("imm_val"));
+ } else {
+ ASSERT_OK(upsert_ignore->mutable_row()->SetInt32("imm_val", 888));
+ }
+ ASSERT_OK(session->Apply(upsert_ignore.release()));
+ // successful_upserts++, upsert_ignore_errors++
+ NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 3, 1, 0, 0, 0, 0));
+
+ vector<string> rows;
+ KuduScanner scanner(client_table_.get());
+ ASSERT_OK(ScanToStrings(&scanner, &rows));
+ ASSERT_EQ(1, rows.size());
+ ASSERT_EQ(Substitute("(int32 key=1, int32 int_val=888, string
string_val=\"world hello\", "
+ "int32 non_null_with_default=888, int32 imm_val=$0)",
+ ExpectImmuColCell(1)),
+ rows[0]);
+ }
+}
+
+TEST_P(ClientTestImmutableColumn, TestUpsert) {
+ shared_ptr<KuduSession> session = client_->NewSession();
+ ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+ // Perform and verify UPSERT which acts as an INSERT.
+ ASSERT_OK(ApplyUpsertToSession(session.get(), client_table_, 1, 1, "original
row", 1));
+ NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 1, 0, 0, 0, 0, 0)); //
successful_upserts++
+
+ const string expect_row = R"((int32 key=1, int32 int_val=1, string
string_val="original row", )"
+ "int32 non_null_with_default=12345, int32
imm_val=1)";
+ {
+ vector<string> rows;
+ ASSERT_OK(ScanTableToStrings(client_table_.get(), &rows));
+ ASSERT_EQ(1, rows.size());
+ EXPECT_EQ(expect_row, rows[0]);
+ }
+
+ // Perform an UPSERT. This upsert will attemp to update an immutable column,
+ // which will result an error.
+ Status s = ApplyUpsertToSession(session.get(), client_table_, 1, 4,
"upserted row 3",
+ update_immu_col_to_null_ ? nullopt :
optional<int>(999));
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "failed to flush data: error details are available "
+ "via KuduSession::GetPendingErrors()");
+ vector<KuduError*> errors;
+ ElementDeleter d(&errors);
+ bool overflow;
+ session->GetPendingErrors(&errors, &overflow);
+ ASSERT_FALSE(overflow);
+ ASSERT_EQ(1, errors.size());
+ ASSERT_STR_CONTAINS(
+ errors[0]->status().ToString(),
+ "Immutable: UPDATE not allowed for immutable column: imm_val INT32
NULLABLE IMMUTABLE");
+ NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 1, 0, 0, 0, 0, 0)); //
nothing changed
+ {
+ vector<string> rows;
+ ASSERT_OK(ScanTableToStrings(client_table_.get(), &rows));
+ ASSERT_EQ(1, rows.size());
+ EXPECT_EQ(expect_row, rows[0]); // nothing changed
+ }
+}
+
+class ClientTestImmutableColumnCompatablity : public ClientTest {
+ public:
+ void SetUp() override {
+ // Disable the immutable column attribute feature in master for testing.
+ FLAGS_master_support_immutable_column_attribute = false;
+
+ KuduTest::SetUp();
+
+ // Start minicluster and wait for tablet servers to connect to master.
+ InternalMiniClusterOptions options;
+ options.num_tablet_servers = 1;
+ cluster_.reset(new InternalMiniCluster(env_, std::move(options)));
+ ASSERT_OK(cluster_->StartSync());
+ ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
+ }
+};
+
+TEST_F(ClientTestImmutableColumnCompatablity, CreateTable) {
+ const string kTableName = "create_table_with_immutable_attribute_column";
+ KuduSchemaBuilder b;
+ b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+
b.AddColumn("imm_val")->Type(KuduColumnSchema::INT32)->Immutable()->Nullable();
+ ASSERT_OK(b.Build(&schema_));
+
+ unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+ Status s = table_creator->table_name(kTableName)
+ .schema(&schema_)
+ .add_hash_partitions({"key"}, 2)
+ .num_replicas(1)
+ .Create();
+ ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
+ ASSERT_STR_CONTAINS(
+ s.ToString(),
+ Substitute("Error creating table $0 on the master: cluster does not
support "
+ "CreateTable with feature(s) IMMUTABLE_COLUMN_ATTRIBUTE",
kTableName));
+}
+
+TEST_F(ClientTestImmutableColumnCompatablity, AlterTable) {
+ const string kTableName = "alter_table_with_immutable_attribute_column";
+ KuduSchemaBuilder b;
+ b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+ b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->Nullable();
+ ASSERT_OK(b.Build(&schema_));
+
+ unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+ ASSERT_OK(table_creator->table_name(kTableName)
+ .schema(&schema_)
+ .add_hash_partitions({"key"}, 2)
+ .num_replicas(1)
+ .Create());
+
+ unique_ptr<KuduTableAlterer>
table_alterer(client_->NewTableAlterer(kTableName));
+
table_alterer->AddColumn("imm_val")->Type(KuduColumnSchema::INT32)->Immutable()->Nullable();
+ Status s = table_alterer->Alter();
+ ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
+ ASSERT_STR_CONTAINS(
+ s.ToString(),
+ "cluster does not support AlterTable with feature(s)
IMMUTABLE_COLUMN_ATTRIBUTE");
+}
+
class ClientTestUnixSocket : public ClientTest {
public:
void SetUp() override {
FLAGS_rpc_listen_on_unix_domain_socket = true;
FLAGS_client_use_unix_domain_sockets = true;
ClientTest::SetUp();
+ ASSERT_OK(BuildSchema());
}
};
@@ -8744,6 +9188,7 @@ class MultiTServerClientTest : public ClientTest {
public:
void SetUp() override {
KuduTest::SetUp();
+ ASSERT_OK(BuildSchema());
// Start minicluster and wait for tablet servers to connect to master.
InternalMiniClusterOptions options;
@@ -8862,6 +9307,7 @@ class ReplicationFactorLimitsTest : public ClientTest {
FLAGS_max_num_replicas = 5;
KuduTest::SetUp();
+ ASSERT_OK(BuildSchema());
// Start minicluster and wait for tablet servers to connect to master.
InternalMiniClusterOptions options;
diff --git a/src/kudu/client/client-unittest.cc
b/src/kudu/client/client-unittest.cc
index 27f5d666f..63fe3824f 100644
--- a/src/kudu/client/client-unittest.cc
+++ b/src/kudu/client/client-unittest.cc
@@ -372,7 +372,7 @@ TEST(KuduColumnSchemaTest, TestEquals) {
const int kDefaultOf7 = 7;
KuduColumnSchema a32_dflt("a", KuduColumnSchema::INT32,
/*is_nullable=*/false,
- /*default_value=*/&kDefaultOf7);
+ /*is_immutable=*/false,
/*default_value=*/&kDefaultOf7);
ASSERT_NE(a32, a32_dflt);
}
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 47cb589cd..1c1dd95b5 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -51,6 +51,7 @@
#include "kudu/client/scan_predicate-internal.h"
#include "kudu/client/scan_token-internal.h"
#include "kudu/client/scanner-internal.h"
+#include "kudu/client/schema-internal.h"
#include "kudu/client/session-internal.h"
#include "kudu/client/table-internal.h"
#include "kudu/client/table_alterer-internal.h"
@@ -1040,6 +1041,15 @@ Status KuduTableCreator::Create() {
}
}
+ bool has_immutable_column_schema = false;
+ for (size_t i = 0; i < data_->schema_->num_columns(); i++) {
+ const auto& col_schema = data_->schema_->Column(i);
+ if (col_schema.is_immutable()) {
+ has_immutable_column_schema = true;
+ break;
+ }
+ }
+
if (data_->table_type_) {
req.set_table_type(*data_->table_type_);
}
@@ -1058,7 +1068,8 @@ Status KuduTableCreator::Create() {
&resp,
deadline,
!data_->range_partitions_.empty(),
- has_range_with_custom_hash_schema),
+ has_range_with_custom_hash_schema,
+ has_immutable_column_schema),
Substitute("Error creating table $0 on the master", data_->table_name_));
// Spin until the table is fully created, if requested.
if (data_->wait_) {
@@ -1183,6 +1194,10 @@ KuduUpsert* KuduTable::NewUpsert() {
return new KuduUpsert(shared_from_this());
}
+KuduUpsertIgnore* KuduTable::NewUpsertIgnore() {
+ return new KuduUpsertIgnore(shared_from_this());
+}
+
KuduUpdate* KuduTable::NewUpdate() {
return new KuduUpdate(shared_from_this());
}
@@ -1667,6 +1682,14 @@ Status KuduTableAlterer::Alter() {
AlterTableResponsePB resp;
RETURN_NOT_OK(data_->ToRequest(&req));
+ bool has_immutable_column_schema = false;
+ for (const auto& step : data_->steps_) {
+ if (step.step_type == AlterTableRequestPB::ADD_COLUMN &&
step.spec->data_->immutable) {
+ has_immutable_column_schema = true;
+ break;
+ }
+ }
+
MonoDelta timeout = data_->timeout_.Initialized() ?
data_->timeout_ :
data_->client_->default_admin_operation_timeout();
@@ -1674,7 +1697,8 @@ Status KuduTableAlterer::Alter() {
RETURN_NOT_OK(data_->client_->data_->AlterTable(
data_->client_, req, &resp, deadline,
data_->has_alter_partitioning_steps,
- data_->adding_range_with_custom_hash_schema));
+ data_->adding_range_with_custom_hash_schema,
+ has_immutable_column_schema));
if (data_->has_alter_partitioning_steps) {
// If the table partitions change, clear the local meta cache so that the
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index f39cd8f3c..cbe9877d0 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -107,6 +107,7 @@ class KuduTabletServer;
class KuduUpdate;
class KuduUpdateIgnore;
class KuduUpsert;
+class KuduUpsertIgnore;
class KuduValue;
class KuduWriteOperation;
class ResourceMetrics;
@@ -1624,6 +1625,11 @@ class KUDU_EXPORT KuduTable : public
sp::enable_shared_from_this<KuduTable> {
/// KuduSession::Apply().
KuduUpsert* NewUpsert();
+ /// @return New @c UPSERT_IGNORE operation for this table. It is the
+ /// caller's responsibility to free the result, unless it is passed to
+ /// KuduSession::Apply().
+ KuduUpsertIgnore* NewUpsertIgnore();
+
/// @return New @c UPDATE operation for this table. It is the caller's
/// responsibility to free the result, unless it is passed to
/// KuduSession::Apply().
diff --git a/src/kudu/client/schema-internal.h
b/src/kudu/client/schema-internal.h
index df5a963e0..0b8357545 100644
--- a/src/kudu/client/schema-internal.h
+++ b/src/kudu/client/schema-internal.h
@@ -80,6 +80,7 @@ class KuduColumnSpec::Data {
std::optional<KuduColumnStorageAttributes::CompressionType> compression;
std::optional<int32_t> block_size;
std::optional<bool> nullable;
+ std::optional<bool> immutable;
bool primary_key;
std::optional<KuduValue*> default_val; // Owned.
bool remove_default; // For ALTER
diff --git a/src/kudu/client/schema.cc b/src/kudu/client/schema.cc
index e3e48324f..75d3ba16b 100644
--- a/src/kudu/client/schema.cc
+++ b/src/kudu/client/schema.cc
@@ -345,6 +345,16 @@ KuduColumnSpec* KuduColumnSpec::Nullable() {
return this;
}
+KuduColumnSpec* KuduColumnSpec::Immutable() {
+ data_->immutable = true;
+ return this;
+}
+
+KuduColumnSpec* KuduColumnSpec::Mutable() {
+ data_->immutable = false;
+ return this;
+}
+
KuduColumnSpec* KuduColumnSpec::RemoveDefault() {
data_->remove_default = true;
return this;
@@ -453,6 +463,7 @@ Status KuduColumnSpec::ToColumnSchema(KuduColumnSchema*
col) const {
KuduColumnTypeAttributes type_attrs(precision, scale, length);
DataType internal_type = ToInternalDataType(data_->type.value(), type_attrs);
bool nullable = data_->nullable ? data_->nullable.value() : true;
+ bool immutable = data_->immutable ? data_->immutable.value() : false;
void* default_val = nullptr;
// TODO(unknown): distinguish between DEFAULT NULL and no default?
@@ -474,7 +485,7 @@ Status KuduColumnSpec::ToColumnSchema(KuduColumnSchema*
col) const {
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
*col = KuduColumnSchema(data_->name, data_->type.value(), nullable,
- default_val,
+ immutable, default_val,
KuduColumnStorageAttributes(encoding, compression,
block_size),
type_attrs,
data_->comment ? data_->comment.value() : "");
@@ -732,6 +743,7 @@ string KuduColumnSchema::DataTypeToString(DataType type) {
KuduColumnSchema::KuduColumnSchema(const string &name,
DataType type,
bool is_nullable,
+ bool is_immutable,
const void* default_value,
const KuduColumnStorageAttributes&
storage_attributes,
const KuduColumnTypeAttributes&
type_attributes,
@@ -746,7 +758,7 @@ KuduColumnSchema::KuduColumnSchema(const string &name,
type_attr_private.length = type_attributes.length();
col_ = new ColumnSchema(name, ToInternalDataType(type, type_attributes),
is_nullable,
- false, // TODO(yingchun): set according to a new
added parameter later
+ is_immutable,
default_value, default_value, attr_private,
type_attr_private, comment);
}
@@ -800,6 +812,10 @@ bool KuduColumnSchema::is_nullable() const {
return DCHECK_NOTNULL(col_)->is_nullable();
}
+bool KuduColumnSchema::is_immutable() const {
+ return DCHECK_NOTNULL(col_)->is_immutable();
+}
+
KuduColumnSchema::DataType KuduColumnSchema::type() const {
return FromInternalDataType(DCHECK_NOTNULL(col_)->type_info()->type());
}
@@ -901,7 +917,7 @@ KuduColumnSchema KuduSchema::Column(size_t idx) const {
KuduColumnTypeAttributes type_attrs(col.type_attributes().precision,
col.type_attributes().scale,
col.type_attributes().length);
return KuduColumnSchema(col.name(),
FromInternalDataType(col.type_info()->type()),
- col.is_nullable(), col.read_default_value(),
+ col.is_nullable(), col.is_immutable(),
col.read_default_value(),
attrs, type_attrs, col.comment());
}
diff --git a/src/kudu/client/schema.h b/src/kudu/client/schema.h
index 2fae75907..a7a8af6e5 100644
--- a/src/kudu/client/schema.h
+++ b/src/kudu/client/schema.h
@@ -303,6 +303,9 @@ class KUDU_EXPORT KuduColumnSchema {
/// @return @c true iff the column schema has the nullable attribute set.
bool is_nullable() const;
+
+ /// @return @c true iff the column schema has the immutable attribute set.
+ bool is_immutable() const;
///@}
/// @return Type attributes of the column schema.
@@ -340,6 +343,7 @@ class KUDU_EXPORT KuduColumnSchema {
const std::string &name,
DataType type,
bool is_nullable = false,
+ bool is_immutable = false,
const void* default_value = NULL, //NOLINT(modernize-use-nullptr)
const KuduColumnStorageAttributes& storage_attributes =
KuduColumnStorageAttributes(),
const KuduColumnTypeAttributes& type_attributes =
KuduColumnTypeAttributes(),
@@ -497,6 +501,16 @@ class KUDU_EXPORT KuduColumnSpec {
/// @return Pointer to the modified object.
KuduColumnSpec* Nullable();
+ /// Set the column to be immutable.
+ ///
+ /// @return Pointer to the modified object.
+ KuduColumnSpec* Immutable();
+
+ /// Set the column to be mutable (the default).
+ ///
+ /// @return Pointer to the modified object.
+ KuduColumnSpec* Mutable();
+
/// Set the data type of the column.
///
/// @note Column data types may not be changed once a table is created.
diff --git a/src/kudu/client/table_alterer-internal.cc
b/src/kudu/client/table_alterer-internal.cc
index 68a8b8def..ff869d5f0 100644
--- a/src/kudu/client/table_alterer-internal.cc
+++ b/src/kudu/client/table_alterer-internal.cc
@@ -138,7 +138,8 @@ Status
KuduTableAlterer::Data::ToRequest(AlterTableRequestPB* req) {
!s.spec->data_->encoding &&
!s.spec->data_->compression &&
!s.spec->data_->block_size &&
- !s.spec->data_->comment) {
+ !s.spec->data_->comment &&
+ !s.spec->data_->immutable) {
return Status::InvalidArgument("no alter operation specified",
s.spec->data_->name);
}
@@ -150,7 +151,8 @@ Status
KuduTableAlterer::Data::ToRequest(AlterTableRequestPB* req) {
!s.spec->data_->encoding &&
!s.spec->data_->compression &&
!s.spec->data_->block_size &&
- !s.spec->data_->comment) {
+ !s.spec->data_->comment &&
+ !s.spec->data_->immutable) {
pb_step->set_type(AlterTableRequestPB::RENAME_COLUMN);
pb_step->mutable_rename_column()->set_old_name(s.spec->data_->name);
pb_step->mutable_rename_column()->set_new_name(*(s.spec->data_->rename_to));
diff --git a/src/kudu/client/write_op.cc b/src/kudu/client/write_op.cc
index ca472057f..24b760f6c 100644
--- a/src/kudu/client/write_op.cc
+++ b/src/kudu/client/write_op.cc
@@ -18,16 +18,17 @@
#include "kudu/client/write_op.h"
#include <ostream>
+#include <type_traits>
#include <glog/logging.h>
-#include "kudu/client/client.h"
+#include "kudu/client/client.h" // IWYU pragma: keep
#include "kudu/client/schema.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/row.h"
+#include "kudu/common/row_operations.pb.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
-#include "kudu/common/wire_protocol.pb.h"
#include "kudu/util/bitmap.h"
#include "kudu/util/slice.h"
@@ -46,6 +47,7 @@ RowOperationsPB_Type
ToInternalWriteType(KuduWriteOperation::Type type) {
case KuduWriteOperation::INSERT_IGNORE: return
RowOperationsPB_Type_INSERT_IGNORE;
case KuduWriteOperation::UPDATE_IGNORE: return
RowOperationsPB_Type_UPDATE_IGNORE;
case KuduWriteOperation::DELETE_IGNORE: return
RowOperationsPB_Type_DELETE_IGNORE;
+ case KuduWriteOperation::UPSERT_IGNORE: return
RowOperationsPB_Type_UPSERT_IGNORE;
default: LOG(FATAL) << "Unexpected write operation type: " << type;
}
}
@@ -144,6 +146,13 @@ KuduUpsert::KuduUpsert(const shared_ptr<KuduTable>& table)
KuduUpsert::~KuduUpsert() {}
+// UpsertIgnore
-----------------------------------------------------------------
+
+KuduUpsertIgnore::KuduUpsertIgnore(const shared_ptr<KuduTable>& table)
+ : KuduWriteOperation(table) {
+}
+
+KuduUpsertIgnore::~KuduUpsertIgnore() {}
} // namespace client
} // namespace kudu
diff --git a/src/kudu/client/write_op.h b/src/kudu/client/write_op.h
index a1e9e30a6..ea30d8a12 100644
--- a/src/kudu/client/write_op.h
+++ b/src/kudu/client/write_op.h
@@ -73,7 +73,8 @@ class KUDU_EXPORT KuduWriteOperation {
UPSERT = 4,
INSERT_IGNORE = 5,
UPDATE_IGNORE = 6,
- DELETE_IGNORE = 7
+ DELETE_IGNORE = 7,
+ UPSERT_IGNORE = 8
};
virtual ~KuduWriteOperation();
@@ -215,6 +216,33 @@ class KUDU_EXPORT KuduUpsert : public KuduWriteOperation {
};
+/// @brief A single row upsert ignore to be sent to the cluster, errors on
updating immutable
+/// cells are ignored.
+///
+/// See KuduUpsert for more details.
+class KUDU_EXPORT KuduUpsertIgnore : public KuduWriteOperation {
+ public:
+ ~KuduUpsertIgnore() override;
+
+ /// @copydoc KuduWriteOperation::ToString()
+ std::string ToString() const override { return "UPSERT IGNORE " +
row_.ToString(); }
+
+ protected:
+ /// @cond PROTECTED_MEMBERS_DOCUMENTED
+
+ /// @copydoc KuduWriteOperation::type()
+ Type type() const override {
+ return UPSERT_IGNORE;
+ }
+
+ /// @endcond
+
+ private:
+ friend class KuduTable;
+ explicit KuduUpsertIgnore(const sp::shared_ptr<KuduTable>& table);
+};
+
+
/// @brief A single row update to be sent to the cluster.
///
/// @pre An update requires the key columns and at least one other column
@@ -241,7 +269,8 @@ class KUDU_EXPORT KuduUpdate : public KuduWriteOperation {
explicit KuduUpdate(const sp::shared_ptr<KuduTable>& table);
};
-/// @brief A single row update ignore to be sent to the cluster, missing row
errors are ignored.
+/// @brief A single row update ignore to be sent to the cluster, missing row
errors and
+/// errors on updating immutable cells are ignored.
///
/// @pre An update ignore requires the key columns and at least one other
column
/// in the schema to be set in the embedded KuduPartialRow object.
diff --git a/src/kudu/common/row_operations.cc
b/src/kudu/common/row_operations.cc
index 20ac21444..1ed07cdeb 100644
--- a/src/kudu/common/row_operations.cc
+++ b/src/kudu/common/row_operations.cc
@@ -562,11 +562,15 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const
ClientServerMapping& m
RowChangeListEncoder rcl_encoder(&buf);
// Now process the rest of columns as updates.
+ DCHECK_EQ(client_schema_->num_key_columns(), client_col_idx);
for (; client_col_idx < client_schema_->num_columns(); client_col_idx++) {
size_t tablet_col_idx = GetTabletColIdx(mapping, client_col_idx);
const ColumnSchema& col = tablet_schema_->column(tablet_col_idx);
if (BitmapTest(client_isset_map, client_col_idx)) {
+ bool client_set_to_null = client_schema_->has_nullables() &&
+ BitmapTest(client_null_map, client_col_idx);
+
if (col.is_immutable()) {
if (op->type == RowOperationsPB::UPDATE) {
op->SetFailureStatusOnce(
@@ -575,12 +579,13 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const
ClientServerMapping& m
DCHECK_EQ(RowOperationsPB::UPDATE_IGNORE, op->type);
op->error_ignored = true;
}
- RETURN_NOT_OK(ReadColumnAndDiscard(col));
+ if (!client_set_to_null) {
+ RETURN_NOT_OK(ReadColumnAndDiscard(col));
+ }
// Use 'continue' not 'break' to consume the rest row data.
continue;
}
- bool client_set_to_null = client_schema_->has_nullables() &&
- BitmapTest(client_null_map, client_col_idx);
+
uint8_t scratch[kLargestTypeSize];
uint8_t* val_to_add = nullptr;
if (!client_set_to_null) {
@@ -593,6 +598,7 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const
ClientServerMapping& m
RETURN_NOT_OK(ReadColumnAndDiscard(col));
continue;
}
+
rcl_encoder.AddColumnUpdate(col,
tablet_schema_->column_id(tablet_col_idx), val_to_add);
}
}
diff --git a/src/kudu/integration-tests/alter_table-test.cc
b/src/kudu/integration-tests/alter_table-test.cc
index f1c2d0b62..9e05280a1 100644
--- a/src/kudu/integration-tests/alter_table-test.cc
+++ b/src/kudu/integration-tests/alter_table-test.cc
@@ -26,6 +26,7 @@
#include <ostream>
#include <string>
#include <thread>
+#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
@@ -2545,4 +2546,45 @@ TEST_F(AlterTableTest, TestDisableCompactAlter) {
ASSERT_TRUE(s.IsInvalidArgument());
}
+TEST_F(AlterTableTest, AddAndRemoveImmutableAttribute) {
+ InsertRows(0, 1);
+ ASSERT_OK(tablet_replica_->tablet()->Flush());
+
+ vector<string> rows;
+ ScanToStrings(&rows);
+ ASSERT_EQ(1, rows.size());
+ EXPECT_EQ("(int32 c0=0, int32 c1=0)", rows[0]);
+
+ {
+ // Add immutable attribute to a column.
+ unique_ptr<KuduTableAlterer>
table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->AlterColumn("c1")->Immutable();
+ ASSERT_OK(table_alterer->Alter());
+ }
+
+ InsertRows(1, 1);
+ ASSERT_OK(tablet_replica_->tablet()->Flush());
+
+ ScanToStrings(&rows);
+ ASSERT_EQ(2, rows.size());
+ EXPECT_EQ("(int32 c0=0, int32 c1=0)", rows[0]);
+ EXPECT_EQ("(int32 c0=16777216, int32 c1=1)", rows[1]);
+
+ {
+ // Remove immutable attribute from a column.
+ unique_ptr<KuduTableAlterer>
table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->AlterColumn("c1")->Mutable();
+ ASSERT_OK(table_alterer->Alter());
+ }
+
+ InsertRows(2, 1);
+ ASSERT_OK(tablet_replica_->tablet()->Flush());
+
+ ScanToStrings(&rows);
+ ASSERT_EQ(3, rows.size());
+ EXPECT_EQ("(int32 c0=0, int32 c1=0)", rows[0]);
+ EXPECT_EQ("(int32 c0=16777216, int32 c1=1)", rows[1]);
+ EXPECT_EQ("(int32 c0=33554432, int32 c1=2)", rows[2]);
+}
+
} // namespace kudu
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 6a7cd9f16..d4e9699ea 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -1169,6 +1169,8 @@ enum MasterFeatures {
RANGE_SPECIFIC_HASH_SCHEMA = 8;
// Similar to IGNORE_OPERATIONS, but this is for UPSERT_IGNORE specifically.
UPSERT_IGNORE = 9;
+ // Whether master supports immutable attribute on column schema.
+ IMMUTABLE_COLUMN_ATTRIBUTE = 10;
}
service MasterService {
diff --git a/src/kudu/master/master_service.cc
b/src/kudu/master/master_service.cc
index 9db93a038..6e2cfcf19 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -113,6 +113,11 @@ DEFINE_bool(master_support_upsert_ignore_operations, true,
TAG_FLAG(master_support_upsert_ignore_operations, hidden);
TAG_FLAG(master_support_upsert_ignore_operations, runtime);
+DEFINE_bool(master_support_immutable_column_attribute, true,
+ "Whether the cluster supports immutable attribute on column
schema.");
+TAG_FLAG(master_support_immutable_column_attribute, hidden);
+TAG_FLAG(master_support_immutable_column_attribute, runtime);
+
using google::protobuf::Message;
using kudu::consensus::ReplicaManagementInfoPB;
@@ -918,6 +923,8 @@ bool MasterServiceImpl::SupportsFeature(uint32_t feature)
const {
return FLAGS_enable_per_range_hash_schemas;
case MasterFeatures::UPSERT_IGNORE:
return FLAGS_master_support_upsert_ignore_operations;
+ case MasterFeatures::IMMUTABLE_COLUMN_ATTRIBUTE:
+ return FLAGS_master_support_immutable_column_attribute;
default:
return false;
}
diff --git a/src/kudu/tablet/ops/write_op.cc b/src/kudu/tablet/ops/write_op.cc
index 7ee5af815..e9e85e5fc 100644
--- a/src/kudu/tablet/ops/write_op.cc
+++ b/src/kudu/tablet/ops/write_op.cc
@@ -509,7 +509,11 @@ void WriteOpState::UpdateMetricsForOp(const RowOp& op) {
case RowOperationsPB::UPSERT_IGNORE:
if (op.error_ignored) {
op_metrics_.upsert_ignore_errors++;
- } else {
+ }
+ // This op may be completed even if it's error_ignored. It make sense
+ // when attempting to update immutable cells, the rest of cells may be
updated
+ // except the immutable cells.
+ if (!op.failed) {
op_metrics_.successful_upserts++;
}
break;
@@ -520,7 +524,11 @@ void WriteOpState::UpdateMetricsForOp(const RowOp& op) {
case RowOperationsPB::UPDATE_IGNORE:
if (op.error_ignored) {
op_metrics_.update_ignore_errors++;
- } else {
+ }
+ // This op may be completed even if it's error_ignored. It make sense
+ // when attempting to update immutable cells, the rest of cells may be
updated
+ // except the immutable cells.
+ if (!op.failed) {
op_metrics_.successful_updates++;
}
break;
diff --git a/src/kudu/tablet/row_op.cc b/src/kudu/tablet/row_op.cc
index f112f5ba5..11cd5abe1 100644
--- a/src/kudu/tablet/row_op.cc
+++ b/src/kudu/tablet/row_op.cc
@@ -48,6 +48,7 @@ void RowOp::SetFailed(const Status& s) {
DCHECK(!result) << SecureDebugString(*result);
result =
google::protobuf::Arena::CreateMessage<OperationResultPB>(pb_arena_);
StatusToPB(s, result->mutable_failed_status());
+ failed = true;
}
void RowOp::SetInsertSucceeded(int mrs_id) {
@@ -68,6 +69,7 @@ void RowOp::SetErrorIgnored() {
DCHECK(!result) << SecureDebugString(*result);
result =
google::protobuf::Arena::CreateMessage<OperationResultPB>(pb_arena_);
error_ignored = true;
+ failed = true;
}
void RowOp::SetMutateSucceeded(OperationResultPB* result) {
diff --git a/src/kudu/tablet/row_op.h b/src/kudu/tablet/row_op.h
index 5592eaedf..285cc8e8f 100644
--- a/src/kudu/tablet/row_op.h
+++ b/src/kudu/tablet/row_op.h
@@ -102,6 +102,9 @@ struct RowOp {
// True if an ignore op was ignored due to an error.
bool error_ignored = false;
+ // True if this op has any error occured and failed to compelte this op.
+ bool failed = false;
+
// The RowSet in which this op's key has been found present and alive.
// This will be null if 'checked_present' is false, or if it has been
// checked and found not to be alive in any RowSet.
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index f4e5340d4..592c7c868 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -738,18 +738,8 @@ Status Tablet::InsertOrUpsertUnlocked(const IOContext*
io_context,
if (op->present_in_rowset) {
switch (op_type) {
case RowOperationsPB::UPSERT:
- case RowOperationsPB::UPSERT_IGNORE: {
- Status s = ApplyUpsertAsUpdate(io_context, op_state, op,
op->present_in_rowset, stats);
- if (s.ok()) {
- return Status::OK();
- }
- if (s.IsImmutable() && op_type == RowOperationsPB::UPSERT_IGNORE) {
- op->SetErrorIgnored();
- return Status::OK();
- }
- op->SetFailed(s);
- return s;
- }
+ case RowOperationsPB::UPSERT_IGNORE:
+ return ApplyUpsertAsUpdate(io_context, op_state, op,
op->present_in_rowset, stats);
case RowOperationsPB::INSERT_IGNORE:
op->SetErrorIgnored();
return Status::OK();
@@ -820,18 +810,8 @@ Status Tablet::InsertOrUpsertUnlocked(const IOContext*
io_context,
if (s.IsAlreadyPresent()) {
switch (op_type) {
case RowOperationsPB::UPSERT:
- case RowOperationsPB::UPSERT_IGNORE: {
- Status s = ApplyUpsertAsUpdate(io_context, op_state, op,
comps->memrowset.get(), stats);
- if (s.ok()) {
- return Status::OK();
- }
- if (s.IsImmutable() && op_type == RowOperationsPB::UPSERT_IGNORE) {
- op->SetErrorIgnored();
- return Status::OK();
- }
- op->SetFailed(s);
- return s;
- }
+ case RowOperationsPB::UPSERT_IGNORE:
+ return ApplyUpsertAsUpdate(io_context, op_state, op,
comps->memrowset.get(), stats);
case RowOperationsPB::INSERT_IGNORE:
op->SetErrorIgnored();
return Status::OK();
@@ -855,7 +835,6 @@ Status Tablet::ApplyUpsertAsUpdate(const IOContext*
io_context,
RowSet* rowset,
ProbeStats* stats) {
const auto op_type = upsert->decoded_op.type;
- bool error_ignored = false;
const auto* schema = this->schema().get();
ConstContiguousRow row(schema, upsert->decoded_op.row_data);
faststring buf;
@@ -868,10 +847,16 @@ Status Tablet::ApplyUpsertAsUpdate(const IOContext*
io_context,
const auto& c = schema->column(i);
if (c.is_immutable()) {
if (op_type == RowOperationsPB::UPSERT) {
- return Status::Immutable("UPDATE not allowed for immutable column",
c.ToString());
+ Status s = Status::Immutable("UPDATE not allowed for immutable
column", c.ToString());
+ upsert->SetFailed(s);
+ return s;
}
DCHECK_EQ(op_type, RowOperationsPB::UPSERT_IGNORE);
- error_ignored = true;
+ // Only set upsert->error_ignored flag instead of calling
SetErrorIgnored() to avoid setting
+ // upsert->result which can be set only once. Then the upsert operation
can be continued to
+ // mutate the other cells even if the current cell has been skipped, the
upsert->result can be
+ // set normally in the next steps.
+ upsert->error_ignored = true;
continue;
}
@@ -887,9 +872,6 @@ Status Tablet::ApplyUpsertAsUpdate(const IOContext*
io_context,
op_state->pb_arena());
if (enc.is_empty()) {
upsert->SetMutateSucceeded(result);
- if (error_ignored) {
- upsert->error_ignored = true;
- }
return Status::OK();
}
diff --git a/src/kudu/tablet/tablet_metrics.cc
b/src/kudu/tablet/tablet_metrics.cc
index fd6a7c7bb..b9c1e9c89 100644
--- a/src/kudu/tablet/tablet_metrics.cc
+++ b/src/kudu/tablet/tablet_metrics.cc
@@ -45,9 +45,10 @@ METRIC_DEFINE_counter(tablet, upsert_ignore_errors, "Upsert
Ignore Errors",
"Number of upsert ignore operations for this tablet
which were "
"ignored due to an error since service start. This
metric counts "
"the number of attempts to update a present row by
changing the "
- "value of any of its immutable cells. Note that the
rest of the "
+ "value of any of its immutable cells. Note that the rest
of the "
"cells (i.e. the mutable ones) in such case are updated
accordingly "
- "to the operation's data.",
+ "to the operation's data,and rows_upserted will be
counted too if "
+ "upsert successfully.",
kudu::MetricLevel::kDebug);
METRIC_DEFINE_counter(tablet, rows_updated, "Rows Updated",
kudu::MetricUnit::kRows,
@@ -56,7 +57,11 @@ METRIC_DEFINE_counter(tablet, rows_updated, "Rows Updated",
METRIC_DEFINE_counter(tablet, update_ignore_errors, "Update Ignore Errors",
kudu::MetricUnit::kRows,
"Number of update ignore operations for this tablet
which were "
- "ignored due to an error since service start",
+ "ignored due to an error since service start. Note that
when "
+ "ignoring to update the immutable cells, the rest of the
cells "
+ "(i.e. the mutable ones) in such case are updated
accordingly to "
+ "the operation's data,and rows_updated will be counted
too if "
+ "update successfully.",
kudu::MetricLevel::kDebug);
METRIC_DEFINE_counter(tablet, rows_deleted, "Rows Deleted",
kudu::MetricUnit::kRows,