This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new ad5446fff KUDU-1261 control the maximum number of elements in an array
ad5446fff is described below

commit ad5446fff129bf1814e8fbfe43958384959d22eb
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Oct 10 15:56:58 2025 -0700

    KUDU-1261 control the maximum number of elements in an array
    
    This patch introduces a new --array_cell_max_elem_num runtime flag
    to control the maximum number of array elements that can be written
    into a Kudu table when ingesting data via RPC.  Corresponding unit
    test scenarios are added as well.
    
    Change-Id: I63c0f9bd09723c15b2666fe5f5e683f8c85cf3a7
    Reviewed-on: http://gerrit.cloudera.org:8080/23527
    Reviewed-by: Abhishek Chennaka <[email protected]>
    Tested-by: Alexey Serbin <[email protected]>
---
 src/kudu/common/array_cell_view.h      |   4 +-
 src/kudu/common/row_operations-test.cc | 156 +++++++++++++++++++++++++++++++++
 src/kudu/common/row_operations.cc      |  34 +++++++
 3 files changed, 193 insertions(+), 1 deletion(-)

diff --git a/src/kudu/common/array_cell_view.h b/src/kudu/common/array_cell_view.h
index 96d1c7ccc..fdbd60b4f 100644
--- a/src/kudu/common/array_cell_view.h
+++ b/src/kudu/common/array_cell_view.h
@@ -82,6 +82,8 @@ constexpr serdes::ScalarArray KuduToScalarArrayType(DataType data_type) {
 
 class ArrayCellMetadataView final {
  public:
+  static constexpr const size_t kArrayMaxElemNum = 65536;
+
   // buf: data raw pointer
   // len: size of the buffer (bytes) pointed at by the 'buf' pointer
   ArrayCellMetadataView(const uint8_t* buf, const size_t size)
@@ -123,7 +125,7 @@ class ArrayCellMetadataView final {
       //     comparison criteria in the flatbuffers' logic that asserts the
       //     buffers size restriction
       opt.max_depth = 3;
-      opt.max_tables = 65536 + 1;
+      opt.max_tables = kArrayMaxElemNum + 1;
       opt.max_size = size_ + 1;
 
       flatbuffers::Verifier verifier(data_, size_, opt);
diff --git a/src/kudu/common/row_operations-test.cc b/src/kudu/common/row_operations-test.cc
index a81f6d9f1..808f4ebe6 100644
--- a/src/kudu/common/row_operations-test.cc
+++ b/src/kudu/common/row_operations-test.cc
@@ -21,6 +21,7 @@
 #include <cstdlib>
 #include <initializer_list>
 #include <memory>
+#include <optional>
 #include <ostream>
 #include <string>
 #include <type_traits>
@@ -47,12 +48,15 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+using std::nullopt;
+using std::optional;
 using std::shared_ptr;
 using std::string;
 using std::vector;
 using strings::Substitute;
 
 DECLARE_int32(max_cell_size_bytes);
+DECLARE_uint32(array_cell_max_elem_num);
 
 namespace kudu {
 
@@ -948,6 +952,98 @@ void CheckSplitExceedCellLimit(const Schema& client_schema,
     NO_FATALS(CheckExceedCellLimit(client_schema, col_values, op_type, 
expect_status, expect_msg));
   }
 }
+
+// Round-trips one 'op_type' row with 'elem_num'-element arrays and checks the row status.
+void CheckArrayMaxElemNumLimit(const Schema& schema,
+                               size_t elem_num,
+                               RowOperationsPB::Type op_type,
+                               const Status& expected_status = Status::OK(),
+                               const optional<string>& expected_msg = nullopt) {
+  switch (op_type) {
+    case RowOperationsPB::INSERT:
+    case RowOperationsPB::UPDATE:
+    case RowOperationsPB::DELETE:
+    case RowOperationsPB::UPSERT:
+    case RowOperationsPB::INSERT_IGNORE:
+    case RowOperationsPB::UPDATE_IGNORE:
+    case RowOperationsPB::DELETE_IGNORE:
+    case RowOperationsPB::UPSERT_IGNORE:
+      break;
+    default:
+      LOG(FATAL) << "unsupported op_type: " << RowOperationsPB::Type_Name(op_type);
+      break;  // unreachable
+  }
+
+  KuduPartialRow row(&schema);
+  for (size_t i = 0; i < schema.num_key_columns(); ++i) {
+    const ColumnSchema& cs = schema.column(i);
+    switch (cs.type_info()->type()) {
+      case INT32:
+        ASSERT_OK(row.SetInt32(i, 0));
+        break;
+      default:
+        LOG(FATAL) << "non-INT32 key column";
+        break;  // unreachable
+    }
+  }
+  for (size_t i = schema.num_key_columns(); i < schema.num_columns(); ++i) {
+    if (op_type == RowOperationsPB::DELETE || op_type == RowOperationsPB::DELETE_IGNORE) {
+      // DELETE should not have a value for non-key column.
+      break;
+    }
+    const ColumnSchema& cs = schema.column(i);
+    if (!cs.type_info()->is_array()) {
+      // Testing only array cells, so any non-key non-array column is supposed
+      // to be nullable and is not set here.
+      continue;
+    }
+
+    const TypeInfo* elem_type_info = GetArrayElementTypeInfo(*cs.type_info());
+    ASSERT_NE(nullptr, elem_type_info);
+    switch (elem_type_info->type()) {
+      case BOOL:
+        ASSERT_OK(row.SetArrayBool(
+            i, vector<bool>(elem_num, true), vector<bool>(elem_num, true)));
+        break;
+      case INT32:
+        ASSERT_OK(row.SetArrayInt32(
+            i, vector<int32_t>(elem_num, 0), vector<bool>(elem_num, true)));
+        break;
+      case DOUBLE:
+        ASSERT_OK(row.SetArrayDouble(
+            i, vector<double>(elem_num, 0), vector<bool>(elem_num, true)));
+        break;
+      case STRING:
+        ASSERT_OK(row.SetArrayString(
+            i, vector<Slice>(elem_num, Slice()), vector<bool>(elem_num, true)));
+        break;
+      default:
+        LOG(FATAL) << Substitute("not implemented for arrays of type $0",
+                                 DataType_Name(elem_type_info->type()));
+        break;  // unreachable
+    }
+  }
+
+  RowOperationsPB pb;
+  RowOperationsPBEncoder(&pb).Add(op_type, row);
+
+  Arena arena(1024 * 1024);
+  Schema result_schema = schema.CopyWithColumnIds();
+  RowOperationsPBDecoder decoder(&pb, &schema, &result_schema, &arena);
+
+  SCOPED_TRACE(Substitute("op_type $0", RowOperationsPB::Type_Name(op_type)));
+  vector<DecodedRowOperation> ops;
+  ASSERT_OK(decoder.DecodeOperations<WRITE_OPS>(&ops));
+  // Exactly one row was encoded above: make sure the checks below can't pass vacuously.
+  ASSERT_EQ(1, ops.size());
+  const auto& op = ops.front();
+  ASSERT_EQ(expected_status.CodeAsString(), op.result.CodeAsString())
+      << op.result.message().ToString();
+  if (expected_msg.has_value()) {
+    ASSERT_STR_CONTAINS(op.result.ToString(), expected_msg.value());
+  }
+}
+
 } // anonymous namespace
 
 // Decodes a split row using RowOperationsPBDecoder with 
DecoderMode::SPLIT_ROWS under
@@ -1016,6 +1112,66 @@ TEST_F(RowOperationsTest, ExceedCellLimit) {
   }
 }
 
+// Make sure it's possible to update the setting for --array_cell_max_elem_num
+// at runtime and that it effectively limits the number of elements in an array.
+TEST_F(RowOperationsTest, ArrayCellElementNumberLimit) {
+  for (const DataType elem_type : { BOOL, INT32, DOUBLE, STRING }) {
+    SCOPED_TRACE(Substitute("array element type $0", 
DataType_Name(elem_type)));
+    const Schema schema(
+        {
+            ColumnSchema("key", INT32),
+            ColumnSchemaBuilder()
+                .name("arr")
+                .type(elem_type)
+                .nullable(true)
+                .array(true)
+                .Build(),
+        },
+        /*key_columns*/1);
+    for (auto op_type : { RowOperationsPB::INSERT,
+                          RowOperationsPB::INSERT_IGNORE,
+                          RowOperationsPB::UPSERT,
+                          RowOperationsPB::UPSERT_IGNORE,
+                          RowOperationsPB::UPDATE,
+                          RowOperationsPB::UPDATE_IGNORE, }) {
+      FLAGS_max_cell_size_bytes = 64 * 1024;
+      FLAGS_array_cell_max_elem_num = 100;
+      NO_FATALS(CheckArrayMaxElemNumLimit(schema, 100, op_type));
+      NO_FATALS(CheckArrayMaxElemNumLimit(schema, 101, op_type,
+          Status::InvalidArgument({}),
+          "too many array elements for column 'arr' (101, maximum is 100)"));
+      FLAGS_array_cell_max_elem_num = 2048;
+      NO_FATALS(CheckArrayMaxElemNumLimit(schema, 2000, op_type));
+      NO_FATALS(CheckArrayMaxElemNumLimit(schema, 8192, op_type,
+          Status::InvalidArgument({}),
+          "too many array elements for column 'arr' (8192, maximum is 2048)"));
+
+      // Run with boundary settings of --array_cell_max_elem_num: 0 and 64K.
+      FLAGS_array_cell_max_elem_num = 0;
+      NO_FATALS(CheckArrayMaxElemNumLimit(schema, 0, op_type));
+      NO_FATALS(CheckArrayMaxElemNumLimit(schema, 1, op_type,
+          Status::InvalidArgument({}),
+          "too many array elements for column 'arr' (1, maximum is 0)"));
+
+      // The limit on the cell size is enforced before the limit on the maximum
+      // number of elements in an array.
+      FLAGS_max_cell_size_bytes = 8192;
+      FLAGS_array_cell_max_elem_num = 64 * 1024;
+      NO_FATALS(CheckArrayMaxElemNumLimit(schema, 64 * 1024, op_type,
+          Status::InvalidArgument({}),
+          "value too large for column 'arr'"));
+
+      // Set the cell size limit very high to make sure it's not hit even with
+      // 64K elements in an array.
+      FLAGS_max_cell_size_bytes = 1 * 1024 * 1024;
+      NO_FATALS(CheckArrayMaxElemNumLimit(schema, 64 * 1024, op_type));
+      NO_FATALS(CheckArrayMaxElemNumLimit(schema, 64 * 1024 + 1, op_type,
+          Status::InvalidArgument({}),
+          "too many array elements for column 'arr' (65537, maximum is 
65536)"));
+    }
+  }
+}
+
 TEST_F(RowOperationsTest, SchemasDoNotMatch) {
   Schema client_schema({ ColumnSchema("key", INT32),
                          ColumnSchema("int_val", INT32) },
diff --git a/src/kudu/common/row_operations.cc b/src/kudu/common/row_operations.cc
index 074a7959e..3059cb9d6 100644
--- a/src/kudu/common/row_operations.cc
+++ b/src/kudu/common/row_operations.cc
@@ -26,6 +26,7 @@
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
+#include "kudu/common/array_cell_view.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/row.h"
@@ -52,6 +53,25 @@ DEFINE_int32(max_cell_size_bytes, 64 * 1024,
              "in errors.");
 TAG_FLAG(max_cell_size_bytes, unsafe);
 
+DEFINE_uint32(array_cell_max_elem_num, 1024,
+              "The maximum number of elements in any individual cell of array "
+              "type column in a table. Attempting to store arrays of a length "
+              "greater than this will result in errors.");
+TAG_FLAG(array_cell_max_elem_num, advanced);
+TAG_FLAG(array_cell_max_elem_num, runtime);
+
+// Validate that array_cell_max_elem_num <= ArrayCellMetadataView::kArrayMaxElemNum.
+static bool ValidateArrayCellMaxElemNum(const char* flagname, uint32_t value) {
+  if (value > kudu::ArrayCellMetadataView::kArrayMaxElemNum) {
+    LOG(ERROR) << Substitute(
+        "'$0' set to invalid value $1; must not be greater than $2",
+        flagname, value, kudu::ArrayCellMetadataView::kArrayMaxElemNum);
+    return false;
+  }
+  return true;
+}
+DEFINE_validator(array_cell_max_elem_num, &ValidateArrayCellMaxElemNum);
+
 namespace kudu {
 
 string DecodedRowOperation::ToString(const Schema& schema) const {
@@ -290,6 +310,20 @@ Status RowOperationsPBDecoder::GetColumnSlice(const ColumnSchema& col,
       // validate subsequent columns and rows.
     }
     *slice = Slice(&pb_->indirect_data()[offset_in_indirect], 
ptr_slice->size());
+    if (col.is_array()) {
+      // Apply restrictions specific to array columns.
+      ArrayCellMetadataView view(slice->data(), slice->size());
+      const auto s = view.Init();
+      if (PREDICT_FALSE(!s.ok())) {
+        return Status::Corruption(
+            Substitute("column '$0': corrupted array cell data", col.name()));
+      }
+      if (PREDICT_FALSE(view.elem_num() > FLAGS_array_cell_max_elem_num)) {
+        *row_status = Status::InvalidArgument(Substitute(
+          "too many array elements for column '$0' ($1, maximum is $2)",
+          col.name(), view.elem_num(), FLAGS_array_cell_max_elem_num));
+      }
+    }
   }
   src_.remove_prefix(size);
   return Status::OK();

Reply via email to