This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 39302dfa8edc1fd1268f78f3ddd3f34ca26cc43f
Author: Abhishek Chennaka <[email protected]>
AuthorDate: Thu Jun 15 21:36:58 2023 -0700

    [client] KUDU-1945 Add UPSERT support
    
    This patch adds UPSERT support to Kudu C++ and Java clients by removing
    the previously added checks to discard UPSERT operations. The functionality
    added is only supported if the entire primary key is provided including
    the auto-incrementing column.
    
    Also added verification to reject INSERT operations with
    auto-incrementing column set on the client side.
    
    Thanks to Marton Greber for diagnosing the Python client-side issue.
    
    Change-Id: I27a95e3a6b1d1b584cad849978313b3c8222cd3d
    Reviewed-on: http://gerrit.cloudera.org:8080/20083
    Tested-by: Kudu Jenkins
    Reviewed-by: Marton Greber <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 .../org/apache/kudu/client/AsyncKuduSession.java   |   9 ++
 .../java/org/apache/kudu/client/KuduTable.java     |   8 --
 .../org/apache/kudu/client/TestKuduClient.java     |  84 ++++++++++---
 python/kudu/tests/test_client.py                   |  29 +++--
 src/kudu/client/client-test.cc                     | 103 ++++++++++------
 src/kudu/client/session-internal.cc                |  38 +++---
 src/kudu/common/partial_row.cc                     |   6 +
 src/kudu/common/partial_row.h                      |   4 +
 .../integration-tests/auto_incrementing-itest.cc   | 131 ++++++++++++++++++++-
 9 files changed, 319 insertions(+), 93 deletions(-)

diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index 61289ee70..935eac318 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -45,6 +45,7 @@ import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.kudu.Schema;
 import org.apache.kudu.client.AsyncKuduClient.LookupType;
 import org.apache.kudu.util.AsyncUtil;
 import org.apache.kudu.util.LogThrottler;
