This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch branch-1.17.x
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/branch-1.17.x by this push:
     new 89f5afe06 [server] KUDU-1945 Auto_incrementing column UPSERT support
89f5afe06 is described below

commit 89f5afe06649b6cc5e2a293edb829d39fc29ca31
Author: Abhishek Chennaka <[email protected]>
AuthorDate: Mon Jun 12 13:52:31 2023 -0700

    [server] KUDU-1945 Auto_incrementing column UPSERT support
    
    Add server-side support for UPSERT operations when the entire row,
    including the auto-incrementing column, is present in the request.
    Follow-up patches will update the clients and test the
    corresponding changes as well.
    
    With this patch the expected behavior for different operations
    from Kudu clients is as follows:
    
    Operation             Presence of auto-incrementing column
    INSERT/INSERT_IGNORE  Should not be specified
    UPDATE/UPDATE_IGNORE  Has to be specified
    UPSERT/UPSERT_IGNORE  Has to be specified
    DELETE/DELETE_IGNORE  Has to be specified
    
    The UPSERT behavior implemented in this patch is intended to
    keep incremental restores consistent when using the
    Kudu backup and restore tool.
    
    More details in the design document:
    
https://docs.google.com/document/d/1-x6F5TkRYMqt2umDBQ5x-Aa4dqKwFYpbxo1A6QoMafo/edit#heading=h.vnagsa4oxwrf
    
    Change-Id: Ib5cc4d80f77c165452572948f68c76fc70394d47
    Reviewed-on: http://gerrit.cloudera.org:8080/20061
    Reviewed-by: Alexey Serbin <[email protected]>
    Tested-by: Alexey Serbin <[email protected]>
    (cherry picked from commit cbbb60e532e388665480f1c8be383305eb8fae62)
    Reviewed-on: http://gerrit.cloudera.org:8080/20227
    Reviewed-by: Yingchun Lai <[email protected]>
    Tested-by: Yingchun Lai <[email protected]>
---
 src/kudu/common/row_operations.cc                |  80 ++++++++---
 src/kudu/tablet/tablet_auto_incrementing-test.cc | 171 +++++++++++++++++++----
 2 files changed, 202 insertions(+), 49 deletions(-)

