This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new ec3a9f75b KUDU-3353 [schema] Add an immutable attribute on column 
schema (part 2)
ec3a9f75b is described below

commit ec3a9f75b6924a70ecbf08e3805228ad9b92b9f0
Author: Yingchun Lai <[email protected]>
AuthorDate: Thu Aug 11 15:23:08 2022 +0800

    KUDU-3353 [schema] Add an immutable attribute on column schema (part 2)
    
    This is a follow-up to b6eedb224f715ad86378a92d25f09c2084b0e2b7.
    This patch includes the C++ client-side changes of the "new column
    attribute IMMUTABLE" feature, including:
    1. Adds a new KuduColumnSpec::Immutable() method to add IMMUTABLE
       attribute to a column, and adds a new KuduColumnSpec::Mutable()
       method to remove IMMUTABLE attribute from a column.
    2. Adds a new KuduColumnSchema::is_immutable() to check if the
       attribute is set for a column schema.
    3. Adds a  new KuduUpsertIgnore operation in the client API: use the
       newly added KuduTable::NewUpsertIgnore() to create a new instance
       of such operation.
    4. Adds unit tests to cover the newly introduced functionality.
    5. Small refactoring on the server side.
    
    Change-Id: Id301e313eedd9cc3a494d6b4f3c14fbabe63b436
    Reviewed-on: http://gerrit.cloudera.org:8080/18835
    Tested-by: Yingchun Lai <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/client/client-internal.cc             |  13 +-
 src/kudu/client/client-internal.h              |   6 +-
 src/kudu/client/client-test.cc                 | 560 ++++++++++++++++++++++---
 src/kudu/client/client-unittest.cc             |   2 +-
 src/kudu/client/client.cc                      |  28 +-
 src/kudu/client/client.h                       |   6 +
 src/kudu/client/schema-internal.h              |   1 +
 src/kudu/client/schema.cc                      |  22 +-
 src/kudu/client/schema.h                       |  14 +
 src/kudu/client/table_alterer-internal.cc      |   6 +-
 src/kudu/client/write_op.cc                    |  13 +-
 src/kudu/client/write_op.h                     |  33 +-
 src/kudu/common/row_operations.cc              |  12 +-
 src/kudu/integration-tests/alter_table-test.cc |  42 ++
 src/kudu/master/master.proto                   |   2 +
 src/kudu/master/master_service.cc              |   7 +
 src/kudu/tablet/ops/write_op.cc                |  12 +-
 src/kudu/tablet/row_op.cc                      |   2 +
 src/kudu/tablet/row_op.h                       |   3 +
 src/kudu/tablet/tablet.cc                      |  42 +-
 src/kudu/tablet/tablet_metrics.cc              |  11 +-
 21 files changed, 725 insertions(+), 112 deletions(-)

diff --git a/src/kudu/client/client-internal.cc 
b/src/kudu/client/client-internal.cc
index 321576ad2..63b5be4d3 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -313,13 +313,13 @@ Status KuduClient::Data::GetTabletServer(KuduClient* 
client,
   return Status::OK();
 }
 
