This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new cbbb60e53 [server] KUDU-1945 Auto_incrementing column UPSERT support
cbbb60e53 is described below
commit cbbb60e532e388665480f1c8be383305eb8fae62
Author: Abhishek Chennaka <[email protected]>
AuthorDate: Mon Jun 12 13:52:31 2023 -0700
[server] KUDU-1945 Auto_incrementing column UPSERT support
Add server-side support for the UPSERT operation when the entire row,
including the auto-incrementing column, is present in the request.
Follow-up patches will update the clients and add tests for the
corresponding changes.
With this patch, the expected behavior for the different operations
sent by Kudu clients is as follows:
Operation               Presence of auto-incrementing column
INSERT/INSERT_IGNORE    Must not be specified
UPDATE/UPDATE_IGNORE    Must be specified
UPSERT/UPSERT_IGNORE    Must be specified
DELETE/DELETE_IGNORE    Must be specified
The UPSERT behavior implemented in this patch enables consistent
incremental restores when using the Kudu backup and restore tool.
More details are in the design document:
https://docs.google.com/document/d/1-x6F5TkRYMqt2umDBQ5x-Aa4dqKwFYpbxo1A6QoMafo/edit#heading=h.vnagsa4oxwrf
Change-Id: Ib5cc4d80f77c165452572948f68c76fc70394d47
Reviewed-on: http://gerrit.cloudera.org:8080/20061
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Alexey Serbin <[email protected]>
---
python/kudu/tests/test_client.py | 2 +-
src/kudu/common/row_operations.cc | 80 ++++++++---
src/kudu/tablet/tablet_auto_incrementing-test.cc | 171 +++++++++++++++++++----
3 files changed, 203 insertions(+), 50 deletions(-)
diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py
index 0ce6d9bb4..6d60508de 100755
--- a/python/kudu/tests/test_client.py
+++ b/python/kudu/tests/test_client.py
@@ -478,7 +478,7 @@ class TestClient(KuduTestBase, CompatUnitTest):
try:
session.flush()
except KuduBadStatus:
- message = 'is incorrectly set'
+ message = 'should not be set for'
errors, overflow = session.get_pending_errors()
assert not overflow
assert len(errors) == 1
diff --git a/src/kudu/common/row_operations.cc
b/src/kudu/common/row_operations.cc
index b3a359323..5e8d560ec 100644
--- a/src/kudu/common/row_operations.cc
+++ b/src/kudu/common/row_operations.cc
@@ -472,39 +472,78 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const
uint8_t* prototype_row
tablet_row.set_null(tablet_col_idx, client_set_to_null);
}
if (!client_set_to_null) {
- // Copy the value if it's not null.
- Status row_status;
- RETURN_NOT_OK(ReadColumn(
- col, tablet_row.mutable_cell_ptr(tablet_col_idx), &row_status));
- if (PREDICT_FALSE(!row_status.ok())) {
- op->SetFailureStatusOnce(row_status);
+ // Check if the non-null value is present for auto-incrementing
column. For UPSERT or
+ // UPSERT_IGNORE operations we allow the user to specify the
auto-incrementing value.
+ if (tablet_col_idx != auto_incrementing_col_idx) {
+ // Copy the non-null value.
+ Status row_status;
+ RETURN_NOT_OK(ReadColumn(
+ col, tablet_row.mutable_cell_ptr(tablet_col_idx), &row_status));
+ if (PREDICT_FALSE(!row_status.ok())) {
+ op->SetFailureStatusOnce(row_status);
+ }
+ } else {
+ if (op->type == RowOperationsPB_Type_INSERT ||
+ op->type == RowOperationsPB_Type_INSERT_IGNORE) {
+ // auto-incrementing column values not to be set for
INSERT/INSERT_IGNORE operations.
+ static const Status kErrFieldIncorrectlySet =
Status::InvalidArgument(
+ "auto-incrementing column should not be set for
INSERT/INSERT_IGNORE operations");
+ op->SetFailureStatusOnce(kErrFieldIncorrectlySet);
+ RETURN_NOT_OK(ReadColumnAndDiscard(col));
+ return kErrFieldIncorrectlySet;
+ }
+ // Fetch the auto-incrementing counter from the request.
+ Status row_status;
+ int64_t counter = 0;
+ RETURN_NOT_OK(ReadColumn(
+ col, reinterpret_cast<uint8_t*>(&counter), &row_status));
+ if (PREDICT_FALSE(!row_status.ok())) {
+ op->SetFailureStatusOnce(row_status);
+ } else {
+ // Make sure it is positive.
+ if (counter < 0) {
+ static const Status kErrorValue = Status::InvalidArgument(
+ "auto-incrementing column value must be greater than zero");
+ op->SetFailureStatusOnce(kErrorValue);
+ return kErrorValue;
+ }
+ // Check if the provided counter value is less than what is in
memory
+ // and update the counter in memory.
+ if (counter > *auto_incrementing_counter) {
+ *auto_incrementing_counter = counter;
+ }
+ memcpy(tablet_row.mutable_cell_ptr(tablet_col_idx), &counter, 8);
+ }
}
} else if (PREDICT_FALSE(!col.is_nullable())) {
op->SetFailureStatusOnce(Status::InvalidArgument(
"NULL values not allowed for non-nullable column",
col.ToString()));
RETURN_NOT_OK(ReadColumnAndDiscard(col));
}
- if (PREDICT_FALSE(tablet_col_idx == auto_incrementing_col_idx)) {
- static const Status err_field_incorrectly_set =
Status::InvalidArgument(
- "auto-incrementing column is incorrectly set");
- op->SetFailureStatusOnce(err_field_incorrectly_set);
- return err_field_incorrectly_set;
- }
} else {
// If the client didn't provide a value, check if it's an
auto-incrementing
// field. If so, populate the field as appropriate.
if (tablet_col_idx == auto_incrementing_col_idx) {
+ if (op->type == RowOperationsPB_Type_UPSERT ||
+ op->type == RowOperationsPB_Type_UPSERT_IGNORE) {
+ static const Status kErrMaxValue =
Status::InvalidArgument("auto-incrementing column "
+ "should
be set for "
+
"UPSERT/UPSERT_IGNORE "
+
"operations");
+ op->SetFailureStatusOnce(kErrMaxValue);
+ return kErrMaxValue;
+ }
if (*DCHECK_NOTNULL(auto_incrementing_counter) == INT64_MAX) {
- static const Status err_max_value = Status::IllegalState("max
auto-incrementing column "
+ static const Status kErrMaxValue = Status::IllegalState("max
auto-incrementing column "
"value
reached");
- op->SetFailureStatusOnce(err_max_value);
- return err_max_value;
+ op->SetFailureStatusOnce(kErrMaxValue);
+ return kErrMaxValue;
}
if (*DCHECK_NOTNULL(auto_incrementing_counter) < 0) {
- static const Status err_value = Status::IllegalState("invalid
auto-incrementing "
+ static const Status kErrValue = Status::IllegalState("invalid
auto-incrementing "
"column value");
- op->SetFailureStatusOnce(err_value);
- return err_value;
+ op->SetFailureStatusOnce(kErrValue);
+ return kErrValue;
}
// We increment the auto incrementing counter at this point regardless
of future failures
// in the op for simplicity. The auto-incrementing column key space is
large enough to
@@ -762,11 +801,6 @@ Status
RowOperationsPBDecoder::DecodeOp<DecoderMode::WRITE_OPS>(
switch (type) {
case RowOperationsPB::UPSERT:
case RowOperationsPB::UPSERT_IGNORE:
- if (tablet_schema_->has_auto_incrementing()) {
- return Status::NotSupported(
- Substitute("tables with auto-incrementing column do not support "
- "$0 operations", RowOperationsPB_Type_Name(type)));
- }
case RowOperationsPB::INSERT:
case RowOperationsPB::INSERT_IGNORE:
return DecodeInsertOrUpsert(prototype_row_storage, mapping, op,
diff --git a/src/kudu/tablet/tablet_auto_incrementing-test.cc
b/src/kudu/tablet/tablet_auto_incrementing-test.cc
index 1650f7195..91ce282d1 100644
--- a/src/kudu/tablet/tablet_auto_incrementing-test.cc
+++ b/src/kudu/tablet/tablet_auto_incrementing-test.cc
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <cstdint>
#include <memory>
#include <string>
#include <type_traits>
@@ -82,36 +83,154 @@ TEST_F(AutoIncrementingTabletTest, TestInsertOp) {
}
}
-TEST_F(AutoIncrementingTabletTest, TestInsertOpWithAutoIncrementSet) {
- // Insert a row into the tablet populating auto-incrementing column.
- unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
- ASSERT_OK(row->SetInt64(0, 10));
- ASSERT_OK(row->SetInt32(1, 1337));
- Status s = writer_->Insert(*row);
- ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
- ASSERT_EQ("Invalid argument: auto-incrementing column is incorrectly set",
s.ToString());
-}
TEST_F(AutoIncrementingTabletTest, TestUpsertOp) {
- // Insert a row into the tablet populating only non auto-incrementing columns
- // using UPSERT.
- unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
- ASSERT_OK(row->SetInt32(1, 1337));
- Status s = writer_->Upsert(*row);
- ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
- ASSERT_EQ("Not implemented: tables with auto-incrementing "
- "column do not support UPSERT operations", s.ToString());
+ // Insert 20 rows with auto-incrementing column populated into an empty
tablet
+ // and validate the data written using Upsert() and UpsertIgnore().
+
+ {
+ unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+ for (int i = 1; i <= 10; i++) {
+ ASSERT_OK(row->SetInt64(0, i));
+ ASSERT_OK(row->SetInt32(1, 1337));
+ ASSERT_OK(writer_->Upsert(*row));
+ }
+ for (int i = 10; i <= 20; i++) {
+ ASSERT_OK(row->SetInt64(0, i));
+ ASSERT_OK(row->SetInt32(1, 1337));
+ ASSERT_OK(writer_->UpsertIgnore(*row));
+ }
+ unique_ptr<RowwiseIterator> iter;
+ ASSERT_OK(tablet()->NewRowIterator(schema_.CopyWithoutColumnIds(), &iter));
+ ASSERT_OK(iter->Init(nullptr));
+ vector<string> out;
+ IterateToStringList(iter.get(), &out);
+ for (int i = 1; i <= 20; i++) {
+ ASSERT_STR_MATCHES(out[i - 1], Substitute("(int64 key=$0, int32
val=1337)", i));
+ }
+ }
+ // Update the same 20 rows with auto-incrementing column populated but with
different
+ // non-primary key column data and validate the data written using Upsert()
and UpsertIgnore().
+ {
+ unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+ for (int i = 1; i <= 10; i++) {
+ ASSERT_OK(row->SetInt64(0, i));
+ ASSERT_OK(row->SetInt32(1, 1338));
+ ASSERT_OK(writer_->Upsert(*row));
+ }
+ for (int i = 10; i <= 20; i++) {
+ ASSERT_OK(row->SetInt64(0, i));
+ ASSERT_OK(row->SetInt32(1, 1338));
+ ASSERT_OK(writer_->UpsertIgnore(*row));
+ }
+ unique_ptr<RowwiseIterator> iter;
+ ASSERT_OK(tablet()->NewRowIterator(schema_.CopyWithoutColumnIds(), &iter));
+ ASSERT_OK(iter->Init(nullptr));
+ vector<string> out;
+ IterateToStringList(iter.get(), &out);
+ for (int i = 1; i <= 20; i++) {
+ ASSERT_STR_MATCHES(out[i - 1], Substitute("(int64 key=$0, int32
val=1338)", i));
+ }
+ }
+ // Now insert new set of rows and validate the data written.
+ {
+ unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+ for (int i = 20; i <= 30; i++) {
+ ASSERT_OK(row->SetInt32(1, 1337));
+ ASSERT_OK(writer_->Insert(*row));
+ }
+
+ // Scan the tablet data and verify the auto increment counter is set
correctly.
+ unique_ptr<RowwiseIterator> iter;
+ ASSERT_OK(tablet()->NewRowIterator(schema_.CopyWithoutColumnIds(), &iter));
+ ASSERT_OK(iter->Init(nullptr));
+ vector<string> out;
+ IterateToStringList(iter.get(), &out);
+ for (int i = 21; i <= 30; i++) {
+ ASSERT_STR_MATCHES(out[i-1], Substitute("int64 key=$0, int32 val=1337",
i));
+ }
+ }
}
-TEST_F(AutoIncrementingTabletTest, TestUpsertIgnoreOp) {
- // Insert a row into the tablet populating only non auto-incrementing columns
- // using UPSERT_IGNORE.
- unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
- ASSERT_OK(row->SetInt32(1, 1337));
- Status s = writer_->UpsertIgnore(*row);
- ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
- ASSERT_EQ("Not implemented: tables with auto-incrementing "
- "column do not support UPSERT_IGNORE operations", s.ToString());
+TEST_F(AutoIncrementingTabletTest, TestNegatives) {
+ // Insert a row into the tablet setting auto-incrementing column.
+ {
+ unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+ ASSERT_OK(row->SetInt64(0, 10));
+ ASSERT_OK(row->SetInt32(1, 1337));
+ Status s = writer_->Insert(*row);
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ ASSERT_EQ("Invalid argument: auto-incrementing column should not be set "
+ "for INSERT/INSERT_IGNORE operations", s.ToString());
+ }
+
+ // Upsert a row without auto-incrementing column set
+ {
+ unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+ ASSERT_OK(row->SetInt32(1, 1337));
+ Status s = writer_->Upsert(*row);
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ ASSERT_EQ("Invalid argument: auto-incrementing column should be set "
+ "for UPSERT/UPSERT_IGNORE operations", s.ToString());
+ }
+
+ // Upsert a row with auto-incrementing set to negative value
+ {
+ unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+ ASSERT_OK(row->SetInt64(0, -1));
+ ASSERT_OK(row->SetInt32(1, 1337));
+ Status s = writer_->Upsert(*row);
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ ASSERT_EQ("Invalid argument: auto-incrementing column value must be
greater than zero",
+ s.ToString());
+ }
+ // Upsert a row with auto-incrementing set to INT64_MAX and insert a row
+ {
+ unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+ ASSERT_OK(row->SetInt64(0, INT64_MAX));
+ ASSERT_OK(row->SetInt32(1, 1337));
+ ASSERT_OK(writer_->Upsert(*row));
+ }
+ {
+ unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+ ASSERT_OK(row->SetInt32(1, 1337));
+ Status s = writer_->Insert(*row);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ ASSERT_EQ("Illegal state: max auto-incrementing column value reached",
+ s.ToString());
+ }
+
+ // Upsert rows with auto-incrementing set to INT64_MAX and a new value
respectively.
+ // Insert a new row and later upsert a row with INT64_MAX.
+ // Note: Technically, since we are operating on the same table as above,
UPSERT with
+ // Auto-incrementing column value set to INT64_MAX is not needed as we have
already
+ // done it above, but having it enhances the readability of the sub-test.
+ {
+ unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+ ASSERT_OK(row->SetInt64(0, INT64_MAX));
+ ASSERT_OK(row->SetInt32(1, 1337));
+ ASSERT_OK(writer_->Upsert(*row));
+ }
+ {
+ unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+ ASSERT_OK(row->SetInt64(0, 100));
+ ASSERT_OK(row->SetInt32(1, 1337));
+ ASSERT_OK(writer_->Upsert(*row));
+ }
+ {
+ unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+ ASSERT_OK(row->SetInt32(1, 1337));
+ Status s = writer_->Insert(*row);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ ASSERT_EQ("Illegal state: max auto-incrementing column value reached",
+ s.ToString());
+ }
+ {
+ unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+ ASSERT_OK(row->SetInt64(0, INT64_MAX));
+ ASSERT_OK(row->SetInt32(1, 1338));
+ ASSERT_OK(writer_->Upsert(*row));
+ }
}
} // namespace tablet