@@ -677,6 +678,14 @@ public class AsyncKuduSession implements 
SessionConfiguration {
     Preconditions.checkArgument(operation.getTable().getAsyncClient() == 
client,
         "Applied operations must be created from a KuduTable instance opened " 
+
         "from the same client that opened this KuduSession");
+    // We do not want to have auto-incrementing column set for INSERT 
operations.
+    if (operation.getRow().getSchema().hasAutoIncrementingColumn() &&
+            operation.getRow().isSet(Schema.getAutoIncrementingColumnName()) &&
+            (operation.getChangeType() == Operation.ChangeType.INSERT ||
+                 operation.getChangeType() == 
Operation.ChangeType.INSERT_IGNORE)) {
+      throw new IllegalArgumentException("Auto-Incrementing column should not 
" +
+              "be specified for INSERT operation");
+    }
     if (closed) {
       // Ideally this would be a precondition, but that may break existing
       // clients who have grown to rely on this unsafe behavior.
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java
index 97672ecc1..d8838c1aa 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java
@@ -188,10 +188,6 @@ public class KuduTable {
    * @throws UnsupportedOperationException if the table has auto-incrementing 
column
    */
   public Upsert newUpsert() {
-    if (schema.hasAutoIncrementingColumn()) {
-      throw new UnsupportedOperationException(
-          "Tables with auto-incrementing column do not support UPSERT 
operations");
-    }
     return new Upsert(this);
   }
 
@@ -203,10 +199,6 @@ public class KuduTable {
    * @throws UnsupportedOperationException if the table has auto-incrementing 
column
    */
   public UpsertIgnore newUpsertIgnore() {
-    if (schema.hasAutoIncrementingColumn()) {
-      throw new UnsupportedOperationException(
-          "Tables with auto-incrementing column do not support UPSERT_IGNORE 
operations");
-    }
     return new UpsertIgnore(this);
   }
 
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index a3138a33a..acb243cab 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -871,6 +871,29 @@ public class TestKuduClient {
       assertEquals(expectedRow.toString(), rowStrings.get(i));
     }
 
+    // Upsert rows into the table after assigning values for the 
auto-incrementing
+    // column. The first three rows will be applied as updates and the next 
three as
+    // inserts.
+    for (int i = 0; i < 6; i++) {
+      Upsert upsert = table.newUpsert();
+      row = upsert.getRow();
+      row.addInt("key", i);
+      row.addLong(Schema.getAutoIncrementingColumnName(), i + 1);
+      row.addInt("c1", i * 20);
+      session.apply(upsert);
+    }
+    session.flush();
+
+    // Scan all the rows in the table with all columns.
+    // Verify that the auto-incrementing column is included in the rows.
+    rowStrings = scanTableToStrings(table);
+    assertEquals(6, rowStrings.size());
+    for (int i = 0; i < rowStrings.size(); i++) {
+      String expectedRow = String.format("INT32 key=%d, INT64 %s=%d, INT32 
c1=%d",
+              i, Schema.getAutoIncrementingColumnName(), i + 1, i * 20);
+      assertEquals(expectedRow, rowStrings.get(i));
+    }
+
     // Delete the first row with "key" and auto-incrementing columns.
     // Verify that number of rows is decreased by 1.
     Delete delete = table.newDelete();
@@ -879,7 +902,7 @@ public class TestKuduClient {
     row.addLong(schema.getColumnByIndex(1).getName(), 1);
     session.apply(delete);
     session.flush();
-    assertEquals(2, countRowsInScan(client.newScannerBuilder(table).build()));
+    assertEquals(5, countRowsInScan(client.newScannerBuilder(table).build()));
 
     // Check that we can delete the table.
     client.deleteTable(TABLE_NAME);
@@ -905,23 +928,21 @@ public class TestKuduClient {
     schema = table.getSchema();
     assertTrue(schema.hasAutoIncrementingColumn());
 
-    // Verify that UPSERT is not allowed for table with auto-incrementing 
column
-    try {
-      table.newUpsert();
-      fail("UPSERT on table with auto-incrementing column");
-    } catch (UnsupportedOperationException e) {
-      assertTrue(e.getMessage().contains(
-          "Tables with auto-incrementing column do not support UPSERT 
operations"));
-    }
-
-    // Verify that UPSERT_IGNORE is not allowed for table with 
auto-incrementing column
-    try {
-      table.newUpsertIgnore();
-      fail("UPSERT_IGNORE on table with auto-incrementing column");
-    } catch (UnsupportedOperationException e) {
-      assertTrue(e.getMessage().contains(
-          "Tables with auto-incrementing column do not support UPSERT_IGNORE 
operations"));
-    }
+    // Verify that UPSERT is allowed for table with auto-incrementing column
+    Upsert upsert = table.newUpsert();
+    PartialRow rowUpsert = upsert.getRow();
+    rowUpsert.addInt("key", 0);
+    rowUpsert.addLong(Schema.getAutoIncrementingColumnName(), 1);
+    rowUpsert.addInt("c1", 10);
+    session.apply(upsert);
+
+    // Verify that UPSERT_IGNORE is allowed for table with auto-incrementing 
column
+    UpsertIgnore upsertIgnore = table.newUpsertIgnore();
+    PartialRow rowUpsertIgnore = upsertIgnore.getRow();
+    rowUpsertIgnore.addInt("key", 1);
+    rowUpsertIgnore.addLong(Schema.getAutoIncrementingColumnName(), 2);
+    rowUpsertIgnore.addInt("c1", 20);
+    session.apply(upsertIgnore);
 
     // Change desired block size for auto-incrementing column
     client.alterTable(TABLE_NAME, new 
AlterTableOptions().changeDesiredBlockSize(
@@ -934,6 +955,33 @@ public class TestKuduClient {
         Schema.getAutoIncrementingColumnName(), 
ColumnSchema.CompressionAlgorithm.NO_COMPRESSION));
     session.flush();
 
+    // Verify that auto-incrementing column value cannot be specified in an 
INSERT operation.
+    try {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addInt("key", 1);
+      row.addLong(Schema.getAutoIncrementingColumnName(), 1);
+      row.addInt("c1", 10);
+      session.apply(insert);
+      fail("INSERT on table with auto-incrementing column set");
+    } catch (KuduException ex) {
+      assertTrue(ex.getMessage().contains("Auto-Incrementing column should not 
" +
+          "be specified for INSERT operation"));
+    }
+
+    // Verify that auto-incrementing column value cannot be specified in an 
INSERT_IGNORE operation.
+    try {
+      InsertIgnore insertIgnore = table.newInsertIgnore();
+      PartialRow row = insertIgnore.getRow();
+      row.addInt("key", 1);
+      row.addLong(Schema.getAutoIncrementingColumnName(), 1);
+      row.addInt("c1", 10);
+      session.apply(insertIgnore);
+      fail("INSERT on table with auto-incrementing column set");
+    } catch (KuduException ex) {
+      assertTrue(ex.getMessage().contains("Auto-Incrementing column should not 
" +
+          "be specified for INSERT operation"));
+    }
     // Verify that auto-incrementing column cannot be added
     try {
       client.alterTable(TABLE_NAME, new AlterTableOptions().addColumn(
diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py
index 6d60508de..020dec96a 100755
--- a/python/kudu/tests/test_client.py
+++ b/python/kudu/tests/test_client.py
@@ -470,22 +470,29 @@ class TestClient(KuduTestBase, CompatUnitTest):
             table = self.client.table(table_name)
             session = self.client.new_session()
 
-            # Insert with auto incrementing column specified
+            # Insert with auto-incrementing column not specified
             op = table.new_insert()
             op['key'] = 1
+            session.apply(op)
+            session.flush()
+
+            # TODO: the below test segfaults(KUDU-3454)
+            # # Insert with auto incrementing column specified
+            # op = table.new_insert()
+            # op['key'] = 1
+            # op[Schema.get_auto_incrementing_column_name()] = 1
+            # error_msg = 'should not be specified for Insert operation'
+            # with self.assertRaisesRegex(KuduBadStatus, error_msg):
+            #     session.apply(op)
+
+            # Upsert with auto-incrementing column specified
+            op = table.new_upsert()
+            op['key'] = 1
             op[Schema.get_auto_incrementing_column_name()] = 1
             session.apply(op)
-            try:
-                session.flush()
-            except KuduBadStatus:
-                message = 'should not be set for'
-                errors, overflow = session.get_pending_errors()
-                assert not overflow
-                assert len(errors) == 1
-                assert message in repr(errors[0])
 
-            # TODO: Upsert should be rejected as of now. However the test 
segfaults: KUDU-3454
-            # TODO: Upsert ignore should be rejected. Once Python client 
supports upsert ignore.
+            # TODO: once upsert_ignore is supported by the Python client,
+            # check if specifying all the key columns works.
 
             # With non-unique primary key, one can't use the tuple/list 
initialization for new
             # inserts. In this case, at the second position it would like to 
get an int64 (the type
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 41e5f694a..8608c21c5 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -9865,7 +9865,7 @@ class ClientTestAutoIncrementingColumn : public 
ClientTest {
   }
 };
 
-TEST_F(ClientTestAutoIncrementingColumn, ReadAndWrite) {
+TEST_F(ClientTestAutoIncrementingColumn, ReadAndWriteUsingInserts) {
   const string kTableName = "table_with_auto_incrementing_column";
   KuduSchemaBuilder b;
   
b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->NonUniquePrimaryKey();
@@ -9924,6 +9924,70 @@ TEST_F(ClientTestAutoIncrementingColumn, ReadAndWrite) {
   }
 }
 
+TEST_F(ClientTestAutoIncrementingColumn, ReadAndWriteUsingUpserts) {
+  const string kTableName = "concurrent_writes_auto_incrementing_column";
+  KuduSchemaBuilder b;
+  
b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->NonUniquePrimaryKey();
+  ASSERT_OK(b.Build(&schema_));
+
+  // Create a table with a single range partition.
+  static constexpr int lower_bound = 0;
+  static constexpr int upper_bound = 20;
+  unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+  unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+
+  ASSERT_OK(lower->SetInt32("key", lower_bound));
+  ASSERT_OK(upper->SetInt32("key", upper_bound));
+
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  ASSERT_OK(table_creator->table_name(kTableName)
+                .schema(&schema_)
+                .set_range_partition_columns({"key"})
+                .add_range_partition(lower.release(), upper.release())
+                .num_replicas(3)
+                .Create());
+
+  // Write into this single partition specifying values for
+  // auto-incrementing column using UPSERT and UPSERT_IGNORE.
+  shared_ptr<KuduSession> session = client_->NewSession();
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable(kTableName, &table));
+  static constexpr auto kNumRows = 20;
+  // Iterate twice Upserting the same data so that the first iteration would 
have
+  // Upserts as Inserts and in the next iteration Upserts are Updates.
+  for (int j = 0; j < 2; j++) {
+    for (int i = 0; i < kNumRows; i+=2) {
+      unique_ptr<KuduUpsert> upsert(table->NewUpsert());
+      KuduPartialRow* row_upsert = upsert->mutable_row();
+      ASSERT_OK(row_upsert->SetInt32("key", i));
+      ASSERT_OK(row_upsert->SetInt64(Schema::GetAutoIncrementingColumnName(), 
i + 1));
+      ASSERT_OK(session->Apply(upsert.release()));
+
+      unique_ptr<KuduUpsertIgnore> upsert_ignore(table->NewUpsertIgnore());
+      KuduPartialRow* row_upsert_ignore = upsert_ignore->mutable_row();
+      ASSERT_OK(row_upsert_ignore->SetInt32("key", i + 1));
+      
ASSERT_OK(row_upsert_ignore->SetInt64(Schema::GetAutoIncrementingColumnName(), 
i + 2));
+      ASSERT_OK(session->Apply(upsert_ignore.release()));
+    }
+  }
+  FlushSessionOrDie(session);
+
+  // Read back the rows and confirm the auto-incrementing column values are
+  // set correctly in each of the different scan modes.
+  for (const auto mode: {KuduClient::LEADER_ONLY, KuduClient::CLOSEST_REPLICA,
+                         KuduClient::FIRST_REPLICA}) {
+    vector<string> rows;
+    KuduScanner scanner(table.get());
+    ASSERT_OK(scanner.SetSelection(mode));
+    ASSERT_OK(ScanToStrings(&scanner, &rows));
+    ASSERT_EQ(kNumRows, rows.size());
+    for (int i = 0; i < rows.size(); i++) {
+      ASSERT_EQ(Substitute("(int32 key=$0, int64 auto_incrementing_id=$1)", i,
+                           i + 1), rows[i]);
+    }
+  }
+}
+
 TEST_F(ClientTestAutoIncrementingColumn, ConcurrentWrites) {
   const string kTableName = "concurrent_writes_auto_incrementing_column";
   KuduSchemaBuilder b;
@@ -10102,43 +10166,6 @@ TEST_F(ClientTestAutoIncrementingColumn, 
AlterOperationNegatives) {
   }
 }
 
-TEST_F(ClientTestAutoIncrementingColumn, InsertOperationNegatives) {
-  const string kTableName = 
"insert_operation_negatives_auto_incrementing_column";
-  KuduSchemaBuilder b;
-  // Create a schema with non-unique PK, such that auto incrementing col is 
present.
-  
b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->NonUniquePrimaryKey();
-  ASSERT_OK(b.Build(&schema_));
-
-  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
-  ASSERT_OK(table_creator->table_name(kTableName)
-            .schema(&schema_)
-            .add_hash_partitions({"key"}, 2)
-            .num_replicas(3)
-            .Create());
-
-  shared_ptr<KuduSession> session(client_->NewSession());
-  shared_ptr<KuduTable> table;
-  client_->OpenTable(kTableName, &table);
-
-  {
-    unique_ptr<KuduUpsert> op(table->NewUpsert());
-    auto* row = op->mutable_row();
-    ASSERT_OK(row->SetInt32("key", 1));
-    ASSERT_STR_CONTAINS(session->Apply(op.release()).ToString(),
-                        "Illegal state: this type of write operation is not 
supported on table "
-                        "with auto-incrementing column");
-  }
-
-  {
-    unique_ptr<KuduUpsertIgnore> op(table->NewUpsertIgnore());
-    auto* row = op->mutable_row();
-    ASSERT_OK(row->SetInt32("key", 1));
-    ASSERT_STR_CONTAINS(session->Apply(op.release()).ToString(),
-                        "Illegal state: this type of write operation is not 
supported on table "
-                        "with auto-incrementing column");
-  }
-}
-
 TEST_F(ClientTestAutoIncrementingColumn, CreateTableFeatureFlag) {
   FLAGS_master_support_auto_incrementing_column = false;
   const string kTableName = 
"create_table_with_auto_incrementing_column_feature_flag";
diff --git a/src/kudu/client/session-internal.cc 
b/src/kudu/client/session-internal.cc
index 88d507b0b..2e33de3ac 100644
--- a/src/kudu/client/session-internal.cc
+++ b/src/kudu/client/session-internal.cc
@@ -352,6 +352,24 @@ Status CheckForNonUniquePrimaryKey(const 
KuduWriteOperation& op) {
   return Status::OK();
 }
 
+// Ensure the auto-incrementing column is not set for the Insert operation.
+Status CheckAutoIncrementingColumnForInsert(const KuduWriteOperation& op) {
+  if (op.row().schema()->has_auto_incrementing() && 
op.row().IsAutoIncrementingColumnSet()) {
+    return Status::InvalidArgument("Auto-Incrementing column should not be "
+                                   "specified for INSERT operation");
+  }
+  return Status::OK();
+}
+
+// Ensure the auto-incrementing column is set for Non-Insert operations.
+Status CheckAutoIncrementingColumnForNonInsert(const KuduWriteOperation& op) {
+  if (op.row().schema()->has_auto_incrementing() && 
!op.row().IsAutoIncrementingColumnSet()) {
+    return Status::InvalidArgument("Auto-Incrementing column should be "
+                                   "specified for UPSERT/UPDATE operations");
+  }
+  return Status::OK();
+}
+
 // Check if the values for the non-nullable columns are present.
 Status CheckForNonNullableColumns(const KuduWriteOperation& op) {
   const auto& row = op.row();
@@ -368,16 +386,6 @@ Status CheckForNonNullableColumns(const 
KuduWriteOperation& op) {
   }
   return Status::OK();
 }
-
-Status CheckForAutoIncrementingColumn(const KuduWriteOperation& op) {
-  if (op.row().schema()->has_auto_incrementing()) {
-    return Status::IllegalState(
-        Substitute(
-            "this type of write operation is not supported on table with 
auto-incrementing column"),
-        KUDU_REDACT(op.ToString()));
-  }
-  return Status::OK();
-}
 } // anonymous namespace
 
 #define RETURN_NOT_OK_ADD_ERROR(_func, _op, _error_collector) \
@@ -396,18 +404,18 @@ Status 
KuduSession::Data::ValidateWriteOperation(KuduWriteOperation* op) const {
   } else {
     RETURN_NOT_OK_ADD_ERROR(CheckForPrimaryKey, op, error_collector_);
   }
-  // TODO(martongreber): UPSERT and UPSERT IGNORE are not supported initially 
for tables
-  // with a non-unique primary key. We plan to add this later.
   switch (op->type()) {
     case KuduWriteOperation::INSERT:
+    case KuduWriteOperation::INSERT_IGNORE:
+      RETURN_NOT_OK_ADD_ERROR(CheckAutoIncrementingColumnForInsert, op, 
error_collector_);
       RETURN_NOT_OK_ADD_ERROR(CheckForNonNullableColumns, op, 
error_collector_);
       break;
     case KuduWriteOperation::UPSERT:
       RETURN_NOT_OK_ADD_ERROR(CheckForNonNullableColumns, op, 
error_collector_);
-      RETURN_NOT_OK_ADD_ERROR(CheckForAutoIncrementingColumn, op, 
error_collector_);
-      break;
     case KuduWriteOperation::UPSERT_IGNORE:
-      RETURN_NOT_OK_ADD_ERROR(CheckForAutoIncrementingColumn, op, 
error_collector_);
+    case KuduWriteOperation::UPDATE:
+    case KuduWriteOperation::UPDATE_IGNORE:
+      RETURN_NOT_OK_ADD_ERROR(CheckAutoIncrementingColumnForNonInsert, op, 
error_collector_);
       break;
     default:
       // Nothing else to validate for other types of write operations.
diff --git a/src/kudu/common/partial_row.cc b/src/kudu/common/partial_row.cc
index 432f77e25..3f50d165c 100644
--- a/src/kudu/common/partial_row.cc
+++ b/src/kudu/common/partial_row.cc
@@ -902,6 +902,12 @@ bool KuduPartialRow::IsNonUniqueKeySet() const {
   return BitmapIsAllSet(isset_bitmap_, 0, schema_->num_key_columns() - 1);
 }
 
+bool KuduPartialRow::IsAutoIncrementingColumnSet() const {
+  DCHECK_GE(schema_->num_key_columns(), 1);
+  DCHECK(schema_->has_auto_incrementing());
+  return BitmapIsAllSet(isset_bitmap_, schema_->num_key_columns() - 1, 
schema_->num_key_columns());
+}
+
 std::string KuduPartialRow::ToString() const {
   ScopedDisableRedaction no_redaction;
 
diff --git a/src/kudu/common/partial_row.h b/src/kudu/common/partial_row.h
index 25f2efec8..06a70ddd8 100644
--- a/src/kudu/common/partial_row.h
+++ b/src/kudu/common/partial_row.h
@@ -634,6 +634,10 @@ class KUDU_EXPORT KuduPartialRow {
   ///   for this mutation.
   bool IsNonUniqueKeySet() const;
 
+  /// @return @c true if auto-incrementing column has been set
+  ///   for this mutation.
+  bool IsAutoIncrementingColumnSet() const;
+
   /// @return @c true if all column values have been set.
   bool AllColumnsSet() const;
 
diff --git a/src/kudu/integration-tests/auto_incrementing-itest.cc 
b/src/kudu/integration-tests/auto_incrementing-itest.cc
index 28879890f..8d72bd46f 100644
--- a/src/kudu/integration-tests/auto_incrementing-itest.cc
+++ b/src/kudu/integration-tests/auto_incrementing-itest.cc
@@ -59,6 +59,8 @@ using kudu::client::KuduSchemaBuilder;
 using kudu::client::KuduSession;
 using kudu::client::KuduTable;
 using kudu::client::KuduTableCreator;
+using kudu::client::KuduUpdate;
+using kudu::client::KuduUpsert;
 using kudu::client::sp::shared_ptr;
 using kudu::env_util::ListFilesInDir;
 using kudu::rpc::RpcController;
@@ -88,6 +90,7 @@ class AutoIncrementingItest : public KuduTest {
   Status CreateTableWithPartition() {
     KuduSchemaBuilder b;
     
b.AddColumn("c0")->Type(KuduColumnSchema::INT32)->NotNull()->NonUniquePrimaryKey();
+    b.AddColumn("c1")->Type(client::KuduColumnSchema::STRING)->NotNull();
     RETURN_NOT_OK(b.Build(&kudu_schema_));
 
     int lower_bound = 0;
@@ -115,12 +118,47 @@ class AutoIncrementingItest : public KuduTest {
       unique_ptr<KuduInsert> insert(table_->NewInsert());
       KuduPartialRow* row = insert->mutable_row();
       RETURN_NOT_OK(row->SetInt32("c0", i));
+      RETURN_NOT_OK(row->SetString("c1", "string_val"));
       RETURN_NOT_OK(session->Apply(insert.release()));
       SleepFor(MonoDelta::FromMilliseconds(sleep_millis));
     }
     return Status::OK();
   }
 
+  // Update data in the above created table.
+  Status UpdateData(int start_c0_value, int end_c0_value, int sleep_millis = 0,
+                    const string& c1_val = "string_val") {
+    shared_ptr<KuduSession> session(client_->NewSession());
+        RETURN_NOT_OK(client_->OpenTable(kTableName, &table_));
+    for (int i = start_c0_value; i < end_c0_value; i++) {
+      unique_ptr<KuduUpdate> update(table_->NewUpdate());
+      KuduPartialRow* row = update->mutable_row();
+      RETURN_NOT_OK(row->SetInt32("c0", i));
+      RETURN_NOT_OK(row->SetInt64(Schema::GetAutoIncrementingColumnName(), i + 
1));
+      RETURN_NOT_OK(row->SetString("c1", c1_val));
+      RETURN_NOT_OK(session->Apply(update.release()));
+      SleepFor(MonoDelta::FromMilliseconds(sleep_millis));
+    }
+    return Status::OK();
+  }
+
+  // Upsert data into the above created table.
+  Status UpsertData(int start_c0_value, int end_c0_value, int sleep_millis = 0,
+                    const string& c1_val = "string_val") {
+    shared_ptr<KuduSession> session(client_->NewSession());
+        RETURN_NOT_OK(client_->OpenTable(kTableName, &table_));
+    for (int i = start_c0_value; i < end_c0_value; i++) {
+      unique_ptr<KuduUpsert> upsert(table_->NewUpsert());
+      KuduPartialRow* row = upsert->mutable_row();
+      RETURN_NOT_OK(row->SetInt32("c0", i));
+      RETURN_NOT_OK(row->SetInt64(Schema::GetAutoIncrementingColumnName(), i + 
1));
+      RETURN_NOT_OK(row->SetString("c1", c1_val));
+      RETURN_NOT_OK(session->Apply(upsert.release()));
+      SleepFor(MonoDelta::FromMilliseconds(sleep_millis));
+    }
+    return Status::OK();
+  }
+
   // Delete row based on the row values passed.
   Status DeleteRow(int c0_val, int auto_incrementing_id) {
     shared_ptr<KuduSession> session = client_->NewSession();
@@ -149,6 +187,7 @@ class AutoIncrementingItest : public KuduTest {
     Schema schema = Schema({ ColumnSchema("c0", INT32),
                              
ColumnSchema(Schema::GetAutoIncrementingColumnName(),
                                           INT64, false,false, true),
+                             ColumnSchema("c1", STRING),
                            },2);
     RETURN_NOT_OK(SchemaToColumnPBs(schema, 
scan->mutable_projected_columns()));
     RETURN_NOT_OK(cluster_->tserver_proxy(ts)->Scan(req, &resp, &rpc));
@@ -218,10 +257,96 @@ TEST_F(AutoIncrementingItest, BasicInserts) {
     vector<string> results;
     ASSERT_OK(ScanTablet(j, 
resp.status_and_schema(0).tablet_status().tablet_id(), &results));
     for (int i = 0; i < kNumRows; i++) {
-      ASSERT_EQ(Substitute("(int32 c0=$0, int64 $1=$2)", i,
+      ASSERT_EQ(Substitute("(int32 c0=$0, int64 $1=$2, string 
c1=\"string_val\")", i,
                            Schema::GetAutoIncrementingColumnName(), i + 1), 
results[i]);
     }
   }
+
+  // Update column c1 with a new string and verify the data
+  ASSERT_OK(UpdateData(0, kNumRows, 0, "val_string"));
+
+  // Scan all the tablet replicas and validate the results.
+  for (int j = 0; j < kNumTabletServers; j++) {
+    auto server = cluster_->tserver_proxy(j);
+    rpc::RpcController rpc;
+    tserver::ListTabletsRequestPB req;
+    tserver::ListTabletsResponsePB resp;
+    server->ListTablets(req, &resp, &rpc);
+    ASSERT_EQ(1, resp.status_and_schema_size());
+    vector<string> results;
+    ASSERT_OK(ScanTablet(j, 
resp.status_and_schema(0).tablet_status().tablet_id(), &results));
+    for (int i = 0; i < kNumRows; i++) {
+      ASSERT_EQ(Substitute("(int32 c0=$0, int64 $1=$2, string 
c1=\"val_string\")", i,
+                           Schema::GetAutoIncrementingColumnName(), i + 1), 
results[i]);
+    }
+  }
+}
+
+TEST_F(AutoIncrementingItest, BasicUpserts) {
+  cluster::ExternalMiniClusterOptions opts;
+  opts.num_tablet_servers = kNumTabletServers;
+  cluster_.reset(new cluster::ExternalMiniCluster(std::move(opts)));
+  ASSERT_OK(cluster_->Start());
+  ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
+
+  // Create a table and upsert data. These will be inserts as there is no data 
present.
+  ASSERT_OK(CreateTableWithPartition());
+  ASSERT_OK(UpsertData(0, kNumRows));
+
+  // Scan all the tablet replicas and validate the results.
+  for (int j = 0; j < kNumTabletServers; j++) {
+    auto server = cluster_->tserver_proxy(j);
+    rpc::RpcController rpc;
+    tserver::ListTabletsRequestPB req;
+    tserver::ListTabletsResponsePB resp;
+    server->ListTablets(req, &resp, &rpc);
+    ASSERT_EQ(1, resp.status_and_schema_size());
+    vector<string> results;
+    ASSERT_OK(ScanTablet(j, 
resp.status_and_schema(0).tablet_status().tablet_id(), &results));
+    for (int i = 0; i < kNumRows; i++) {
+      ASSERT_EQ(Substitute("(int32 c0=$0, int64 $1=$2, string 
c1=\"string_val\")", i,
+                           Schema::GetAutoIncrementingColumnName(), i + 1), 
results[i]);
+    }
+  }
+
+  // Upsert data to the same rows written above but with a different c1 column 
value.
+  ASSERT_OK(UpsertData(0, kNumRows, 0, "val_string"));
+  // Scan all the tablet replicas and validate the results.
+  for (int j = 0; j < kNumTabletServers; j++) {
+    auto server = cluster_->tserver_proxy(j);
+    rpc::RpcController rpc;
+    tserver::ListTabletsRequestPB req;
+    tserver::ListTabletsResponsePB resp;
+    server->ListTablets(req, &resp, &rpc);
+    ASSERT_EQ(1, resp.status_and_schema_size());
+    vector<string> results;
+    ASSERT_OK(ScanTablet(j, 
resp.status_and_schema(0).tablet_status().tablet_id(), &results));
+    for (int i = 0; i < kNumRows; i++) {
+      ASSERT_EQ(Substitute("(int32 c0=$0, int64 $1=$2, string 
c1=\"val_string\")", i,
+                           Schema::GetAutoIncrementingColumnName(), i + 1), 
results[i]);
+    }
+  }
+}
+
+TEST_F(AutoIncrementingItest, TestNegatives) {
+  cluster::ExternalMiniClusterOptions opts;
+  opts.num_tablet_servers = kNumTabletServers;
+  cluster_.reset(new cluster::ExternalMiniCluster(std::move(opts)));
+  ASSERT_OK(cluster_->Start());
+  ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
+
+  // Create a table and insert data with auto-incrementing column value 
present.
+  ASSERT_OK(CreateTableWithPartition());
+  shared_ptr<KuduSession> session(client_->NewSession());
+  ASSERT_OK(client_->OpenTable(kTableName, &table_));
+  unique_ptr<KuduInsert> insert(table_->NewInsert());
+  KuduPartialRow* row = insert->mutable_row();
+  ASSERT_OK(row->SetInt32("c0", 1));
+  ASSERT_OK(row->SetInt64("auto_incrementing_id", 1));
+  ASSERT_OK(row->SetString("c1", "string_val"));
+  Status s = session->Apply(insert.release());
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Auto-Incrementing column should not be 
specified");
 }
 
 TEST_F(AutoIncrementingItest, BootstrapWithNoWals) {
@@ -262,7 +387,7 @@ TEST_F(AutoIncrementingItest, BootstrapWithNoWals) {
     ASSERT_OK(ScanTablet(j, tablet_uuid, &results));
     LOG(INFO) << "Results size: " << results.size();
     for (int i = 0; i < results.size(); i++) {
-      ASSERT_EQ(Substitute("(int32 c0=$0, int64 $1=$2)", i + 100,
+      ASSERT_EQ(Substitute("(int32 c0=$0, int64 $1=$2, string 
c1=\"string_val\")", i + 100,
                            Schema::GetAutoIncrementingColumnName(), i + 100 + 
1), results[i]);
 
     }
@@ -320,7 +445,7 @@ TEST_F(AutoIncrementingItest, BootstrapNoWalsNoData) {
     ASSERT_OK(ScanTablet(j, tablet_uuid, &results));
     ASSERT_EQ(200, results.size());
     for (int i = 0; i < results.size(); i++) {
-      ASSERT_EQ(Substitute("(int32 c0=$0, int64 $1=$2)", i + kNumRows,
+      ASSERT_EQ(Substitute("(int32 c0=$0, int64 $1=$2, string 
c1=\"string_val\")", i + kNumRows,
                            Schema::GetAutoIncrementingColumnName(), i + 1), 
results[i]);
     }
   }

Reply via email to