-// TODO(yingchun): Add has_immutable_column_schema
 Status KuduClient::Data::CreateTable(KuduClient* client,
                                      const CreateTableRequestPB& req,
                                      CreateTableResponsePB* resp,
                                      const MonoTime& deadline,
                                      bool has_range_partition_bounds,
-                                     bool has_range_specific_hash_schema) {
+                                     bool has_range_specific_hash_schema,
+                                     bool has_immutable_column_schema) {
   vector<uint32_t> features;
   if (has_range_partition_bounds) {
     features.push_back(MasterFeatures::RANGE_PARTITION_BOUNDS);
@@ -327,6 +327,9 @@ Status KuduClient::Data::CreateTable(KuduClient* client,
   if (has_range_specific_hash_schema) {
     features.push_back(MasterFeatures::RANGE_SPECIFIC_HASH_SCHEMA);
   }
+  if (has_immutable_column_schema) {
+    features.push_back(MasterFeatures::IMMUTABLE_COLUMN_ATTRIBUTE);
+  }
   Synchronizer sync;
   AsyncLeaderMasterRpc<CreateTableRequestPB, CreateTableResponsePB> rpc(
       deadline, client, BackoffType::EXPONENTIAL, req, resp,
@@ -416,7 +419,8 @@ Status KuduClient::Data::AlterTable(KuduClient* client,
                                     AlterTableResponsePB* resp,
                                     const MonoTime& deadline,
                                     bool has_add_drop_partition,
-                                    bool adding_range_with_custom_hash_schema) 
{
+                                    bool adding_range_with_custom_hash_schema,
+                                    bool has_immutable_column_schema) {
   vector<uint32_t> required_feature_flags;
   if (has_add_drop_partition) {
     
required_feature_flags.push_back(MasterFeatures::ADD_DROP_RANGE_PARTITIONS);
@@ -424,6 +428,9 @@ Status KuduClient::Data::AlterTable(KuduClient* client,
   if (adding_range_with_custom_hash_schema) {
     
required_feature_flags.push_back(MasterFeatures::RANGE_SPECIFIC_HASH_SCHEMA);
   }
+  if (has_immutable_column_schema) {
+    
required_feature_flags.push_back(MasterFeatures::IMMUTABLE_COLUMN_ATTRIBUTE);
+  }
   Synchronizer sync;
   AsyncLeaderMasterRpc<AlterTableRequestPB, AlterTableResponsePB> rpc(
       deadline, client, BackoffType::EXPONENTIAL, req, resp,
diff --git a/src/kudu/client/client-internal.h 
b/src/kudu/client/client-internal.h
index 3e820f91f..7d70b5056 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -108,7 +108,8 @@ class KuduClient::Data {
                             master::CreateTableResponsePB* resp,
                             const MonoTime& deadline,
                             bool has_range_partition_bounds,
-                            bool has_range_specific_hash_schema);
+                            bool has_range_specific_hash_schema,
+                            bool has_immutable_column_schema);
 
   static Status IsCreateTableInProgress(KuduClient* client,
                                         master::TableIdentifierPB table,
@@ -135,7 +136,8 @@ class KuduClient::Data {
                            master::AlterTableResponsePB* resp,
                            const MonoTime& deadline,
                            bool has_add_drop_partition,
-                           bool adding_range_with_custom_hash_schema);
+                           bool adding_range_with_custom_hash_schema,
+                           bool has_immutable_column_schema);
 
   static Status IsAlterTableInProgress(KuduClient* client,
                                        master::TableIdentifierPB table,
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 7b59cbe57..d97d87ebe 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -34,6 +34,7 @@
 #include <set>
 #include <string>
 #include <thread>
+#include <tuple>
 #include <type_traits>
 #include <unordered_set>
 #include <utility>
@@ -146,6 +147,7 @@ DECLARE_bool(location_mapping_by_uuid);
 DECLARE_bool(log_inject_latency);
 DECLARE_bool(master_client_location_assignment_enabled);
 DECLARE_bool(master_support_connect_to_master_rpc);
+DECLARE_bool(master_support_immutable_column_attribute);
 DECLARE_bool(mock_table_metrics_for_testing);
 DECLARE_bool(rpc_listen_on_unix_domain_socket);
 DECLARE_bool(rpc_trace_negotiation);
@@ -235,18 +237,19 @@ namespace client {
 
 class ClientTest : public KuduTest {
  public:
-  ClientTest() {
+  virtual Status BuildSchema() {
     KuduSchemaBuilder b;
     b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
     b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
     b.AddColumn("string_val")->Type(KuduColumnSchema::STRING)->Nullable();
     
b.AddColumn("non_null_with_default")->Type(KuduColumnSchema::INT32)->NotNull()
-      ->Default(KuduValue::FromInt(12345));
-    CHECK_OK(b.Build(&schema_));
+        ->Default(KuduValue::FromInt(12345));
+    return b.Build(&schema_);
   }
 
   void SetUp() override {
     KuduTest::SetUp();
+    ASSERT_OK(BuildSchema());
 
     // Reduce the TS<->Master heartbeat interval
     FLAGS_heartbeat_interval_ms = 10;
@@ -285,7 +288,7 @@ class ClientTest : public KuduTest {
 
   // Looks up the remote tablet entry for a given partition key in the meta 
cache.
   scoped_refptr<internal::RemoteTablet> MetaCacheLookup(
-      KuduTable* table, const PartitionKey& partition_key) {
+      KuduTable* table, const PartitionKey& partition_key) const {
     scoped_refptr<internal::RemoteTablet> rt;
     Synchronizer sync;
     client_->data_->meta_cache_->LookupTabletByKey(
@@ -298,7 +301,7 @@ class ClientTest : public KuduTest {
   }
 
   Status MetaCacheLookupById(
-      const string& tablet_id, scoped_refptr<internal::RemoteTablet>* 
remote_tablet) {
+      const string& tablet_id, scoped_refptr<internal::RemoteTablet>* 
remote_tablet) const {
     remote_tablet->reset();
     scoped_refptr<internal::RemoteTablet> rt;
     Synchronizer sync;
@@ -311,7 +314,7 @@ class ClientTest : public KuduTest {
   }
 
   // Generate a set of split rows for tablets used in this test.
-  vector<unique_ptr<KuduPartialRow>> GenerateSplitRows() {
+  vector<unique_ptr<KuduPartialRow>> GenerateSplitRows() const {
     vector<unique_ptr<KuduPartialRow>> rows;
     unique_ptr<KuduPartialRow> row(schema_.NewRow());
     CHECK_OK(row->SetInt32(0, 9));
@@ -322,7 +325,7 @@ class ClientTest : public KuduTest {
   // Count the rows of a table, checking that the operation succeeds.
   //
   // Must be public to use as a thread closure.
-  void CheckRowCount(KuduTable* table, int expected) {
+  void CheckRowCount(KuduTable* table, int expected) const {
     CHECK_EQ(CountRowsFromClient(table), expected);
   }
 
@@ -372,7 +375,7 @@ class ClientTest : public KuduTest {
   // derived classes to test client location assignment.
   virtual void SetLocationMappingCmd() {}
 
-  string GetFirstTabletId(KuduTable* table) {
+  string GetFirstTabletId(KuduTable* table) const {
     GetTableLocationsRequestPB req;
     GetTableLocationsResponsePB resp;
     req.mutable_table()->set_table_name(table->name());
@@ -385,7 +388,7 @@ class ClientTest : public KuduTest {
     return resp.tablet_locations(0).tablet_id();
   }
 
-  void CheckNoRpcOverflow() {
+  void CheckNoRpcOverflow() const {
     for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
       MiniTabletServer* server = cluster_->mini_tablet_server(i);
       if (server->is_started()) {
@@ -411,8 +414,8 @@ class ClientTest : public KuduTest {
 
   // Inserts given number of tests rows into the specified table
   // in the context of the session.
-  static void InsertTestRows(KuduTable* table, KuduSession* session,
-                             int num_rows, int first_row = 0) {
+  void InsertTestRows(KuduTable* table, KuduSession* session,
+                      int num_rows, int first_row = 0) const {
     for (int i = first_row; i < num_rows + first_row; ++i) {
       unique_ptr<KuduInsert> insert(BuildTestInsert(table, i));
       ASSERT_OK(session->Apply(insert.release()));
@@ -421,7 +424,7 @@ class ClientTest : public KuduTest {
 
   // Inserts 'num_rows' test rows using 'client'
   void InsertTestRows(KuduClient* client, KuduTable* table,
-                      int num_rows, int first_row = 0) {
+                      int num_rows, int first_row = 0) const {
     shared_ptr<KuduSession> session = client->NewSession();
     ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
     session->SetTimeoutMillis(60000);
@@ -431,11 +434,11 @@ class ClientTest : public KuduTest {
   }
 
   // Inserts 'num_rows' using the default client.
-  void InsertTestRows(KuduTable* table, int num_rows, int first_row = 0) {
+  void InsertTestRows(KuduTable* table, int num_rows, int first_row = 0) const 
{
     InsertTestRows(client_.get(), table, num_rows, first_row);
   }
 
-  void UpdateTestRows(KuduTable* table, int lo, int hi) {
+  void UpdateTestRows(KuduTable* table, int lo, int hi) const {
     shared_ptr<KuduSession> session = client_->NewSession();
     ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
     session->SetTimeoutMillis(10000);
@@ -447,7 +450,7 @@ class ClientTest : public KuduTest {
     NO_FATALS(CheckNoRpcOverflow());
   }
 
-  void DeleteTestRows(KuduTable* table, int lo, int hi) {
+  void DeleteTestRows(KuduTable* table, int lo, int hi) const {
     shared_ptr<KuduSession> session = client_->NewSession();
     ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
     session->SetTimeoutMillis(10000);
@@ -459,27 +462,41 @@ class ClientTest : public KuduTest {
     NO_FATALS(CheckNoRpcOverflow());
   }
 
-  static unique_ptr<KuduInsert> BuildTestInsert(KuduTable* table, int index) {
+  unique_ptr<KuduInsert> BuildTestInsert(KuduTable* table, int index) const {
     unique_ptr<KuduInsert> insert(table->NewInsert());
     KuduPartialRow* row = insert->mutable_row();
     PopulateDefaultRow(row, index);
     return insert;
   }
 
-  static unique_ptr<KuduInsertIgnore> BuildTestInsertIgnore(KuduTable* table, 
int index) {
+  unique_ptr<KuduInsertIgnore> BuildTestInsertIgnore(KuduTable* table, int 
index) const {
     unique_ptr<KuduInsertIgnore> insert_ignore(table->NewInsertIgnore());
     KuduPartialRow* row = insert_ignore->mutable_row();
     PopulateDefaultRow(row, index);
     return insert_ignore;
   }
 
-  static unique_ptr<KuduUpdateIgnore> BuildTestUpdateIgnore(KuduTable* table, 
int index) {
+  unique_ptr<KuduUpdate> BuildTestUpdate(KuduTable* table, int index) const {
+    unique_ptr<KuduUpdate> update(table->NewUpdate());
+    KuduPartialRow* row = update->mutable_row();
+    PopulateDefaultRow(row, index);
+    return update;
+  }
+
+  unique_ptr<KuduUpdateIgnore> BuildTestUpdateIgnore(KuduTable* table, int 
index) const {
     unique_ptr<KuduUpdateIgnore> update_ignore(table->NewUpdateIgnore());
     KuduPartialRow* row = update_ignore->mutable_row();
     PopulateDefaultRow(row, index);
     return update_ignore;
   }
 
+  unique_ptr<KuduUpsertIgnore> BuildTestUpsertIgnore(KuduTable* table, int 
index) const {
+    unique_ptr<KuduUpsertIgnore> upsert_ignore(table->NewUpsertIgnore());
+    KuduPartialRow* row = upsert_ignore->mutable_row();
+    PopulateDefaultRow(row, index);
+    return upsert_ignore;
+  }
+
   static unique_ptr<KuduDeleteIgnore> BuildTestDeleteIgnore(KuduTable* table, 
int index) {
     unique_ptr<KuduDeleteIgnore> delete_ignore(table->NewDeleteIgnore());
     KuduPartialRow* row = delete_ignore->mutable_row();
@@ -487,13 +504,26 @@ class ClientTest : public KuduTest {
     return delete_ignore;
   }
 
-  static void PopulateDefaultRow(KuduPartialRow* row, int index) {
+  virtual void PopulateDefaultRow(KuduPartialRow* row, int index) const {
     CHECK_OK(row->SetInt32(0, index));
     CHECK_OK(row->SetInt32(1, index * 2));
     CHECK_OK(row->SetStringCopy(2, Slice(StringPrintf("hello %d", index))));
     CHECK_OK(row->SetInt32(3, index * 3));
   }
 
+  virtual void DoTestVerifyRows(const shared_ptr<KuduTable>& tbl, int 
num_rows) const {
+    vector<string> rows;
+    KuduScanner scanner(tbl.get());
+    ASSERT_OK(ScanToStrings(&scanner, &rows));
+    ASSERT_EQ(num_rows, rows.size());
+    for (int i = 0; i < num_rows; i++) {
+      int key = i + 1;
+      ASSERT_EQ(StringPrintf("(int32 key=%d, int32 int_val=%d, string 
string_val=\"hello %d\", "
+                             "int32 non_null_with_default=%d)", key, key*2, 
key, key*3),
+                rows[i]);
+    }
+  }
+
   static unique_ptr<KuduUpdate> UpdateTestRow(KuduTable* table, int index) {
     unique_ptr<KuduUpdate> update(table->NewUpdate());
     KuduPartialRow* row = update->mutable_row();
@@ -668,14 +698,14 @@ class ClientTest : public KuduTest {
     }
   }
 
-  int CountRowsFromClient(KuduTable* table) {
+  int CountRowsFromClient(KuduTable* table) const {
     return CountRowsFromClient(table, KuduScanner::READ_LATEST);
   }
 
   int CountRowsFromClient(KuduTable* table,
                           KuduScanner::ReadMode scan_mode,
                           int32_t lower_bound = kNoBound,
-                          int32_t upper_bound = kNoBound) {
+                          int32_t upper_bound = kNoBound) const {
     return CountRowsFromClient(table, KuduClient::LEADER_ONLY, scan_mode,
                                lower_bound, upper_bound);
   }
@@ -684,7 +714,7 @@ class ClientTest : public KuduTest {
                           KuduClient::ReplicaSelection selection,
                           KuduScanner::ReadMode scan_mode,
                           int32_t lower_bound = kNoBound,
-                          int32_t upper_bound = kNoBound) {
+                          int32_t upper_bound = kNoBound) const {
     KuduScanner scanner(table);
     CHECK_OK(scanner.SetSelection(selection));
     CHECK_OK(scanner.SetProjectedColumnNames({}));
@@ -2933,22 +2963,11 @@ TEST_F(ClientTest, TestInsertSingleRowManualBatch) {
   FlushSessionOrDie(session);
 }
 
-static void DoTestVerifyRows(const shared_ptr<KuduTable>& tbl, int num_rows) {
-  vector<string> rows;
-  KuduScanner scanner(tbl.get());
-  ASSERT_OK(ScanToStrings(&scanner, &rows));
-  ASSERT_EQ(num_rows, rows.size());
-  for (int i = 0; i < num_rows; i++) {
-    int key = i + 1;
-    ASSERT_EQ(StringPrintf("(int32 key=%d, int32 int_val=%d, string 
string_val=\"hello %d\", "
-        "int32 non_null_with_default=%d)", key, key*2, key, key*3), rows[i]);
-  }
-}
-
 static void DoVerifyMetrics(const KuduSession* session,
                             int64_t successful_inserts,
                             int64_t insert_ignore_errors,
                             int64_t successful_upserts,
+                            int64_t upsert_ignore_errors,
                             int64_t successful_updates,
                             int64_t update_ignore_errors,
                             int64_t successful_deletes,
@@ -2957,7 +2976,7 @@ static void DoVerifyMetrics(const KuduSession* session,
   ASSERT_EQ(successful_inserts, metrics["successful_inserts"]);
   ASSERT_EQ(insert_ignore_errors, metrics["insert_ignore_errors"]);
   ASSERT_EQ(successful_upserts, metrics["successful_upserts"]);
-  // TODO(yingchun): should test upsert_ignore_errors
+  ASSERT_EQ(upsert_ignore_errors, metrics["upsert_ignore_errors"]);
   ASSERT_EQ(successful_updates, metrics["successful_updates"]);
   ASSERT_EQ(update_ignore_errors, metrics["update_ignore_errors"]);
   ASSERT_EQ(successful_deletes, metrics["successful_deletes"]);
@@ -2972,16 +2991,16 @@ TEST_F(ClientTest, TestInsertIgnore) {
   {
     unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
     ASSERT_OK(session->Apply(insert.release()));
-    DoTestVerifyRows(client_table_, 1);
-    DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0);
+    NO_FATALS(DoTestVerifyRows(client_table_, 1));
+    NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0, 0));
   }
 
   {
     // INSERT IGNORE results in no error on duplicate primary key.
     unique_ptr<KuduInsertIgnore> 
insert_ignore(BuildTestInsertIgnore(client_table_.get(), 1));
     ASSERT_OK(session->Apply(insert_ignore.release()));
-    DoTestVerifyRows(client_table_, 1);
-    DoVerifyMetrics(session.get(), 1, 1, 0, 0, 0, 0, 0);
+    NO_FATALS(DoTestVerifyRows(client_table_, 1));
+    NO_FATALS(DoVerifyMetrics(session.get(), 1, 1, 0, 0, 0, 0, 0, 0));
   }
 
   {
@@ -2992,16 +3011,63 @@ TEST_F(ClientTest, TestInsertIgnore) {
     ASSERT_OK(insert_ignore->mutable_row()->SetStringCopy("string_val", "hello 
world"));
     ASSERT_OK(insert_ignore->mutable_row()->SetInt32("non_null_with_default", 
999));
     ASSERT_OK(session->Apply(insert_ignore.release())); // returns ok but 
results in no change
-    DoTestVerifyRows(client_table_, 1);
-    DoVerifyMetrics(session.get(), 1, 2, 0, 0, 0, 0, 0);
+    NO_FATALS(DoTestVerifyRows(client_table_, 1));
+    NO_FATALS(DoVerifyMetrics(session.get(), 1, 2, 0, 0, 0, 0, 0, 0));
   }
 
   {
     // INSERT IGNORE can insert new row.
     unique_ptr<KuduInsertIgnore> 
insert_ignore(BuildTestInsertIgnore(client_table_.get(), 2));
     ASSERT_OK(session->Apply(insert_ignore.release()));
-    DoTestVerifyRows(client_table_, 2);
-    DoVerifyMetrics(session.get(), 2, 2, 0, 0, 0, 0, 0);
+    NO_FATALS(DoTestVerifyRows(client_table_, 2));
+    NO_FATALS(DoVerifyMetrics(session.get(), 2, 2, 0, 0, 0, 0, 0, 0));
+  }
+}
+
+TEST_F(ClientTest, TestUpdate) {
+  shared_ptr<KuduSession> session = client_->NewSession();
+  session->SetTimeoutMillis(10000);
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+  {
+    // UPDATE results in an error on missing primary key.
+    unique_ptr<KuduUpdate> update(BuildTestUpdate(client_table_.get(), 1));
+    Status s = session->Apply(update.release());
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "failed to flush data: error details are available "
+                        "via KuduSession::GetPendingErrors()");
+    vector<KuduError*> errors;
+    ElementDeleter d(&errors);
+    bool overflow;
+    session->GetPendingErrors(&errors, &overflow);
+    NO_FATALS(DoTestVerifyRows(client_table_, 0));
+    NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 0, 0, 0, 0, 0, 0));
+  }
+
+  {
+    unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
+    ASSERT_OK(session->Apply(insert.release()));
+    NO_FATALS(DoTestVerifyRows(client_table_, 1));
+    NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0, 0));  // 
successful_inserts++
+  }
+
+  {
+    // UPDATE can update row.
+    unique_ptr<KuduUpdate> update(client_table_->NewUpdate());
+    ASSERT_OK(update->mutable_row()->SetInt32("key", 1));
+    ASSERT_OK(update->mutable_row()->SetInt32("int_val", 999));
+    ASSERT_OK(update->mutable_row()->SetStringCopy("string_val", "hello 
world"));
+    ASSERT_OK(update->mutable_row()->SetInt32("non_null_with_default", 999));
+    ASSERT_OK(session->Apply(update.release()));
+    NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 1, 0, 0, 0));  // 
successful_updates++
+
+    vector<string> rows;
+    KuduScanner scanner(client_table_.get());
+    ASSERT_OK(ScanToStrings(&scanner, &rows));
+    ASSERT_EQ(1, rows.size());
+    ASSERT_EQ("(int32 key=1, int32 int_val=999, string string_val=\"hello 
world\", "
+        "int32 non_null_with_default=999)", rows[0]);
   }
 }
 
@@ -3014,15 +3080,15 @@ TEST_F(ClientTest, TestUpdateIgnore) {
     // UPDATE IGNORE results in no error on missing primary key.
     unique_ptr<KuduUpdateIgnore> 
update_ignore(BuildTestUpdateIgnore(client_table_.get(), 1));
     ASSERT_OK(session->Apply(update_ignore.release()));
-    DoTestVerifyRows(client_table_, 0);
-    DoVerifyMetrics(session.get(), 0, 0, 0, 0, 1, 0, 0);
+    NO_FATALS(DoTestVerifyRows(client_table_, 0));
+    NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 0, 0, 0, 1, 0, 0));
   }
 
   {
     unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
     ASSERT_OK(session->Apply(insert.release()));
-    DoTestVerifyRows(client_table_, 1);
-    DoVerifyMetrics(session.get(), 1, 0, 0, 0, 1, 0, 0);
+    NO_FATALS(DoTestVerifyRows(client_table_, 1));
+    NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 1, 0, 0));
   }
 
   {
@@ -3033,7 +3099,7 @@ TEST_F(ClientTest, TestUpdateIgnore) {
     ASSERT_OK(update_ignore->mutable_row()->SetStringCopy("string_val", "hello 
world"));
     ASSERT_OK(update_ignore->mutable_row()->SetInt32("non_null_with_default", 
999));
     ASSERT_OK(session->Apply(update_ignore.release()));
-    DoVerifyMetrics(session.get(), 1, 0, 0, 1, 1, 0, 0);
+    NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 1, 1, 0, 0));
 
     vector<string> rows;
     KuduScanner scanner(client_table_.get());
@@ -3053,7 +3119,7 @@ TEST_F(ClientTest, TestDeleteIgnore) {
     unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
     ASSERT_OK(session->Apply(insert.release()));
     DoTestVerifyRows(client_table_, 1);
-    DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0);
+    NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0, 0));
   }
 
   {
@@ -3061,7 +3127,7 @@ TEST_F(ClientTest, TestDeleteIgnore) {
     unique_ptr<KuduDeleteIgnore> 
delete_ignore(BuildTestDeleteIgnore(client_table_.get(), 1));
     ASSERT_OK(session->Apply(delete_ignore.release()));
     DoTestVerifyRows(client_table_, 0);
-    DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 1, 0);
+    NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 1, 0));
   }
 
   {
@@ -3069,7 +3135,7 @@ TEST_F(ClientTest, TestDeleteIgnore) {
     unique_ptr<KuduDeleteIgnore> 
delete_ignore(BuildTestDeleteIgnore(client_table_.get(), 1));
     ASSERT_OK(session->Apply(delete_ignore.release()));
     DoTestVerifyRows(client_table_, 0);
-    DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 1, 1);
+    NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 1, 1));
   }
 }
 
@@ -3122,11 +3188,22 @@ static Status ApplyUpsertToSession(KuduSession* session,
                                    const shared_ptr<KuduTable>& table,
                                    int row_key,
                                    int int_val,
-                                   const char* string_val) {
+                                   const char* string_val,
+                                   optional<int> imm_val = nullopt) {
   unique_ptr<KuduUpsert> upsert(table->NewUpsert());
   RETURN_NOT_OK(upsert->mutable_row()->SetInt32("key", row_key));
   RETURN_NOT_OK(upsert->mutable_row()->SetInt32("int_val", int_val));
   RETURN_NOT_OK(upsert->mutable_row()->SetStringCopy("string_val", 
string_val));
+  // Here we use the first column to initialize an object of KuduColumnSchema
+  // for there is no default constructor for it.
+  KuduColumnSchema col_schema = table->schema().Column(0);
+  if (table->schema().HasColumn("imm_val", &col_schema)) {
+    if (imm_val) {
+      RETURN_NOT_OK(upsert->mutable_row()->SetInt32("imm_val", *imm_val));
+    } else {
+      RETURN_NOT_OK(upsert->mutable_row()->SetNull("imm_val"));
+    }
+  }
   return session->Apply(upsert.release());
 }
 
@@ -4477,7 +4554,7 @@ TEST_F(ClientTest, TestUpsert) {
   // Perform and verify UPSERT which acts as an INSERT.
   ASSERT_OK(ApplyUpsertToSession(session.get(), client_table_, 1, 1, "original 
row"));
   FlushSessionOrDie(session);
-  DoVerifyMetrics(session.get(), 0, 0, 1, 0, 0, 0, 0);
+  NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 1, 0, 0, 0, 0, 0));
 
   {
     vector<string> rows;
@@ -4490,7 +4567,7 @@ TEST_F(ClientTest, TestUpsert) {
   // Perform and verify UPSERT which acts as an UPDATE.
   ASSERT_OK(ApplyUpsertToSession(session.get(), client_table_, 1, 2, "upserted 
row"));
   FlushSessionOrDie(session);
-  DoVerifyMetrics(session.get(), 0, 0, 2, 0, 0, 0, 0);
+  NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 2, 0, 0, 0, 0, 0));
 
   {
     vector<string> rows;
@@ -4509,7 +4586,7 @@ TEST_F(ClientTest, TestUpsert) {
     ASSERT_OK(row->SetInt32("non_null_with_default", 999));
     ASSERT_OK(session->Apply(update.release()));
     FlushSessionOrDie(session);
-    DoVerifyMetrics(session.get(), 0, 0, 2, 1, 0, 0, 0);
+    NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 2, 0, 1, 0, 0, 0));
   }
   {
     vector<string> rows;
@@ -4523,7 +4600,7 @@ TEST_F(ClientTest, TestUpsert) {
   // column, and therefore should not revert it back to its default.
   ASSERT_OK(ApplyUpsertToSession(session.get(), client_table_, 1, 3, "upserted 
row 2"));
   FlushSessionOrDie(session);
-  DoVerifyMetrics(session.get(), 0, 0, 3, 1, 0, 0, 0);
+  NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 3, 0, 1, 0, 0, 0));
   {
     vector<string> rows;
     ASSERT_OK(ScanTableToStrings(client_table_.get(), &rows));
@@ -4535,7 +4612,7 @@ TEST_F(ClientTest, TestUpsert) {
   // Delete the row.
   ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 1));
   FlushSessionOrDie(session);
-  DoVerifyMetrics(session.get(), 0, 0, 3, 1, 0, 1, 0);
+  NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 3, 0, 1, 0, 1, 0));
   {
     vector<string> rows;
     ASSERT_OK(ScanTableToStrings(client_table_.get(), &rows));
@@ -8717,12 +8794,379 @@ TEST_F(ClientTest, TestProjectionPredicatesFuzz) {
             unordered_set<string>(rows.begin(), rows.end())) << rows;
 }
 
+class ClientTestImmutableColumn : public ClientTest,
+                                  public 
::testing::WithParamInterface<std::tuple<bool, bool>> {
+ public:
+  ClientTestImmutableColumn() {
+    init_immu_col_to_null_ = std::get<0>(GetParam());
+    update_immu_col_to_null_ = std::get<1>(GetParam());
+  }
+
+  Status BuildSchema() override {
+    KuduSchemaBuilder b;
+    b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+    b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
+    b.AddColumn("string_val")->Type(KuduColumnSchema::STRING)->Nullable();
+    
b.AddColumn("non_null_with_default")->Type(KuduColumnSchema::INT32)->NotNull()
+        ->Default(KuduValue::FromInt(12345));
+    
b.AddColumn("imm_val")->Type(KuduColumnSchema::INT32)->Immutable()->Nullable();
+    return b.Build(&schema_);
+  }
+
+  void PopulateImmutableCell(KuduPartialRow* row, int index) const {
+    if (init_immu_col_to_null_) {
+      ASSERT_OK(row->SetNull(4));
+    } else {
+      ASSERT_OK(row->SetInt32(4, index * 4));
+    }
+  }
+
+  void PopulateDefaultRow(KuduPartialRow* row, int index) const override {
+    ClientTest::PopulateDefaultRow(row, index);
+    NO_FATALS(PopulateImmutableCell(row, index));
+  }
+
+  string ExpectImmuColCell(int index) const {
+    return init_immu_col_to_null_ ? "NULL" : std::to_string(index*4);
+  }
+
+  void DoTestVerifyRows(const shared_ptr<KuduTable>& tbl, int num_rows) const 
override {
+    vector<string> rows;
+    KuduScanner scanner(tbl.get());
+    ASSERT_OK(ScanToStrings(&scanner, &rows));
+    ASSERT_EQ(num_rows, rows.size());
+    for (int i = 0; i < num_rows; i++) {
+      int key = i + 1;
+      ASSERT_EQ(
+          Substitute("(int32 key=$0, int32 int_val=$1, string 
string_val=\"hello $2\", "
+                     "int32 non_null_with_default=$3, int32 imm_val=$4)",
+                     key, key*2, key, key*3, ExpectImmuColCell(key)),
+          rows[i]);
+    }
+  }
+
+ protected:
+  bool init_immu_col_to_null_;
+  bool update_immu_col_to_null_;
+};
+
+INSTANTIATE_TEST_SUITE_P(Params, ClientTestImmutableColumn,
+                         ::testing::Combine(::testing::Bool(),
+                                            ::testing::Bool()));
+
+TEST_P(ClientTestImmutableColumn, TestUpdate) {
+  shared_ptr<KuduSession> session = client_->NewSession();
+  session->SetTimeoutMillis(10000);
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+  {
+    unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
+    ASSERT_OK(session->Apply(insert.release()));
+    NO_FATALS(DoTestVerifyRows(client_table_, 1));
+    NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0, 0));  // 
successful_inserts++
+  }
+
+  const string expect_row = Substitute(
+      "(int32 key=1, int32 int_val=999, string string_val=\"hello world\", "
+      "int32 non_null_with_default=999, int32 imm_val=$0)",
+      ExpectImmuColCell(1));
+  {
+    // UPDATE can update row without immutable column set.
+    unique_ptr<KuduUpdate> update(client_table_->NewUpdate());
+    ASSERT_OK(update->mutable_row()->SetInt32("key", 1));
+    ASSERT_OK(update->mutable_row()->SetInt32("int_val", 999));
+    ASSERT_OK(update->mutable_row()->SetStringCopy("string_val", "hello 
world"));
+    ASSERT_OK(update->mutable_row()->SetInt32("non_null_with_default", 999));
+    ASSERT_OK(session->Apply(update.release()));
+    NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 1, 0, 0, 0));  // 
successful_updates++
+
+    vector<string> rows;
+    KuduScanner scanner(client_table_.get());
+    ASSERT_OK(ScanToStrings(&scanner, &rows));
+    ASSERT_EQ(1, rows.size());
+    ASSERT_EQ(expect_row, rows[0]);
+  }
+
+  {
+    // UPDATE results in an error when attempting to update row having at 
least one column with the
+    // immutable attribute set.
+    unique_ptr<KuduUpdate> update(client_table_->NewUpdate());
+    ASSERT_OK(update->mutable_row()->SetInt32("key", 1));
+    ASSERT_OK(update->mutable_row()->SetInt32("int_val", 888));
+    ASSERT_OK(update->mutable_row()->SetStringCopy("string_val", "world 
hello"));
+    ASSERT_OK(update->mutable_row()->SetInt32("non_null_with_default", 888));
+    if (update_immu_col_to_null_) {
+      ASSERT_OK(update->mutable_row()->SetNull("imm_val"));
+    } else {
+      ASSERT_OK(update->mutable_row()->SetInt32("imm_val", 888));
+    }
+    Status s = session->Apply(update.release());
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "failed to flush data: error details are available "
+                        "via KuduSession::GetPendingErrors()");
+    vector<KuduError*> errors;
+    ElementDeleter d(&errors);
+    bool overflow;
+    session->GetPendingErrors(&errors, &overflow);
+    ASSERT_FALSE(overflow);
+    ASSERT_EQ(1, errors.size());
+    ASSERT_STR_CONTAINS(
+        errors[0]->status().ToString(),
+        "Immutable: UPDATE not allowed for immutable column: imm_val INT32 
NULLABLE IMMUTABLE");
+    NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 1, 0, 0, 0));  // 
nothing changed
+
+    vector<string> rows;
+    KuduScanner scanner(client_table_.get());
+    ASSERT_OK(ScanToStrings(&scanner, &rows));
+    ASSERT_EQ(1, rows.size());
+    ASSERT_EQ(expect_row, rows[0]);
+  }
+}
+
+TEST_P(ClientTestImmutableColumn, TestUpdateIgnore) {
+  shared_ptr<KuduSession> session = client_->NewSession();
+  session->SetTimeoutMillis(10000);
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+  {
+    unique_ptr<KuduInsert> insert(BuildTestInsert(client_table_.get(), 1));
+    ASSERT_OK(session->Apply(insert.release()));
+    NO_FATALS(DoTestVerifyRows(client_table_, 1));
+    NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 0, 0, 0, 0));  // 
successful_inserts++
+  }
+
+  const string expect_row = Substitute(
+      "(int32 key=1, int32 int_val=888, string string_val=\"world hello\", "
+      "int32 non_null_with_default=888, int32 imm_val=$0)",
+      ExpectImmuColCell(1));
+  {
+    // UPDATE IGNORE can update a row without changing the immutable column 
cell, the error of
+    // updating the immutable column will be ignored.
+    unique_ptr<KuduUpdateIgnore> 
update_ignore(client_table_->NewUpdateIgnore());
+    ASSERT_OK(update_ignore->mutable_row()->SetInt32("key", 1));
+    ASSERT_OK(update_ignore->mutable_row()->SetInt32("int_val", 888));
+    ASSERT_OK(update_ignore->mutable_row()->SetStringCopy("string_val", "world 
hello"));
+    ASSERT_OK(update_ignore->mutable_row()->SetInt32("non_null_with_default", 
888));
+    if (update_immu_col_to_null_) {
+      ASSERT_OK(update_ignore->mutable_row()->SetNull("imm_val"));
+    } else {
+      ASSERT_OK(update_ignore->mutable_row()->SetInt32("imm_val", 888));
+    }
+    ASSERT_OK(session->Apply(update_ignore.release()));
+    // successful_updates++, update_ignore_errors++,
+    NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 1, 1, 0, 0));
+
+    vector<string> rows;
+    KuduScanner scanner(client_table_.get());
+    ASSERT_OK(ScanToStrings(&scanner, &rows));
+    ASSERT_EQ(1, rows.size());
+    ASSERT_EQ(expect_row, rows[0]);
+  }
+
+  {
+    // UPDATE IGNORE only on immutable column. Note that this will result in
+    // a 'Invalid argument: No fields updated' error.
+    unique_ptr<KuduUpdateIgnore> 
update_ignore(client_table_->NewUpdateIgnore());
+    ASSERT_OK(update_ignore->mutable_row()->SetInt32("key", 1));
+    if (update_immu_col_to_null_) {
+      ASSERT_OK(update_ignore->mutable_row()->SetNull("imm_val"));
+    } else {
+      ASSERT_OK(update_ignore->mutable_row()->SetInt32("imm_val", 888));
+    }
+    Status s = session->Apply(update_ignore.release());
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "failed to flush data: error details are available "
+                        "via KuduSession::GetPendingErrors()");
+    vector<KuduError*> errors;
+    ElementDeleter d(&errors);
+    bool overflow;
+    session->GetPendingErrors(&errors, &overflow);
+    ASSERT_FALSE(overflow);
+    ASSERT_EQ(1, errors.size());
+    ASSERT_STR_CONTAINS(errors[0]->status().ToString(),
+                        "Invalid argument: No fields updated, key is: (int32 
key=1)");
+    NO_FATALS(DoVerifyMetrics(session.get(), 1, 0, 0, 0, 1, 1, 0, 0));  // 
nothing changed
+
+    vector<string> rows;
+    KuduScanner scanner(client_table_.get());
+    ASSERT_OK(ScanToStrings(&scanner, &rows));
+    ASSERT_EQ(1, rows.size());
+    ASSERT_EQ(expect_row, rows[0]);
+  }
+}
+
+TEST_P(ClientTestImmutableColumn, TestUpsertIgnore) {
+  shared_ptr<KuduSession> session = client_->NewSession();
+  session->SetTimeoutMillis(10000);
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+  {
+    // UPSERT IGNORE can insert row.
+    unique_ptr<KuduUpsertIgnore> 
upsert_ignore(BuildTestUpsertIgnore(client_table_.get(), 1));
+    ASSERT_OK(session->Apply(upsert_ignore.release()));
+    NO_FATALS(DoTestVerifyRows(client_table_, 1));
+    NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 1, 0, 0, 0, 0, 0)); // 
successful_upserts++
+  }
+
+  {
+    // UPSERT IGNORE can update row without immutable column set.
+    unique_ptr<KuduUpsertIgnore> 
upsert_ignore(client_table_->NewUpsertIgnore());
+    ASSERT_OK(upsert_ignore->mutable_row()->SetInt32("key", 1));
+    ASSERT_OK(upsert_ignore->mutable_row()->SetInt32("int_val", 999));
+    ASSERT_OK(upsert_ignore->mutable_row()->SetStringCopy("string_val", "hello 
world"));
+    ASSERT_OK(upsert_ignore->mutable_row()->SetInt32("non_null_with_default", 
999));
+    ASSERT_OK(session->Apply(upsert_ignore.release()));
+    NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 2, 0, 0, 0, 0, 0));  // 
successful_upserts++
+
+    vector<string> rows;
+    KuduScanner scanner(client_table_.get());
+    ASSERT_OK(ScanToStrings(&scanner, &rows));
+    ASSERT_EQ(1, rows.size());
+    ASSERT_EQ(Substitute("(int32 key=1, int32 int_val=999, string 
string_val=\"hello world\", "
+                         "int32 non_null_with_default=999, int32 imm_val=$0)",
+                         ExpectImmuColCell(1)),
+              rows[0]);
+  }
+
+  {
+    // UPSERT IGNORE can update row with immutable column set.
+    unique_ptr<KuduUpsertIgnore> 
upsert_ignore(client_table_->NewUpsertIgnore());
+    ASSERT_OK(upsert_ignore->mutable_row()->SetInt32("key", 1));
+    ASSERT_OK(upsert_ignore->mutable_row()->SetInt32("int_val", 888));
+    ASSERT_OK(upsert_ignore->mutable_row()->SetStringCopy("string_val", "world 
hello"));
+    ASSERT_OK(upsert_ignore->mutable_row()->SetInt32("non_null_with_default", 
888));
+    if (update_immu_col_to_null_) {
+      ASSERT_OK(upsert_ignore->mutable_row()->SetNull("imm_val"));
+    } else {
+      ASSERT_OK(upsert_ignore->mutable_row()->SetInt32("imm_val", 888));
+    }
+    ASSERT_OK(session->Apply(upsert_ignore.release()));
+    // successful_upserts++, upsert_ignore_errors++
+    NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 3, 1, 0, 0, 0, 0));
+
+    vector<string> rows;
+    KuduScanner scanner(client_table_.get());
+    ASSERT_OK(ScanToStrings(&scanner, &rows));
+    ASSERT_EQ(1, rows.size());
+    ASSERT_EQ(Substitute("(int32 key=1, int32 int_val=888, string 
string_val=\"world hello\", "
+                         "int32 non_null_with_default=888, int32 imm_val=$0)",
+                         ExpectImmuColCell(1)),
+              rows[0]);
+  }
+}
+
+TEST_P(ClientTestImmutableColumn, TestUpsert) {
+  shared_ptr<KuduSession> session = client_->NewSession();
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+  // Perform and verify UPSERT which acts as an INSERT.
+  ASSERT_OK(ApplyUpsertToSession(session.get(), client_table_, 1, 1, "original 
row", 1));
+  NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 1, 0, 0, 0, 0, 0));  // 
successful_upserts++
+
+  const string expect_row = R"((int32 key=1, int32 int_val=1, string 
string_val="original row", )"
+                            "int32 non_null_with_default=12345, int32 
imm_val=1)";
+  {
+    vector<string> rows;
+    ASSERT_OK(ScanTableToStrings(client_table_.get(), &rows));
+    ASSERT_EQ(1, rows.size());
+    EXPECT_EQ(expect_row, rows[0]);
+  }
+
+  // Perform an UPSERT. This upsert will attemp to update an immutable column,
+  // which will result an error.
+  Status s = ApplyUpsertToSession(session.get(), client_table_, 1, 4, 
"upserted row 3",
+                                  update_immu_col_to_null_ ? nullopt : 
optional<int>(999));
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(),
+                      "failed to flush data: error details are available "
+                      "via KuduSession::GetPendingErrors()");
+  vector<KuduError*> errors;
+  ElementDeleter d(&errors);
+  bool overflow;
+  session->GetPendingErrors(&errors, &overflow);
+  ASSERT_FALSE(overflow);
+  ASSERT_EQ(1, errors.size());
+  ASSERT_STR_CONTAINS(
+      errors[0]->status().ToString(),
+      "Immutable: UPDATE not allowed for immutable column: imm_val INT32 
NULLABLE IMMUTABLE");
+  NO_FATALS(DoVerifyMetrics(session.get(), 0, 0, 1, 0, 0, 0, 0, 0));  // 
nothing changed
+  {
+    vector<string> rows;
+    ASSERT_OK(ScanTableToStrings(client_table_.get(), &rows));
+    ASSERT_EQ(1, rows.size());
+    EXPECT_EQ(expect_row, rows[0]);  // nothing changed
+  }
+}
+
+class ClientTestImmutableColumnCompatablity : public ClientTest {
+ public:
+  void SetUp() override {
+    // Disable the immutable column attribute feature in master for testing.
+    FLAGS_master_support_immutable_column_attribute = false;
+
+    KuduTest::SetUp();
+
+    // Start minicluster and wait for tablet servers to connect to master.
+    InternalMiniClusterOptions options;
+    options.num_tablet_servers = 1;
+    cluster_.reset(new InternalMiniCluster(env_, std::move(options)));
+    ASSERT_OK(cluster_->StartSync());
+    ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
+  }
+};
+
+TEST_F(ClientTestImmutableColumnCompatablity, CreateTable) {
+  const string kTableName = "create_table_with_immutable_attribute_column";
+  KuduSchemaBuilder b;
+  b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+  
b.AddColumn("imm_val")->Type(KuduColumnSchema::INT32)->Immutable()->Nullable();
+  ASSERT_OK(b.Build(&schema_));
+
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  Status s = table_creator->table_name(kTableName)
+                .schema(&schema_)
+                .add_hash_partitions({"key"}, 2)
+                .num_replicas(1)
+                .Create();
+  ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
+  ASSERT_STR_CONTAINS(
+      s.ToString(),
+      Substitute("Error creating table $0 on the master: cluster does not 
support "
+                 "CreateTable with feature(s) IMMUTABLE_COLUMN_ATTRIBUTE", 
kTableName));
+}
+
+TEST_F(ClientTestImmutableColumnCompatablity, AlterTable) {
+  const string kTableName = "alter_table_with_immutable_attribute_column";
+  KuduSchemaBuilder b;
+  b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+  b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->Nullable();
+  ASSERT_OK(b.Build(&schema_));
+
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  ASSERT_OK(table_creator->table_name(kTableName)
+                .schema(&schema_)
+                .add_hash_partitions({"key"}, 2)
+                .num_replicas(1)
+                .Create());
+
+  unique_ptr<KuduTableAlterer> 
table_alterer(client_->NewTableAlterer(kTableName));
+  
table_alterer->AddColumn("imm_val")->Type(KuduColumnSchema::INT32)->Immutable()->Nullable();
+  Status s = table_alterer->Alter();
+  ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
+  ASSERT_STR_CONTAINS(
+      s.ToString(),
+      "cluster does not support AlterTable with feature(s) 
IMMUTABLE_COLUMN_ATTRIBUTE");
+}
+
 class ClientTestUnixSocket : public ClientTest {
  public:
   void SetUp() override {
     FLAGS_rpc_listen_on_unix_domain_socket = true;
     FLAGS_client_use_unix_domain_sockets = true;
     ClientTest::SetUp();
+    ASSERT_OK(BuildSchema());
   }
 };
 
@@ -8744,6 +9188,7 @@ class MultiTServerClientTest : public ClientTest {
  public:
   void SetUp() override {
     KuduTest::SetUp();
+    ASSERT_OK(BuildSchema());
 
     // Start minicluster and wait for tablet servers to connect to master.
     InternalMiniClusterOptions options;
@@ -8862,6 +9307,7 @@ class ReplicationFactorLimitsTest : public ClientTest {
     FLAGS_max_num_replicas = 5;
 
     KuduTest::SetUp();
+    ASSERT_OK(BuildSchema());
 
     // Start minicluster and wait for tablet servers to connect to master.
     InternalMiniClusterOptions options;
diff --git a/src/kudu/client/client-unittest.cc 
b/src/kudu/client/client-unittest.cc
index 27f5d666f..63fe3824f 100644
--- a/src/kudu/client/client-unittest.cc
+++ b/src/kudu/client/client-unittest.cc
@@ -372,7 +372,7 @@ TEST(KuduColumnSchemaTest, TestEquals) {
 
   const int kDefaultOf7 = 7;
   KuduColumnSchema a32_dflt("a", KuduColumnSchema::INT32, 
/*is_nullable=*/false,
-                              /*default_value=*/&kDefaultOf7);
+                            /*is_immutable=*/false, 
/*default_value=*/&kDefaultOf7);
   ASSERT_NE(a32, a32_dflt);
 }
 
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 47cb589cd..1c1dd95b5 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -51,6 +51,7 @@
 #include "kudu/client/scan_predicate-internal.h"
 #include "kudu/client/scan_token-internal.h"
 #include "kudu/client/scanner-internal.h"
+#include "kudu/client/schema-internal.h"
 #include "kudu/client/session-internal.h"
 #include "kudu/client/table-internal.h"
 #include "kudu/client/table_alterer-internal.h"
@@ -1040,6 +1041,15 @@ Status KuduTableCreator::Create() {
     }
   }
 
+  bool has_immutable_column_schema = false;
+  for (size_t i = 0; i < data_->schema_->num_columns(); i++) {
+    const auto& col_schema = data_->schema_->Column(i);
+    if (col_schema.is_immutable()) {
+      has_immutable_column_schema = true;
+      break;
+    }
+  }
+
   if (data_->table_type_) {
     req.set_table_type(*data_->table_type_);
   }
@@ -1058,7 +1068,8 @@ Status KuduTableCreator::Create() {
                                          &resp,
                                          deadline,
                                          !data_->range_partitions_.empty(),
-                                         has_range_with_custom_hash_schema),
+                                         has_range_with_custom_hash_schema,
+                                         has_immutable_column_schema),
       Substitute("Error creating table $0 on the master", data_->table_name_));
   // Spin until the table is fully created, if requested.
   if (data_->wait_) {
@@ -1183,6 +1194,10 @@ KuduUpsert* KuduTable::NewUpsert() {
   return new KuduUpsert(shared_from_this());
 }
 
+KuduUpsertIgnore* KuduTable::NewUpsertIgnore() {
+  return new KuduUpsertIgnore(shared_from_this());
+}
+
 KuduUpdate* KuduTable::NewUpdate() {
   return new KuduUpdate(shared_from_this());
 }
@@ -1667,6 +1682,14 @@ Status KuduTableAlterer::Alter() {
   AlterTableResponsePB resp;
   RETURN_NOT_OK(data_->ToRequest(&req));
 
+  bool has_immutable_column_schema = false;
+  for (const auto& step : data_->steps_) {
+    if (step.step_type == AlterTableRequestPB::ADD_COLUMN && 
step.spec->data_->immutable) {
+      has_immutable_column_schema = true;
+      break;
+    }
+  }
+
   MonoDelta timeout = data_->timeout_.Initialized() ?
     data_->timeout_ :
     data_->client_->default_admin_operation_timeout();
@@ -1674,7 +1697,8 @@ Status KuduTableAlterer::Alter() {
   RETURN_NOT_OK(data_->client_->data_->AlterTable(
       data_->client_, req, &resp, deadline,
       data_->has_alter_partitioning_steps,
-      data_->adding_range_with_custom_hash_schema));
+      data_->adding_range_with_custom_hash_schema,
+      has_immutable_column_schema));
 
   if (data_->has_alter_partitioning_steps) {
     // If the table partitions change, clear the local meta cache so that the
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index f39cd8f3c..cbe9877d0 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -107,6 +107,7 @@ class KuduTabletServer;
 class KuduUpdate;
 class KuduUpdateIgnore;
 class KuduUpsert;
+class KuduUpsertIgnore;
 class KuduValue;
 class KuduWriteOperation;
 class ResourceMetrics;
@@ -1624,6 +1625,11 @@ class KUDU_EXPORT KuduTable : public 
sp::enable_shared_from_this<KuduTable> {
   ///   KuduSession::Apply().
   KuduUpsert* NewUpsert();
 
+  /// @return New @c UPSERT_IGNORE operation for this table. It is the
+  ///   caller's responsibility to free the result, unless it is passed to
+  ///   KuduSession::Apply().
+  KuduUpsertIgnore* NewUpsertIgnore();
+
   /// @return New @c UPDATE operation for this table. It is the caller's
   ///   responsibility to free the result, unless it is passed to
   ///   KuduSession::Apply().
diff --git a/src/kudu/client/schema-internal.h 
b/src/kudu/client/schema-internal.h
index df5a963e0..0b8357545 100644
--- a/src/kudu/client/schema-internal.h
+++ b/src/kudu/client/schema-internal.h
@@ -80,6 +80,7 @@ class KuduColumnSpec::Data {
   std::optional<KuduColumnStorageAttributes::CompressionType> compression;
   std::optional<int32_t> block_size;
   std::optional<bool> nullable;
+  std::optional<bool> immutable;
   bool primary_key;
   std::optional<KuduValue*> default_val;  // Owned.
   bool remove_default;                      // For ALTER
diff --git a/src/kudu/client/schema.cc b/src/kudu/client/schema.cc
index e3e48324f..75d3ba16b 100644
--- a/src/kudu/client/schema.cc
+++ b/src/kudu/client/schema.cc
@@ -345,6 +345,16 @@ KuduColumnSpec* KuduColumnSpec::Nullable() {
   return this;
 }
 
+KuduColumnSpec* KuduColumnSpec::Immutable() {
+  data_->immutable = true;
+  return this;
+}
+
+KuduColumnSpec* KuduColumnSpec::Mutable() {
+  data_->immutable = false;
+  return this;
+}
+
 KuduColumnSpec* KuduColumnSpec::RemoveDefault() {
   data_->remove_default = true;
   return this;
@@ -453,6 +463,7 @@ Status KuduColumnSpec::ToColumnSchema(KuduColumnSchema* 
col) const {
   KuduColumnTypeAttributes type_attrs(precision, scale, length);
   DataType internal_type = ToInternalDataType(data_->type.value(), type_attrs);
   bool nullable = data_->nullable ? data_->nullable.value() : true;
+  bool immutable = data_->immutable ? data_->immutable.value() : false;
 
   void* default_val = nullptr;
   // TODO(unknown): distinguish between DEFAULT NULL and no default?
@@ -474,7 +485,7 @@ Status KuduColumnSpec::ToColumnSchema(KuduColumnSchema* 
col) const {
 #pragma GCC diagnostic push
 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
   *col = KuduColumnSchema(data_->name, data_->type.value(), nullable,
-                          default_val,
+                          immutable, default_val,
                           KuduColumnStorageAttributes(encoding, compression, 
block_size),
                           type_attrs,
                           data_->comment ? data_->comment.value() : "");
@@ -732,6 +743,7 @@ string KuduColumnSchema::DataTypeToString(DataType type) {
 KuduColumnSchema::KuduColumnSchema(const string &name,
                                    DataType type,
                                    bool is_nullable,
+                                   bool is_immutable,
                                    const void* default_value,
                                    const KuduColumnStorageAttributes& 
storage_attributes,
                                    const KuduColumnTypeAttributes& 
type_attributes,
@@ -746,7 +758,7 @@ KuduColumnSchema::KuduColumnSchema(const string &name,
   type_attr_private.length = type_attributes.length();
   col_ = new ColumnSchema(name, ToInternalDataType(type, type_attributes),
                           is_nullable,
-                          false,   // TODO(yingchun): set according to a new 
added parameter later
+                          is_immutable,
                           default_value, default_value, attr_private,
                           type_attr_private, comment);
 }
@@ -800,6 +812,10 @@ bool KuduColumnSchema::is_nullable() const {
   return DCHECK_NOTNULL(col_)->is_nullable();
 }
 
+bool KuduColumnSchema::is_immutable() const {
+  return DCHECK_NOTNULL(col_)->is_immutable();
+}
+
 KuduColumnSchema::DataType KuduColumnSchema::type() const {
   return FromInternalDataType(DCHECK_NOTNULL(col_)->type_info()->type());
 }
@@ -901,7 +917,7 @@ KuduColumnSchema KuduSchema::Column(size_t idx) const {
   KuduColumnTypeAttributes type_attrs(col.type_attributes().precision, 
col.type_attributes().scale,
                                       col.type_attributes().length);
   return KuduColumnSchema(col.name(), 
FromInternalDataType(col.type_info()->type()),
-                          col.is_nullable(), col.read_default_value(),
+                          col.is_nullable(), col.is_immutable(), 
col.read_default_value(),
                           attrs, type_attrs, col.comment());
 }
 
diff --git a/src/kudu/client/schema.h b/src/kudu/client/schema.h
index 2fae75907..a7a8af6e5 100644
--- a/src/kudu/client/schema.h
+++ b/src/kudu/client/schema.h
@@ -303,6 +303,9 @@ class KUDU_EXPORT KuduColumnSchema {
 
   /// @return @c true iff the column schema has the nullable attribute set.
   bool is_nullable() const;
+
+  /// @return @c true iff the column schema has the immutable attribute set.
+  bool is_immutable() const;
   ///@}
 
   /// @return Type attributes of the column schema.
@@ -340,6 +343,7 @@ class KUDU_EXPORT KuduColumnSchema {
       const std::string &name,
       DataType type,
       bool is_nullable = false,
+      bool is_immutable = false,
       const void* default_value = NULL, //NOLINT(modernize-use-nullptr)
       const KuduColumnStorageAttributes& storage_attributes = 
KuduColumnStorageAttributes(),
       const KuduColumnTypeAttributes& type_attributes = 
KuduColumnTypeAttributes(),
@@ -497,6 +501,16 @@ class KUDU_EXPORT KuduColumnSpec {
   /// @return Pointer to the modified object.
   KuduColumnSpec* Nullable();
 
+  /// Set the column to be immutable.
+  ///
+  /// @return Pointer to the modified object.
+  KuduColumnSpec* Immutable();
+
+  /// Set the column to be mutable (the default).
+  ///
+  /// @return Pointer to the modified object.
+  KuduColumnSpec* Mutable();
+
   /// Set the data type of the column.
   ///
   /// @note Column data types may not be changed once a table is created.
diff --git a/src/kudu/client/table_alterer-internal.cc 
b/src/kudu/client/table_alterer-internal.cc
index 68a8b8def..ff869d5f0 100644
--- a/src/kudu/client/table_alterer-internal.cc
+++ b/src/kudu/client/table_alterer-internal.cc
@@ -138,7 +138,8 @@ Status 
KuduTableAlterer::Data::ToRequest(AlterTableRequestPB* req) {
             !s.spec->data_->encoding &&
             !s.spec->data_->compression &&
             !s.spec->data_->block_size &&
-            !s.spec->data_->comment) {
+            !s.spec->data_->comment &&
+            !s.spec->data_->immutable) {
           return Status::InvalidArgument("no alter operation specified",
                                          s.spec->data_->name);
         }
@@ -150,7 +151,8 @@ Status 
KuduTableAlterer::Data::ToRequest(AlterTableRequestPB* req) {
             !s.spec->data_->encoding &&
             !s.spec->data_->compression &&
             !s.spec->data_->block_size &&
-            !s.spec->data_->comment) {
+            !s.spec->data_->comment &&
+            !s.spec->data_->immutable) {
           pb_step->set_type(AlterTableRequestPB::RENAME_COLUMN);
           pb_step->mutable_rename_column()->set_old_name(s.spec->data_->name);
           
pb_step->mutable_rename_column()->set_new_name(*(s.spec->data_->rename_to));
diff --git a/src/kudu/client/write_op.cc b/src/kudu/client/write_op.cc
index ca472057f..24b760f6c 100644
--- a/src/kudu/client/write_op.cc
+++ b/src/kudu/client/write_op.cc
@@ -18,16 +18,17 @@
 #include "kudu/client/write_op.h"
 
 #include <ostream>
+#include <type_traits>
 
 #include <glog/logging.h>
 
-#include "kudu/client/client.h"
+#include "kudu/client/client.h" // IWYU pragma: keep
 #include "kudu/client/schema.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/row.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
-#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/util/bitmap.h"
 #include "kudu/util/slice.h"
 
@@ -46,6 +47,7 @@ RowOperationsPB_Type 
ToInternalWriteType(KuduWriteOperation::Type type) {
     case KuduWriteOperation::INSERT_IGNORE: return 
RowOperationsPB_Type_INSERT_IGNORE;
     case KuduWriteOperation::UPDATE_IGNORE: return 
RowOperationsPB_Type_UPDATE_IGNORE;
     case KuduWriteOperation::DELETE_IGNORE: return 
RowOperationsPB_Type_DELETE_IGNORE;
+    case KuduWriteOperation::UPSERT_IGNORE: return 
RowOperationsPB_Type_UPSERT_IGNORE;
     default: LOG(FATAL) << "Unexpected write operation type: " << type;
   }
 }
@@ -144,6 +146,13 @@ KuduUpsert::KuduUpsert(const shared_ptr<KuduTable>& table)
 
 KuduUpsert::~KuduUpsert() {}
 
+// UpsertIgnore 
-----------------------------------------------------------------
+
+KuduUpsertIgnore::KuduUpsertIgnore(const shared_ptr<KuduTable>& table)
+    : KuduWriteOperation(table) {
+}
+
+KuduUpsertIgnore::~KuduUpsertIgnore() {}
 
 } // namespace client
 } // namespace kudu
diff --git a/src/kudu/client/write_op.h b/src/kudu/client/write_op.h
index a1e9e30a6..ea30d8a12 100644
--- a/src/kudu/client/write_op.h
+++ b/src/kudu/client/write_op.h
@@ -73,7 +73,8 @@ class KUDU_EXPORT KuduWriteOperation {
     UPSERT = 4,
     INSERT_IGNORE = 5,
     UPDATE_IGNORE = 6,
-    DELETE_IGNORE = 7
+    DELETE_IGNORE = 7,
+    UPSERT_IGNORE = 8
   };
   virtual ~KuduWriteOperation();
 
@@ -215,6 +216,33 @@ class KUDU_EXPORT KuduUpsert : public KuduWriteOperation {
 };
 
 
+/// @brief A single row upsert ignore to be sent to the cluster, errors on 
updating immutable
+///   cells are ignored.
+///
+/// See KuduUpsert for more details.
+class KUDU_EXPORT KuduUpsertIgnore : public KuduWriteOperation {
+ public:
+  ~KuduUpsertIgnore() override;
+
+  /// @copydoc KuduWriteOperation::ToString()
+  std::string ToString() const override { return "UPSERT IGNORE " + 
row_.ToString(); }
+
+ protected:
+  /// @cond PROTECTED_MEMBERS_DOCUMENTED
+
+  /// @copydoc KuduWriteOperation::type()
+  Type type() const override {
+    return UPSERT_IGNORE;
+  }
+
+  /// @endcond
+
+ private:
+  friend class KuduTable;
+  explicit KuduUpsertIgnore(const sp::shared_ptr<KuduTable>& table);
+};
+
+
 /// @brief A single row update to be sent to the cluster.
 ///
 /// @pre An update requires the key columns and at least one other column
@@ -241,7 +269,8 @@ class KUDU_EXPORT KuduUpdate : public KuduWriteOperation {
   explicit KuduUpdate(const sp::shared_ptr<KuduTable>& table);
 };
 
-/// @brief A single row update ignore to be sent to the cluster, missing row 
errors are ignored.
+/// @brief A single row update ignore to be sent to the cluster, missing row 
errors and
+///   errors on updating immutable cells are ignored.
 ///
 /// @pre An update ignore requires the key columns and at least one other 
column
 ///   in the schema to be set in the embedded KuduPartialRow object.
diff --git a/src/kudu/common/row_operations.cc 
b/src/kudu/common/row_operations.cc
index 20ac21444..1ed07cdeb 100644
--- a/src/kudu/common/row_operations.cc
+++ b/src/kudu/common/row_operations.cc
@@ -562,11 +562,15 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const 
ClientServerMapping& m
     RowChangeListEncoder rcl_encoder(&buf);
 
     // Now process the rest of columns as updates.
+    DCHECK_EQ(client_schema_->num_key_columns(), client_col_idx);
     for (; client_col_idx < client_schema_->num_columns(); client_col_idx++) {
       size_t tablet_col_idx = GetTabletColIdx(mapping, client_col_idx);
       const ColumnSchema& col = tablet_schema_->column(tablet_col_idx);
 
       if (BitmapTest(client_isset_map, client_col_idx)) {
+        bool client_set_to_null = client_schema_->has_nullables() &&
+          BitmapTest(client_null_map, client_col_idx);
+
         if (col.is_immutable()) {
           if (op->type == RowOperationsPB::UPDATE) {
             op->SetFailureStatusOnce(
@@ -575,12 +579,13 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const 
ClientServerMapping& m
             DCHECK_EQ(RowOperationsPB::UPDATE_IGNORE, op->type);
             op->error_ignored = true;
           }
-          RETURN_NOT_OK(ReadColumnAndDiscard(col));
+          if (!client_set_to_null) {
+            RETURN_NOT_OK(ReadColumnAndDiscard(col));
+          }
           // Use 'continue' not 'break' to consume the rest row data.
           continue;
         }
-        bool client_set_to_null = client_schema_->has_nullables() &&
-          BitmapTest(client_null_map, client_col_idx);
+
         uint8_t scratch[kLargestTypeSize];
         uint8_t* val_to_add = nullptr;
         if (!client_set_to_null) {
@@ -593,6 +598,7 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const 
ClientServerMapping& m
           RETURN_NOT_OK(ReadColumnAndDiscard(col));
           continue;
         }
+
         rcl_encoder.AddColumnUpdate(col, 
tablet_schema_->column_id(tablet_col_idx), val_to_add);
       }
     }
diff --git a/src/kudu/integration-tests/alter_table-test.cc 
b/src/kudu/integration-tests/alter_table-test.cc
index f1c2d0b62..9e05280a1 100644
--- a/src/kudu/integration-tests/alter_table-test.cc
+++ b/src/kudu/integration-tests/alter_table-test.cc
@@ -26,6 +26,7 @@
 #include <ostream>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -2545,4 +2546,45 @@ TEST_F(AlterTableTest, TestDisableCompactAlter) {
   ASSERT_TRUE(s.IsInvalidArgument());
 }
 
+TEST_F(AlterTableTest, AddAndRemoveImmutableAttribute) {
+  InsertRows(0, 1);
+  ASSERT_OK(tablet_replica_->tablet()->Flush());
+
+  vector<string> rows;
+  ScanToStrings(&rows);
+  ASSERT_EQ(1, rows.size());
+  EXPECT_EQ("(int32 c0=0, int32 c1=0)", rows[0]);
+
+  {
+    // Add immutable attribute to a column.
+    unique_ptr<KuduTableAlterer> 
table_alterer(client_->NewTableAlterer(kTableName));
+    table_alterer->AlterColumn("c1")->Immutable();
+    ASSERT_OK(table_alterer->Alter());
+  }
+
+  InsertRows(1, 1);
+  ASSERT_OK(tablet_replica_->tablet()->Flush());
+
+  ScanToStrings(&rows);
+  ASSERT_EQ(2, rows.size());
+  EXPECT_EQ("(int32 c0=0, int32 c1=0)", rows[0]);
+  EXPECT_EQ("(int32 c0=16777216, int32 c1=1)", rows[1]);
+
+  {
+    // Remove immutable attribute from a column.
+    unique_ptr<KuduTableAlterer> 
table_alterer(client_->NewTableAlterer(kTableName));
+    table_alterer->AlterColumn("c1")->Mutable();
+    ASSERT_OK(table_alterer->Alter());
+  }
+
+  InsertRows(2, 1);
+  ASSERT_OK(tablet_replica_->tablet()->Flush());
+
+  ScanToStrings(&rows);
+  ASSERT_EQ(3, rows.size());
+  EXPECT_EQ("(int32 c0=0, int32 c1=0)", rows[0]);
+  EXPECT_EQ("(int32 c0=16777216, int32 c1=1)", rows[1]);
+  EXPECT_EQ("(int32 c0=33554432, int32 c1=2)", rows[2]);
+}
+
 } // namespace kudu
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 6a7cd9f16..d4e9699ea 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -1169,6 +1169,8 @@ enum MasterFeatures {
   RANGE_SPECIFIC_HASH_SCHEMA = 8;
   // Similar to IGNORE_OPERATIONS, but this is for UPSERT_IGNORE specifically.
   UPSERT_IGNORE = 9;
+  // Whether master supports immutable attribute on column schema.
+  IMMUTABLE_COLUMN_ATTRIBUTE = 10;
 }
 
 service MasterService {
diff --git a/src/kudu/master/master_service.cc 
b/src/kudu/master/master_service.cc
index 9db93a038..6e2cfcf19 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -113,6 +113,11 @@ DEFINE_bool(master_support_upsert_ignore_operations, true,
 TAG_FLAG(master_support_upsert_ignore_operations, hidden);
 TAG_FLAG(master_support_upsert_ignore_operations, runtime);
 
+DEFINE_bool(master_support_immutable_column_attribute, true,
+            "Whether the cluster supports immutable attribute on column 
schema.");
+TAG_FLAG(master_support_immutable_column_attribute, hidden);
+TAG_FLAG(master_support_immutable_column_attribute, runtime);
+
 
 using google::protobuf::Message;
 using kudu::consensus::ReplicaManagementInfoPB;
@@ -918,6 +923,8 @@ bool MasterServiceImpl::SupportsFeature(uint32_t feature) 
const {
       return FLAGS_enable_per_range_hash_schemas;
     case MasterFeatures::UPSERT_IGNORE:
       return FLAGS_master_support_upsert_ignore_operations;
+    case MasterFeatures::IMMUTABLE_COLUMN_ATTRIBUTE:
+      return FLAGS_master_support_immutable_column_attribute;
     default:
       return false;
   }
diff --git a/src/kudu/tablet/ops/write_op.cc b/src/kudu/tablet/ops/write_op.cc
index 7ee5af815..e9e85e5fc 100644
--- a/src/kudu/tablet/ops/write_op.cc
+++ b/src/kudu/tablet/ops/write_op.cc
@@ -509,7 +509,11 @@ void WriteOpState::UpdateMetricsForOp(const RowOp& op) {
     case RowOperationsPB::UPSERT_IGNORE:
       if (op.error_ignored) {
         op_metrics_.upsert_ignore_errors++;
-      } else {
+      }
+      // This op may be completed even if it's error_ignored. It make sense
+      // when attempting to update immutable cells, the rest of cells may be 
updated
+      // except the immutable cells.
+      if (!op.failed) {
         op_metrics_.successful_upserts++;
       }
       break;
@@ -520,7 +524,11 @@ void WriteOpState::UpdateMetricsForOp(const RowOp& op) {
     case RowOperationsPB::UPDATE_IGNORE:
       if (op.error_ignored) {
         op_metrics_.update_ignore_errors++;
-      } else {
+      }
+      // This op may be completed even if it's error_ignored. It make sense
+      // when attempting to update immutable cells, the rest of cells may be 
updated
+      // except the immutable cells.
+      if (!op.failed) {
         op_metrics_.successful_updates++;
       }
       break;
diff --git a/src/kudu/tablet/row_op.cc b/src/kudu/tablet/row_op.cc
index f112f5ba5..11cd5abe1 100644
--- a/src/kudu/tablet/row_op.cc
+++ b/src/kudu/tablet/row_op.cc
@@ -48,6 +48,7 @@ void RowOp::SetFailed(const Status& s) {
   DCHECK(!result) << SecureDebugString(*result);
   result = 
google::protobuf::Arena::CreateMessage<OperationResultPB>(pb_arena_);
   StatusToPB(s, result->mutable_failed_status());
+  failed = true;
 }
 
 void RowOp::SetInsertSucceeded(int mrs_id) {
@@ -68,6 +69,7 @@ void RowOp::SetErrorIgnored() {
   DCHECK(!result) << SecureDebugString(*result);
   result = 
google::protobuf::Arena::CreateMessage<OperationResultPB>(pb_arena_);
   error_ignored = true;
+  failed = true;
 }
 
 void RowOp::SetMutateSucceeded(OperationResultPB* result) {
diff --git a/src/kudu/tablet/row_op.h b/src/kudu/tablet/row_op.h
index 5592eaedf..285cc8e8f 100644
--- a/src/kudu/tablet/row_op.h
+++ b/src/kudu/tablet/row_op.h
@@ -102,6 +102,9 @@ struct RowOp {
   // True if an ignore op was ignored due to an error.
   bool error_ignored = false;
 
+  // True if this op has any error occured and failed to compelte this op.
+  bool failed = false;
+
   // The RowSet in which this op's key has been found present and alive.
   // This will be null if 'checked_present' is false, or if it has been
   // checked and found not to be alive in any RowSet.
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index f4e5340d4..592c7c868 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -738,18 +738,8 @@ Status Tablet::InsertOrUpsertUnlocked(const IOContext* 
io_context,
   if (op->present_in_rowset) {
     switch (op_type) {
       case RowOperationsPB::UPSERT:
-      case RowOperationsPB::UPSERT_IGNORE: {
-        Status s = ApplyUpsertAsUpdate(io_context, op_state, op, 
op->present_in_rowset, stats);
-        if (s.ok()) {
-          return Status::OK();
-        }
-        if (s.IsImmutable() && op_type == RowOperationsPB::UPSERT_IGNORE) {
-          op->SetErrorIgnored();
-          return Status::OK();
-        }
-        op->SetFailed(s);
-        return s;
-      }
+      case RowOperationsPB::UPSERT_IGNORE:
+        return ApplyUpsertAsUpdate(io_context, op_state, op, 
op->present_in_rowset, stats);
       case RowOperationsPB::INSERT_IGNORE:
         op->SetErrorIgnored();
         return Status::OK();
@@ -820,18 +810,8 @@ Status Tablet::InsertOrUpsertUnlocked(const IOContext* 
io_context,
     if (s.IsAlreadyPresent()) {
       switch (op_type) {
         case RowOperationsPB::UPSERT:
-        case RowOperationsPB::UPSERT_IGNORE: {
-          Status s = ApplyUpsertAsUpdate(io_context, op_state, op, 
comps->memrowset.get(), stats);
-          if (s.ok()) {
-            return Status::OK();
-          }
-          if (s.IsImmutable() && op_type == RowOperationsPB::UPSERT_IGNORE) {
-            op->SetErrorIgnored();
-            return Status::OK();
-          }
-          op->SetFailed(s);
-          return s;
-        }
+        case RowOperationsPB::UPSERT_IGNORE:
+          return ApplyUpsertAsUpdate(io_context, op_state, op, 
comps->memrowset.get(), stats);
         case RowOperationsPB::INSERT_IGNORE:
           op->SetErrorIgnored();
           return Status::OK();
@@ -855,7 +835,6 @@ Status Tablet::ApplyUpsertAsUpdate(const IOContext* 
io_context,
                                    RowSet* rowset,
                                    ProbeStats* stats) {
   const auto op_type = upsert->decoded_op.type;
-  bool error_ignored = false;
   const auto* schema = this->schema().get();
   ConstContiguousRow row(schema, upsert->decoded_op.row_data);
   faststring buf;
@@ -868,10 +847,16 @@ Status Tablet::ApplyUpsertAsUpdate(const IOContext* 
io_context,
     const auto& c = schema->column(i);
     if (c.is_immutable()) {
       if (op_type == RowOperationsPB::UPSERT) {
-        return Status::Immutable("UPDATE not allowed for immutable column", 
c.ToString());
+        Status s = Status::Immutable("UPDATE not allowed for immutable 
column", c.ToString());
+        upsert->SetFailed(s);
+        return s;
       }
       DCHECK_EQ(op_type, RowOperationsPB::UPSERT_IGNORE);
-      error_ignored = true;
+      // Only set upsert->error_ignored flag instead of calling 
SetErrorIgnored() to avoid setting
+      // upsert->result which can be set only once. Then the upsert operation 
can be continued to
+      // mutate the other cells even if the current cell has been skipped, the 
upsert->result can be
+      // set normally in the next steps.
+      upsert->error_ignored = true;
       continue;
     }
 
@@ -887,9 +872,6 @@ Status Tablet::ApplyUpsertAsUpdate(const IOContext* 
io_context,
       op_state->pb_arena());
   if (enc.is_empty()) {
     upsert->SetMutateSucceeded(result);
-    if (error_ignored) {
-      upsert->error_ignored = true;
-    }
     return Status::OK();
   }
 
diff --git a/src/kudu/tablet/tablet_metrics.cc 
b/src/kudu/tablet/tablet_metrics.cc
index fd6a7c7bb..b9c1e9c89 100644
--- a/src/kudu/tablet/tablet_metrics.cc
+++ b/src/kudu/tablet/tablet_metrics.cc
@@ -45,9 +45,10 @@ METRIC_DEFINE_counter(tablet, upsert_ignore_errors, "Upsert 
Ignore Errors",
                       "Number of upsert ignore operations for this tablet 
which were "
                       "ignored due to an error since service start. This 
metric counts "
                       "the number of attempts to update a present row by 
changing the "
-                      "value of any of its immutable cells.  Note that the 
rest of the "
+                      "value of any of its immutable cells. Note that the rest 
of the "
                       "cells (i.e. the mutable ones) in such case are updated 
accordingly "
-                      "to the operation's data.",
+                      "to the operation's data,and rows_upserted will be 
counted too if "
+                      "upsert successfully.",
                       kudu::MetricLevel::kDebug);
 METRIC_DEFINE_counter(tablet, rows_updated, "Rows Updated",
     kudu::MetricUnit::kRows,
@@ -56,7 +57,11 @@ METRIC_DEFINE_counter(tablet, rows_updated, "Rows Updated",
 METRIC_DEFINE_counter(tablet, update_ignore_errors, "Update Ignore Errors",
                       kudu::MetricUnit::kRows,
                       "Number of update ignore operations for this tablet 
which were "
-                      "ignored due to an error since service start",
+                      "ignored due to an error since service start. Note that 
when "
+                      "ignoring to update the immutable cells, the rest of the 
cells "
+                      "(i.e. the mutable ones) in such case are updated 
accordingly to "
+                      "the operation's data,and rows_updated will be counted 
too if "
+                      "update successfully.",
                       kudu::MetricLevel::kDebug);
 METRIC_DEFINE_counter(tablet, rows_deleted, "Rows Deleted",
     kudu::MetricUnit::kRows,

Reply via email to