This is an automated email from the ASF dual-hosted git repository. laiyingchun pushed a commit to branch branch-1.17.x in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 14fd43ef9611e940e0fa59328183b3cf09506ca8 Author: Abhishek Chennaka <[email protected]> AuthorDate: Thu Jun 15 21:36:58 2023 -0700 [client] KUDU-1945 Add UPSERT support This patch adds UPSERT support to Kudu C++ and Java clients by removing the previously added checks to discard UPSERT operations. The functionality added is only supported if the entire primary key is provided including the auto-incrementing column. Also added verification to reject INSERT operations with auto-incrementing column set on the client side. Thanks to Marton Greber for diagnosing the python client side issue. Change-Id: I27a95e3a6b1d1b584cad849978313b3c8222cd3d Reviewed-on: http://gerrit.cloudera.org:8080/20083 Tested-by: Kudu Jenkins Reviewed-by: Marton Greber <[email protected]> Reviewed-by: Alexey Serbin <[email protected]> (cherry picked from commit 39302dfa8edc1fd1268f78f3ddd3f34ca26cc43f) Reviewed-on: http://gerrit.cloudera.org:8080/20228 Reviewed-by: Yifan Zhang <[email protected]> Tested-by: Yingchun Lai <[email protected]> --- .../org/apache/kudu/client/AsyncKuduSession.java | 9 ++ .../java/org/apache/kudu/client/KuduTable.java | 8 -- .../org/apache/kudu/client/TestKuduClient.java | 84 ++++++++++--- python/kudu/tests/test_client.py | 29 +++-- src/kudu/client/client-test.cc | 103 ++++++++++------ src/kudu/client/session-internal.cc | 38 +++--- src/kudu/common/partial_row.cc | 6 + src/kudu/common/partial_row.h | 4 + .../integration-tests/auto_incrementing-itest.cc | 131 ++++++++++++++++++++- 9 files changed, 319 insertions(+), 93 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java index 9c0ea6912..aa449d391 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java @@ -44,6 +44,7 @@ import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.kudu.Schema; import org.apache.kudu.client.AsyncKuduClient.LookupType; import org.apache.kudu.util.AsyncUtil; import org.apache.kudu.util.LogThrottler; @@ -658,6 +659,14 @@ public class AsyncKuduSession implements SessionConfiguration { Preconditions.checkArgument(operation.getTable().getAsyncClient() == client, "Applied operations must be created from a KuduTable instance opened " + "from the same client that opened this KuduSession"); + // We do not want to have auto-incrementing column set for INSERT operations. + if (operation.getRow().getSchema().hasAutoIncrementingColumn() && + operation.getRow().isSet(Schema.getAutoIncrementingColumnName()) && + (operation.getChangeType() == Operation.ChangeType.INSERT || + operation.getChangeType() == Operation.ChangeType.INSERT_IGNORE)) { + throw new IllegalArgumentException("Auto-Incrementing column should not " + + "be specified for INSERT operation"); + } if (closed) { // Ideally this would be a precondition, but that may break existing // clients who have grown to rely on this unsafe behavior. diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java index 97672ecc1..d8838c1aa 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java @@ -188,10 +188,6 @@ public class KuduTable { * @throws UnsupportedOperationException if the table has auto-incrementing column */ public Upsert newUpsert() { - if (schema.hasAutoIncrementingColumn()) { - throw new UnsupportedOperationException( - "Tables with auto-incrementing column do not support UPSERT operations"); - } return new Upsert(this); } @@ -203,10 +199,6 @@ public class KuduTable { * @throws UnsupportedOperationException if the table has auto-incrementing column */ public UpsertIgnore newUpsertIgnore() { - if (schema.hasAutoIncrementingColumn()) { - throw new UnsupportedOperationException( - "Tables with auto-incrementing column do not support UPSERT_IGNORE operations"); - } return new UpsertIgnore(this); } diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java index a3138a33a..acb243cab 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java @@ -871,6 +871,29 @@ public class TestKuduClient { assertEquals(expectedRow.toString(), rowStrings.get(i)); } + // Upsert rows into the table after assigning values for the auto-incrementing + // column. The first three rows will be applied as updates and the next three as + // inserts. + for (int i = 0; i < 6; i++) { + Upsert upsert = table.newUpsert(); + row = upsert.getRow(); + row.addInt("key", i); + row.addLong(Schema.getAutoIncrementingColumnName(), i + 1); + row.addInt("c1", i * 20); + session.apply(upsert); + } + session.flush(); + + // Scan all the rows in the table with all columns. + // Verify that the auto-incrementing column is included in the rows. + rowStrings = scanTableToStrings(table); + assertEquals(6, rowStrings.size()); + for (int i = 0; i < rowStrings.size(); i++) { + String expectedRow = String.format("INT32 key=%d, INT64 %s=%d, INT32 c1=%d", + i, Schema.getAutoIncrementingColumnName(), i + 1, i * 20); + assertEquals(expectedRow, rowStrings.get(i)); + } + // Delete the first row with "key" and auto-incrementing columns. // Verify that number of rows is decreased by 1. Delete delete = table.newDelete(); @@ -879,7 +902,7 @@ public class TestKuduClient { row.addLong(schema.getColumnByIndex(1).getName(), 1); session.apply(delete); session.flush(); - assertEquals(2, countRowsInScan(client.newScannerBuilder(table).build())); + assertEquals(5, countRowsInScan(client.newScannerBuilder(table).build())); // Check that we can delete the table. client.deleteTable(TABLE_NAME); @@ -905,23 +928,21 @@ public class TestKuduClient { schema = table.getSchema(); assertTrue(schema.hasAutoIncrementingColumn()); - // Verify that UPSERT is not allowed for table with auto-incrementing column - try { - table.newUpsert(); - fail("UPSERT on table with auto-incrementing column"); - } catch (UnsupportedOperationException e) { - assertTrue(e.getMessage().contains( - "Tables with auto-incrementing column do not support UPSERT operations")); - } - - // Verify that UPSERT_IGNORE is not allowed for table with auto-incrementing column - try { - table.newUpsertIgnore(); - fail("UPSERT_IGNORE on table with auto-incrementing column"); - } catch (UnsupportedOperationException e) { - assertTrue(e.getMessage().contains( - "Tables with auto-incrementing column do not support UPSERT_IGNORE operations")); - } + // Verify that UPSERT is allowed for table with auto-incrementing column + Upsert upsert = table.newUpsert(); + PartialRow rowUpsert = upsert.getRow(); + rowUpsert.addInt("key", 0); + rowUpsert.addLong(Schema.getAutoIncrementingColumnName(), 1); + rowUpsert.addInt("c1", 10); + session.apply(upsert); + + // Verify that UPSERT_IGNORE is allowed for table with auto-incrementing column + UpsertIgnore upsertIgnore = table.newUpsertIgnore(); + PartialRow rowUpsertIgnore = upsertIgnore.getRow(); + rowUpsertIgnore.addInt("key", 1); + rowUpsertIgnore.addLong(Schema.getAutoIncrementingColumnName(), 2); + rowUpsertIgnore.addInt("c1", 20); + session.apply(upsertIgnore); // Change desired block size for auto-incrementing column client.alterTable(TABLE_NAME, new AlterTableOptions().changeDesiredBlockSize( @@ -934,6 +955,33 @@ public class TestKuduClient { Schema.getAutoIncrementingColumnName(), ColumnSchema.CompressionAlgorithm.NO_COMPRESSION)); session.flush(); + // Verify that auto-incrementing column value cannot be specified in an INSERT operation. + try { + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + row.addInt("key", 1); + row.addLong(Schema.getAutoIncrementingColumnName(), 1); + row.addInt("c1", 10); + session.apply(insert); + fail("INSERT on table with auto-incrementing column set"); + } catch (KuduException ex) { + assertTrue(ex.getMessage().contains("Auto-Incrementing column should not " + + "be specified for INSERT operation")); + } + + // Verify that auto-incrementing column value cannot be specified in an INSERT_IGNORE operation. + try { + InsertIgnore insertIgnore = table.newInsertIgnore(); + PartialRow row = insertIgnore.getRow(); + row.addInt("key", 1); + row.addLong(Schema.getAutoIncrementingColumnName(), 1); + row.addInt("c1", 10); + session.apply(insertIgnore); + fail("INSERT on table with auto-incrementing column set"); + } catch (KuduException ex) { + assertTrue(ex.getMessage().contains("Auto-Incrementing column should not " + + "be specified for INSERT operation")); + } // Verify that auto-incrementing column cannot be added try { client.alterTable(TABLE_NAME, new AlterTableOptions().addColumn( diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py index d3bb965e0..3a1e43e9e 100755 --- a/python/kudu/tests/test_client.py +++ b/python/kudu/tests/test_client.py @@ -461,22 +461,29 @@ class TestClient(KuduTestBase, CompatUnitTest): table = self.client.table(table_name) session = self.client.new_session() - # Insert with auto incrementing column specified + # Insert with auto-incrementing column not specified op = table.new_insert() op['key'] = 1 + session.apply(op) + session.flush() + + # TODO: the below test segfaults(KUDU-3454) + # # Insert with auto incrementing column specified + # op = table.new_insert() + # op['key'] = 1 + # op[Schema.get_auto_incrementing_column_name()] = 1 + # error_msg = 'should not be specified for Insert operation' + # with self.assertRaisesRegex(KuduBadStatus, error_msg): + # session.apply(op) + + # Upsert with auto-incrementing column specified + op = table.new_upsert() + op['key'] = 1 op[Schema.get_auto_incrementing_column_name()] = 1 session.apply(op) - try: - session.flush() - except KuduBadStatus: - message = 'is incorrectly set' - errors, overflow = session.get_pending_errors() - assert not overflow - assert len(errors) == 1 - assert message in repr(errors[0]) - # TODO: Upsert should be rejected as of now. However the test segfaults: KUDU-3454 - # TODO: Upsert ignore should be rejected. Once Python client supports upsert ignore. + # TODO: once upsert_ignore is supported by the Python client, + # check if specifying all the key columns works. # With non-unique primary key, one can't use the tuple/list initialization for new # inserts. In this case, at the second position it would like to get an int64 (the type diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc index 5070c67c4..02e7e0acb 100644 --- a/src/kudu/client/client-test.cc +++ b/src/kudu/client/client-test.cc @@ -9853,7 +9853,7 @@ class ClientTestAutoIncrementingColumn : public ClientTest { } }; -TEST_F(ClientTestAutoIncrementingColumn, ReadAndWrite) { +TEST_F(ClientTestAutoIncrementingColumn, ReadAndWriteUsingInserts) { const string kTableName = "table_with_auto_incrementing_column"; KuduSchemaBuilder b; b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->NonUniquePrimaryKey(); @@ -9912,6 +9912,70 @@ TEST_F(ClientTestAutoIncrementingColumn, ReadAndWrite) { } } +TEST_F(ClientTestAutoIncrementingColumn, ReadAndWriteUsingUpserts) { + const string kTableName = "concurrent_writes_auto_incrementing_column"; + KuduSchemaBuilder b; + b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->NonUniquePrimaryKey(); + ASSERT_OK(b.Build(&schema_)); + + // Create a table with a single range partition. + static constexpr int lower_bound = 0; + static constexpr int upper_bound = 20; + unique_ptr<KuduPartialRow> lower(schema_.NewRow()); + unique_ptr<KuduPartialRow> upper(schema_.NewRow()); + + ASSERT_OK(lower->SetInt32("key", lower_bound)); + ASSERT_OK(upper->SetInt32("key", upper_bound)); + + unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); + ASSERT_OK(table_creator->table_name(kTableName) + .schema(&schema_) + .set_range_partition_columns({"key"}) + .add_range_partition(lower.release(), upper.release()) + .num_replicas(3) + .Create()); + + // Write into these two partitions specifying values for + // auto-incrementing column using UPSERT and UPSERT_IGNORE. + shared_ptr<KuduSession> session = client_->NewSession(); + shared_ptr<KuduTable> table; + ASSERT_OK(client_->OpenTable(kTableName, &table)); + static constexpr auto kNumRows = 20; + // Iterate twice Upserting the same data so that the first iteration would have + // Upserts as Inserts and in the next iteration Upserts are Updates. + for (int j = 0; j < 2; j++) { + for (int i = 0; i < kNumRows; i+=2) { + unique_ptr<KuduUpsert> upsert(table->NewUpsert()); + KuduPartialRow* row_upsert = upsert->mutable_row(); + ASSERT_OK(row_upsert->SetInt32("key", i)); + ASSERT_OK(row_upsert->SetInt64(Schema::GetAutoIncrementingColumnName(), i + 1)); + ASSERT_OK(session->Apply(upsert.release())); + + unique_ptr<KuduUpsertIgnore> upsert_ignore(table->NewUpsertIgnore()); + KuduPartialRow* row_upsert_ignore = upsert_ignore->mutable_row(); + ASSERT_OK(row_upsert_ignore->SetInt32("key", i + 1)); + ASSERT_OK(row_upsert_ignore->SetInt64(Schema::GetAutoIncrementingColumnName(), i + 2)); + ASSERT_OK(session->Apply(upsert_ignore.release())); + } + } + FlushSessionOrDie(session); + + // Read back the rows and confirm the values of auto-incrementing column set + // correctly for each of the partitions in different scan modes. + for (const auto mode: {KuduClient::LEADER_ONLY, KuduClient::CLOSEST_REPLICA, + KuduClient::FIRST_REPLICA}) { + vector<string> rows; + KuduScanner scanner(table.get()); + ASSERT_OK(scanner.SetSelection(mode)); + ASSERT_OK(ScanToStrings(&scanner, &rows)); + ASSERT_EQ(kNumRows, rows.size()); + for (int i = 0; i < rows.size(); i++) { + ASSERT_EQ(Substitute("(int32 key=$0, int64 auto_incrementing_id=$1)", i, + i + 1), rows[i]); + } + } +} + TEST_F(ClientTestAutoIncrementingColumn, ConcurrentWrites) { const string kTableName = "concurrent_writes_auto_incrementing_column"; KuduSchemaBuilder b; @@ -10090,43 +10154,6 @@ TEST_F(ClientTestAutoIncrementingColumn, AlterOperationNegatives) { } } -TEST_F(ClientTestAutoIncrementingColumn, InsertOperationNegatives) { - const string kTableName = "insert_operation_negatives_auto_incrementing_column"; - KuduSchemaBuilder b; - // Create a schema with non-unique PK, such that auto incrementing col is present. - b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->NonUniquePrimaryKey(); - ASSERT_OK(b.Build(&schema_)); - - unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); - ASSERT_OK(table_creator->table_name(kTableName) - .schema(&schema_) - .add_hash_partitions({"key"}, 2) - .num_replicas(3) - .Create()); - - shared_ptr<KuduSession> session(client_->NewSession()); - shared_ptr<KuduTable> table; - client_->OpenTable(kTableName, &table); - - { - unique_ptr<KuduUpsert> op(table->NewUpsert()); - auto* row = op->mutable_row(); - ASSERT_OK(row->SetInt32("key", 1)); - ASSERT_STR_CONTAINS(session->Apply(op.release()).ToString(), - "Illegal state: this type of write operation is not supported on table " - "with auto-incrementing column"); - } - - { - unique_ptr<KuduUpsertIgnore> op(table->NewUpsertIgnore()); - auto* row = op->mutable_row(); - ASSERT_OK(row->SetInt32("key", 1)); - ASSERT_STR_CONTAINS(session->Apply(op.release()).ToString(), - "Illegal state: this type of write operation is not supported on table " - "with auto-incrementing column"); - } -} - TEST_F(ClientTestAutoIncrementingColumn, CreateTableFeatureFlag) { FLAGS_master_support_auto_incrementing_column = false; const string kTableName = "create_table_with_auto_incrementing_column_feature_flag"; diff --git a/src/kudu/client/session-internal.cc b/src/kudu/client/session-internal.cc index 88d507b0b..2e33de3ac 100644 --- a/src/kudu/client/session-internal.cc +++ b/src/kudu/client/session-internal.cc @@ -352,6 +352,24 @@ Status CheckForNonUniquePrimaryKey(const KuduWriteOperation& op) { return Status::OK(); } +// Ensure the auto-incrementing column is not set for the Insert operation. +Status CheckAutoIncrementingColumnForInsert(const KuduWriteOperation& op) { + if (op.row().schema()->has_auto_incrementing() && op.row().IsAutoIncrementingColumnSet()) { + return Status::InvalidArgument("Auto-Incrementing column should not be " + "specified for INSERT operation"); + } + return Status::OK(); +} + +// Ensure the auto-incrementing column is set for Non-Insert operations. +Status CheckAutoIncrementingColumnForNonInsert(const KuduWriteOperation& op) { + if (op.row().schema()->has_auto_incrementing() && !op.row().IsAutoIncrementingColumnSet()) { + return Status::InvalidArgument("Auto-Incrementing column should be " + "specified for UPSERT/UPDATE operations"); + } + return Status::OK(); +} + // Check if the values for the non-nullable columns are present. Status CheckForNonNullableColumns(const KuduWriteOperation& op) { const auto& row = op.row(); @@ -368,16 +386,6 @@ Status CheckForNonNullableColumns(const KuduWriteOperation& op) { } return Status::OK(); } - -Status CheckForAutoIncrementingColumn(const KuduWriteOperation& op) { - if (op.row().schema()->has_auto_incrementing()) { - return Status::IllegalState( - Substitute( - "this type of write operation is not supported on table with auto-incrementing column"), - KUDU_REDACT(op.ToString())); - } - return Status::OK(); -} } // anonymous namespace #define RETURN_NOT_OK_ADD_ERROR(_func, _op, _error_collector) \ @@ -396,18 +404,18 @@ Status KuduSession::Data::ValidateWriteOperation(KuduWriteOperation* op) const { } else { RETURN_NOT_OK_ADD_ERROR(CheckForPrimaryKey, op, error_collector_); } - // TODO(martongreber): UPSERT and UPSERT IGNORE are not supported initially for tables - // with a non-unique primary key. We plan to add this later. switch (op->type()) { case KuduWriteOperation::INSERT: + case KuduWriteOperation::INSERT_IGNORE: + RETURN_NOT_OK_ADD_ERROR(CheckAutoIncrementingColumnForInsert, op, error_collector_); RETURN_NOT_OK_ADD_ERROR(CheckForNonNullableColumns, op, error_collector_); break; case KuduWriteOperation::UPSERT: RETURN_NOT_OK_ADD_ERROR(CheckForNonNullableColumns, op, error_collector_); - RETURN_NOT_OK_ADD_ERROR(CheckForAutoIncrementingColumn, op, error_collector_); - break; case KuduWriteOperation::UPSERT_IGNORE: - RETURN_NOT_OK_ADD_ERROR(CheckForAutoIncrementingColumn, op, error_collector_); + case KuduWriteOperation::UPDATE: + case KuduWriteOperation::UPDATE_IGNORE: + RETURN_NOT_OK_ADD_ERROR(CheckAutoIncrementingColumnForNonInsert, op, error_collector_); break; default: // Nothing else to validate for other types of write operations. diff --git a/src/kudu/common/partial_row.cc b/src/kudu/common/partial_row.cc index 432f77e25..3f50d165c 100644 --- a/src/kudu/common/partial_row.cc +++ b/src/kudu/common/partial_row.cc @@ -902,6 +902,12 @@ bool KuduPartialRow::IsNonUniqueKeySet() const { return BitmapIsAllSet(isset_bitmap_, 0, schema_->num_key_columns() - 1); } +bool KuduPartialRow::IsAutoIncrementingColumnSet() const { + DCHECK_GE(schema_->num_key_columns(), 1); + DCHECK(schema_->has_auto_incrementing()); + return BitmapIsAllSet(isset_bitmap_, schema_->num_key_columns() - 1, schema_->num_key_columns()); +} + std::string KuduPartialRow::ToString() const { ScopedDisableRedaction no_redaction; diff --git a/src/kudu/common/partial_row.h b/src/kudu/common/partial_row.h index 25f2efec8..06a70ddd8 100644 --- a/src/kudu/common/partial_row.h +++ b/src/kudu/common/partial_row.h @@ -634,6 +634,10 @@ class KUDU_EXPORT KuduPartialRow { /// for this mutation. bool IsNonUniqueKeySet() const; + /// @return @c true if auto-incrementing column has been set + /// for this mutation. + bool IsAutoIncrementingColumnSet() const; + /// @return @c true if all column values have been set. bool AllColumnsSet() const; diff --git a/src/kudu/integration-tests/auto_incrementing-itest.cc b/src/kudu/integration-tests/auto_incrementing-itest.cc index 1be177527..4c49bde02 100644 --- a/src/kudu/integration-tests/auto_incrementing-itest.cc +++ b/src/kudu/integration-tests/auto_incrementing-itest.cc @@ -59,6 +59,8 @@ using kudu::client::KuduSchemaBuilder; using kudu::client::KuduSession; using kudu::client::KuduTable; using kudu::client::KuduTableCreator; +using kudu::client::KuduUpdate; +using kudu::client::KuduUpsert; using kudu::client::sp::shared_ptr; using kudu::env_util::ListFilesInDir; using kudu::rpc::RpcController; @@ -88,6 +90,7 @@ class AutoIncrementingItest : public KuduTest { Status CreateTableWithPartition() { KuduSchemaBuilder b; b.AddColumn("c0")->Type(KuduColumnSchema::INT32)->NotNull()->NonUniquePrimaryKey(); + b.AddColumn("c1")->Type(client::KuduColumnSchema::STRING)->NotNull(); RETURN_NOT_OK(b.Build(&kudu_schema_)); int lower_bound = 0; @@ -115,12 +118,47 @@ class AutoIncrementingItest : public KuduTest { unique_ptr<KuduInsert> insert(table_->NewInsert()); KuduPartialRow* row = insert->mutable_row(); RETURN_NOT_OK(row->SetInt32("c0", i)); + RETURN_NOT_OK(row->SetString("c1", "string_val")); RETURN_NOT_OK(session->Apply(insert.release())); SleepFor(MonoDelta::FromMilliseconds(sleep_millis)); } return Status::OK(); } + // Insert data into the above created table. + Status UpdateData(int start_c0_value, int end_c0_value, int sleep_millis = 0, + const string& c1_val = "string_val") { + shared_ptr<KuduSession> session(client_->NewSession()); + RETURN_NOT_OK(client_->OpenTable(kTableName, &table_)); + for (int i = start_c0_value; i < end_c0_value; i++) { + unique_ptr<KuduUpdate> update(table_->NewUpdate()); + KuduPartialRow* row = update->mutable_row(); + RETURN_NOT_OK(row->SetInt32("c0", i)); + RETURN_NOT_OK(row->SetInt64(Schema::GetAutoIncrementingColumnName(), i + 1)); + RETURN_NOT_OK(row->SetString("c1", c1_val)); + RETURN_NOT_OK(session->Apply(update.release())); + SleepFor(MonoDelta::FromMilliseconds(sleep_millis)); + } + return Status::OK(); + } + + // Upsert data into the above created table. + Status UpsertData(int start_c0_value, int end_c0_value, int sleep_millis = 0, + const string& c1_val = "string_val") { + shared_ptr<KuduSession> session(client_->NewSession()); + RETURN_NOT_OK(client_->OpenTable(kTableName, &table_)); + for (int i = start_c0_value; i < end_c0_value; i++) { + unique_ptr<KuduUpsert> upsert(table_->NewUpsert()); + KuduPartialRow* row = upsert->mutable_row(); + RETURN_NOT_OK(row->SetInt32("c0", i)); + RETURN_NOT_OK(row->SetInt64(Schema::GetAutoIncrementingColumnName(), i + 1)); + RETURN_NOT_OK(row->SetString("c1", c1_val)); + RETURN_NOT_OK(session->Apply(upsert.release())); + SleepFor(MonoDelta::FromMilliseconds(sleep_millis)); + } + return Status::OK(); + } + // Delete row based on the row values passed. Status DeleteRow(int c0_val, int auto_incrementing_id) { shared_ptr<KuduSession> session = client_->NewSession(); @@ -149,6 +187,7 @@ class AutoIncrementingItest : public KuduTest { Schema schema = Schema({ ColumnSchema("c0", INT32), ColumnSchema(Schema::GetAutoIncrementingColumnName(), INT64, false,false, true), + ColumnSchema("c1", STRING), },2); RETURN_NOT_OK(SchemaToColumnPBs(schema, scan->mutable_projected_columns())); RETURN_NOT_OK(cluster_->tserver_proxy(ts)->Scan(req, &resp, &rpc)); @@ -218,10 +257,96 @@ TEST_F(AutoIncrementingItest, BasicInserts) { vector<string> results; ASSERT_OK(ScanTablet(j, resp.status_and_schema(0).tablet_status().tablet_id(), &results)); for (int i = 0; i < kNumRows; i++) { - ASSERT_EQ(Substitute("(int32 c0=$0, int64 $1=$2)", i, + ASSERT_EQ(Substitute("(int32 c0=$0, int64 $1=$2, string c1=\"string_val\")", i, Schema::GetAutoIncrementingColumnName(), i + 1), results[i]); } } + + // Update column c1 with a new string and verify the data + ASSERT_OK(UpdateData(0, kNumRows, 0, "val_string")); + + // Scan all the tablet replicas and validate the results. + for (int j = 0; j < kNumTabletServers; j++) { + auto server = cluster_->tserver_proxy(j); + rpc::RpcController rpc; + tserver::ListTabletsRequestPB req; + tserver::ListTabletsResponsePB resp; + server->ListTablets(req, &resp, &rpc); + ASSERT_EQ(1, resp.status_and_schema_size()); + vector<string> results; + ASSERT_OK(ScanTablet(j, resp.status_and_schema(0).tablet_status().tablet_id(), &results)); + for (int i = 0; i < kNumRows; i++) { + ASSERT_EQ(Substitute("(int32 c0=$0, int64 $1=$2, string c1=\"val_string\")", i, + Schema::GetAutoIncrementingColumnName(), i + 1), results[i]); + } + } +} + +TEST_F(AutoIncrementingItest, BasicUpserts) { + cluster::ExternalMiniClusterOptions opts; + opts.num_tablet_servers = kNumTabletServers; + cluster_.reset(new cluster::ExternalMiniCluster(std::move(opts))); + ASSERT_OK(cluster_->Start()); + ASSERT_OK(cluster_->CreateClient(nullptr, &client_)); + + // Create a table and upsert data. These will be inserts as there is no data present. + ASSERT_OK(CreateTableWithPartition()); + ASSERT_OK(UpsertData(0, kNumRows)); + + // Scan all the tablet replicas and validate the results. + for (int j = 0; j < kNumTabletServers; j++) { + auto server = cluster_->tserver_proxy(j); + rpc::RpcController rpc; + tserver::ListTabletsRequestPB req; + tserver::ListTabletsResponsePB resp; + server->ListTablets(req, &resp, &rpc); + ASSERT_EQ(1, resp.status_and_schema_size()); + vector<string> results; + ASSERT_OK(ScanTablet(j, resp.status_and_schema(0).tablet_status().tablet_id(), &results)); + for (int i = 0; i < kNumRows; i++) { + ASSERT_EQ(Substitute("(int32 c0=$0, int64 $1=$2, string c1=\"string_val\")", i, + Schema::GetAutoIncrementingColumnName(), i + 1), results[i]); + } + } + + // Upsert data to the same rows written above but with a different c1 column value. + ASSERT_OK(UpsertData(0, kNumRows, 0, "val_string")); + // Scan all the tablet replicas and validate the results. + for (int j = 0; j < kNumTabletServers; j++) { + auto server = cluster_->tserver_proxy(j); + rpc::RpcController rpc; + tserver::ListTabletsRequestPB req; + tserver::ListTabletsResponsePB resp; + server->ListTablets(req, &resp, &rpc); + ASSERT_EQ(1, resp.status_and_schema_size()); + vector<string> results; + ASSERT_OK(ScanTablet(j, resp.status_and_schema(0).tablet_status().tablet_id(), &results)); + for (int i = 0; i < kNumRows; i++) { + ASSERT_EQ(Substitute("(int32 c0=$0, int64 $1=$2, string c1=\"val_string\")", i, + Schema::GetAutoIncrementingColumnName(), i + 1), results[i]); + } + } +} + +TEST_F(AutoIncrementingItest, TestNegatives) { + cluster::ExternalMiniClusterOptions opts; + opts.num_tablet_servers = kNumTabletServers; + cluster_.reset(new cluster::ExternalMiniCluster(std::move(opts))); + ASSERT_OK(cluster_->Start()); + ASSERT_OK(cluster_->CreateClient(nullptr, &client_)); + + // Create a table and insert data with auto-incrementing column value present. + ASSERT_OK(CreateTableWithPartition()); + shared_ptr<KuduSession> session(client_->NewSession()); + ASSERT_OK(client_->OpenTable(kTableName, &table_)); + unique_ptr<KuduInsert> insert(table_->NewInsert()); + KuduPartialRow* row = insert->mutable_row(); + ASSERT_OK(row->SetInt32("c0", 1)); + ASSERT_OK(row->SetInt64("auto_incrementing_id", 1)); + ASSERT_OK(row->SetString("c1", "string_val")); + Status s = session->Apply(insert.release()); + ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "Auto-Incrementing column should not be specified"); } TEST_F(AutoIncrementingItest, BootstrapWithNoWals) { @@ -262,7 +387,7 @@ TEST_F(AutoIncrementingItest, BootstrapWithNoWals) { ASSERT_OK(ScanTablet(j, tablet_uuid, &results)); LOG(INFO) << "Results size: " << results.size(); for (int i = 0; i < results.size(); i++) { - ASSERT_EQ(Substitute("(int32 c0=$0, int64 $1=$2)", i + 100, + ASSERT_EQ(Substitute("(int32 c0=$0, int64 $1=$2, string c1=\"string_val\")", i + 100, Schema::GetAutoIncrementingColumnName(), i + 100 + 1), results[i]); } @@ -320,7 +445,7 @@ TEST_F(AutoIncrementingItest, BootstrapNoWalsNoData) { ASSERT_OK(ScanTablet(j, tablet_uuid, &results)); ASSERT_EQ(200, results.size()); for (int i = 0; i < results.size(); i++) { - ASSERT_EQ(Substitute("(int32 c0=$0, int64 $1=$2)", i + kNumRows, + ASSERT_EQ(Substitute("(int32 c0=$0, int64 $1=$2, string c1=\"string_val\")", i + kNumRows, Schema::GetAutoIncrementingColumnName(), i + 1), results[i]); } }