diff --git a/src/kudu/common/row_operations.cc 
b/src/kudu/common/row_operations.cc
index b3a359323..5e8d560ec 100644
--- a/src/kudu/common/row_operations.cc
+++ b/src/kudu/common/row_operations.cc
@@ -472,39 +472,78 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const 
uint8_t* prototype_row
         tablet_row.set_null(tablet_col_idx, client_set_to_null);
       }
       if (!client_set_to_null) {
-        // Copy the value if it's not null.
-        Status row_status;
-        RETURN_NOT_OK(ReadColumn(
-            col, tablet_row.mutable_cell_ptr(tablet_col_idx), &row_status));
-        if (PREDICT_FALSE(!row_status.ok())) {
-          op->SetFailureStatusOnce(row_status);
+        // Check if the non-null value is present for auto-incrementing 
column. For UPSERT or
+        // UPSERT_IGNORE operations we allow the user to specify the 
auto-incrementing value.
+        if (tablet_col_idx != auto_incrementing_col_idx) {
+          // Copy the non-null value.
+          Status row_status;
+          RETURN_NOT_OK(ReadColumn(
+              col, tablet_row.mutable_cell_ptr(tablet_col_idx), &row_status));
+          if (PREDICT_FALSE(!row_status.ok())) {
+            op->SetFailureStatusOnce(row_status);
+          }
+        } else {
+          if (op->type == RowOperationsPB_Type_INSERT ||
+              op->type == RowOperationsPB_Type_INSERT_IGNORE) {
+            // auto-incrementing column values not to be set for 
INSERT/INSERT_IGNORE operations.
+            static const Status kErrFieldIncorrectlySet = 
Status::InvalidArgument(
+                "auto-incrementing column should not be set for 
INSERT/INSERT_IGNORE operations");
+            op->SetFailureStatusOnce(kErrFieldIncorrectlySet);
+            RETURN_NOT_OK(ReadColumnAndDiscard(col));
+            return kErrFieldIncorrectlySet;
+          }
+          // Fetch the auto-incrementing counter from the request.
+          Status row_status;
+          int64_t counter = 0;
+          RETURN_NOT_OK(ReadColumn(
+              col, reinterpret_cast<uint8_t*>(&counter), &row_status));
+          if (PREDICT_FALSE(!row_status.ok())) {
+            op->SetFailureStatusOnce(row_status);
+          } else {
+            // Make sure it is not negative (zero passes this check).
+            if (counter < 0) {
+              static const Status kErrorValue = Status::InvalidArgument(
+                  "auto-incrementing column value must be greater than zero");
+              op->SetFailureStatusOnce(kErrorValue);
+              return kErrorValue;
+            }
+            // If the provided counter value is greater than what is in memory,
+            // update the counter in memory.
+            if (counter > *auto_incrementing_counter) {
+              *auto_incrementing_counter = counter;
+            }
+            memcpy(tablet_row.mutable_cell_ptr(tablet_col_idx), &counter, 8);
+          }
         }
       } else if (PREDICT_FALSE(!col.is_nullable())) {
         op->SetFailureStatusOnce(Status::InvalidArgument(
             "NULL values not allowed for non-nullable column", 
col.ToString()));
         RETURN_NOT_OK(ReadColumnAndDiscard(col));
       }
-      if (PREDICT_FALSE(tablet_col_idx == auto_incrementing_col_idx)) {
-        static const Status err_field_incorrectly_set = 
Status::InvalidArgument(
-            "auto-incrementing column is incorrectly set");
-        op->SetFailureStatusOnce(err_field_incorrectly_set);
-        return err_field_incorrectly_set;
-      }
     } else {
       // If the client didn't provide a value, check if it's an 
auto-incrementing
       // field. If so, populate the field as appropriate.
       if (tablet_col_idx == auto_incrementing_col_idx) {
+        if (op->type == RowOperationsPB_Type_UPSERT ||
+            op->type == RowOperationsPB_Type_UPSERT_IGNORE) {
+          static const Status kErrMaxValue = 
Status::InvalidArgument("auto-incrementing column "
+                                                                     "should 
be set for "
+                                                                     
"UPSERT/UPSERT_IGNORE "
+                                                                     
"operations");
+          op->SetFailureStatusOnce(kErrMaxValue);
+          return kErrMaxValue;
+        }
         if (*DCHECK_NOTNULL(auto_incrementing_counter) == INT64_MAX) {
-          static const Status err_max_value = Status::IllegalState("max 
auto-incrementing column "
+          static const Status kErrMaxValue = Status::IllegalState("max 
auto-incrementing column "
                                                                    "value 
reached");
-          op->SetFailureStatusOnce(err_max_value);
-          return err_max_value;
+          op->SetFailureStatusOnce(kErrMaxValue);
+          return kErrMaxValue;
         }
         if (*DCHECK_NOTNULL(auto_incrementing_counter) < 0) {
-          static const Status err_value = Status::IllegalState("invalid 
auto-incrementing "
+          static const Status kErrValue = Status::IllegalState("invalid 
auto-incrementing "
                                                                "column value");
-          op->SetFailureStatusOnce(err_value);
-          return err_value;
+          op->SetFailureStatusOnce(kErrValue);
+          return kErrValue;
         }
         // We increment the auto incrementing counter at this point regardless 
of future failures
         // in the op for simplicity. The auto-incrementing column key space is 
large enough to
@@ -762,11 +801,6 @@ Status 
RowOperationsPBDecoder::DecodeOp<DecoderMode::WRITE_OPS>(
   switch (type) {
     case RowOperationsPB::UPSERT:
     case RowOperationsPB::UPSERT_IGNORE:
-      if (tablet_schema_->has_auto_incrementing()) {
-        return Status::NotSupported(
-            Substitute("tables with auto-incrementing column do not support "
-                       "$0 operations", RowOperationsPB_Type_Name(type)));
-      }
     case RowOperationsPB::INSERT:
     case RowOperationsPB::INSERT_IGNORE:
       return DecodeInsertOrUpsert(prototype_row_storage, mapping, op,
diff --git a/src/kudu/tablet/tablet_auto_incrementing-test.cc 
b/src/kudu/tablet/tablet_auto_incrementing-test.cc
index 1650f7195..91ce282d1 100644
--- a/src/kudu/tablet/tablet_auto_incrementing-test.cc
+++ b/src/kudu/tablet/tablet_auto_incrementing-test.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <cstdint>
 #include <memory>
 #include <string>
 #include <type_traits>
@@ -82,36 +83,154 @@ TEST_F(AutoIncrementingTabletTest, TestInsertOp) {
   }
 }
 
-TEST_F(AutoIncrementingTabletTest, TestInsertOpWithAutoIncrementSet) {
-  // Insert a row into the tablet populating auto-incrementing column.
-  unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
-  ASSERT_OK(row->SetInt64(0, 10));
-  ASSERT_OK(row->SetInt32(1, 1337));
-  Status s = writer_->Insert(*row);
-  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
-  ASSERT_EQ("Invalid argument: auto-incrementing column is incorrectly set", 
s.ToString());
-}
 
 TEST_F(AutoIncrementingTabletTest, TestUpsertOp) {
-  // Insert a row into the tablet populating only non auto-incrementing columns
-  // using UPSERT.
-  unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
-  ASSERT_OK(row->SetInt32(1, 1337));
-  Status s = writer_->Upsert(*row);
-  ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
-  ASSERT_EQ("Not implemented: tables with auto-incrementing "
-            "column do not support UPSERT operations", s.ToString());
+  // Insert 20 rows with auto-incrementing column populated into an empty 
tablet
+  // and validate the data written using Upsert() and UpsertIgnore().
+
+  {
+    unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+    for (int i = 1; i <= 10; i++) {
+      ASSERT_OK(row->SetInt64(0, i));
+      ASSERT_OK(row->SetInt32(1, 1337));
+      ASSERT_OK(writer_->Upsert(*row));
+    }
+    for (int i = 10; i <= 20; i++) {
+      ASSERT_OK(row->SetInt64(0, i));
+      ASSERT_OK(row->SetInt32(1, 1337));
+      ASSERT_OK(writer_->UpsertIgnore(*row));
+    }
+    unique_ptr<RowwiseIterator> iter;
+    ASSERT_OK(tablet()->NewRowIterator(schema_.CopyWithoutColumnIds(), &iter));
+    ASSERT_OK(iter->Init(nullptr));
+    vector<string> out;
+    IterateToStringList(iter.get(), &out);
+    for (int i = 1; i <= 20; i++) {
+      ASSERT_STR_MATCHES(out[i - 1], Substitute("(int64 key=$0, int32 
val=1337)", i));
+    }
+  }
+  // Update the same 20 rows with auto-incrementing column populated but with 
different
+  // non-primary key column data and validate the data written using Upsert() 
and UpsertIgnore().
+  {
+    unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+    for (int i = 1; i <= 10; i++) {
+      ASSERT_OK(row->SetInt64(0, i));
+      ASSERT_OK(row->SetInt32(1, 1338));
+      ASSERT_OK(writer_->Upsert(*row));
+    }
+    for (int i = 10; i <= 20; i++) {
+      ASSERT_OK(row->SetInt64(0, i));
+      ASSERT_OK(row->SetInt32(1, 1338));
+      ASSERT_OK(writer_->UpsertIgnore(*row));
+    }
+    unique_ptr<RowwiseIterator> iter;
+    ASSERT_OK(tablet()->NewRowIterator(schema_.CopyWithoutColumnIds(), &iter));
+    ASSERT_OK(iter->Init(nullptr));
+    vector<string> out;
+    IterateToStringList(iter.get(), &out);
+    for (int i = 1; i <= 20; i++) {
+      ASSERT_STR_MATCHES(out[i - 1], Substitute("(int64 key=$0, int32 
val=1338)", i));
+    }
+  }
+  // Now insert new set of rows and validate the data written.
+  {
+    unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+    for (int i = 20; i <= 30; i++) {
+      ASSERT_OK(row->SetInt32(1, 1337));
+      ASSERT_OK(writer_->Insert(*row));
+    }
+
+    // Scan the tablet data and verify the auto increment counter is set 
correctly.
+    unique_ptr<RowwiseIterator> iter;
+    ASSERT_OK(tablet()->NewRowIterator(schema_.CopyWithoutColumnIds(), &iter));
+    ASSERT_OK(iter->Init(nullptr));
+    vector<string> out;
+    IterateToStringList(iter.get(), &out);
+    for (int i = 21; i <= 30; i++) {
+      ASSERT_STR_MATCHES(out[i-1], Substitute("int64 key=$0, int32 val=1337", 
i));
+    }
+  }
 }
 
-TEST_F(AutoIncrementingTabletTest, TestUpsertIgnoreOp) {
-  // Insert a row into the tablet populating only non auto-incrementing columns
-  // using UPSERT_IGNORE.
-  unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
-  ASSERT_OK(row->SetInt32(1, 1337));
-  Status s = writer_->UpsertIgnore(*row);
-  ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
-  ASSERT_EQ("Not implemented: tables with auto-incrementing "
-            "column do not support UPSERT_IGNORE operations", s.ToString());
+TEST_F(AutoIncrementingTabletTest, TestNegatives) {
+  // Insert a row into the tablet setting auto-incrementing column.
+  {
+    unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+    ASSERT_OK(row->SetInt64(0, 10));
+    ASSERT_OK(row->SetInt32(1, 1337));
+    Status s = writer_->Insert(*row);
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_EQ("Invalid argument: auto-incrementing column should not be set "
+              "for INSERT/INSERT_IGNORE operations", s.ToString());
+  }
+
+  // Upsert a row without auto-incrementing column set
+  {
+    unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+    ASSERT_OK(row->SetInt32(1, 1337));
+    Status s = writer_->Upsert(*row);
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_EQ("Invalid argument: auto-incrementing column should be set "
+              "for UPSERT/UPSERT_IGNORE operations", s.ToString());
+  }
+
+  // Upsert a row with auto-incrementing set to negative value
+  {
+    unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+    ASSERT_OK(row->SetInt64(0, -1));
+    ASSERT_OK(row->SetInt32(1, 1337));
+    Status s = writer_->Upsert(*row);
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_EQ("Invalid argument: auto-incrementing column value must be 
greater than zero",
+              s.ToString());
+  }
+  // Upsert a row with auto-incrementing set to INT64_MAX and insert a row
+  {
+    unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+    ASSERT_OK(row->SetInt64(0, INT64_MAX));
+    ASSERT_OK(row->SetInt32(1, 1337));
+    ASSERT_OK(writer_->Upsert(*row));
+  }
+  {
+    unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+    ASSERT_OK(row->SetInt32(1, 1337));
+    Status s = writer_->Insert(*row);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_EQ("Illegal state: max auto-incrementing column value reached",
+              s.ToString());
+  }
+
+  // Upsert rows with auto-incrementing set to INT64_MAX and a new value 
respectively.
+  // Insert a new row and later upsert a row with INT64_MAX.
+  // Note: Technically, since we are operating on the same table as above, 
UPSERT with
+  // Auto-incrementing column value set to INT64_MAX is not needed as we have 
already
+  // done it above, but having it enhances the readability of the sub-test.
+  {
+    unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+    ASSERT_OK(row->SetInt64(0, INT64_MAX));
+    ASSERT_OK(row->SetInt32(1, 1337));
+    ASSERT_OK(writer_->Upsert(*row));
+  }
+  {
+    unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+    ASSERT_OK(row->SetInt64(0, 100));
+    ASSERT_OK(row->SetInt32(1, 1337));
+    ASSERT_OK(writer_->Upsert(*row));
+  }
+  {
+    unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+    ASSERT_OK(row->SetInt32(1, 1337));
+    Status s = writer_->Insert(*row);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_EQ("Illegal state: max auto-incrementing column value reached",
+              s.ToString());
+  }
+  {
+    unique_ptr<KuduPartialRow> row(new KuduPartialRow(&client_schema_));
+    ASSERT_OK(row->SetInt64(0, INT64_MAX));
+    ASSERT_OK(row->SetInt32(1, 1338));
+    ASSERT_OK(writer_->Upsert(*row));
+  }
 }
 
 } // namespace tablet

Reply via email to