This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new ad5446fff KUDU-1261 control the maximum number of elements in an array
ad5446fff is described below
commit ad5446fff129bf1814e8fbfe43958384959d22eb
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Oct 10 15:56:58 2025 -0700
KUDU-1261 control the maximum number of elements in an array
This patch introduces a new --array_cell_max_elem_num runtime flag
to control the maximum number of array elements that can be written
into a Kudu table when ingesting data via RPC. Corresponding unit
test scenarios are added as well.
Change-Id: I63c0f9bd09723c15b2666fe5f5e683f8c85cf3a7
Reviewed-on: http://gerrit.cloudera.org:8080/23527
Reviewed-by: Abhishek Chennaka <[email protected]>
Tested-by: Alexey Serbin <[email protected]>
---
src/kudu/common/array_cell_view.h | 4 +-
src/kudu/common/row_operations-test.cc | 156 +++++++++++++++++++++++++++++++++
src/kudu/common/row_operations.cc | 34 +++++++
3 files changed, 193 insertions(+), 1 deletion(-)
diff --git a/src/kudu/common/array_cell_view.h b/src/kudu/common/array_cell_view.h
index 96d1c7ccc..fdbd60b4f 100644
--- a/src/kudu/common/array_cell_view.h
+++ b/src/kudu/common/array_cell_view.h
@@ -82,6 +82,8 @@ constexpr serdes::ScalarArray KuduToScalarArrayType(DataType data_type) {
class ArrayCellMetadataView final {
public:
+ static constexpr const size_t kArrayMaxElemNum = 65536;
+
// buf: data raw pointer
// len: size of the buffer (bytes) pointed at by the 'buf' pointer
ArrayCellMetadataView(const uint8_t* buf, const size_t size)
@@ -123,7 +125,7 @@ class ArrayCellMetadataView final {
// comparison criteria in the flatbuffers' logic that asserts the
// buffers size restriction
opt.max_depth = 3;
- opt.max_tables = 65536 + 1;
+ opt.max_tables = kArrayMaxElemNum + 1;
opt.max_size = size_ + 1;
flatbuffers::Verifier verifier(data_, size_, opt);
diff --git a/src/kudu/common/row_operations-test.cc b/src/kudu/common/row_operations-test.cc
index a81f6d9f1..808f4ebe6 100644
--- a/src/kudu/common/row_operations-test.cc
+++ b/src/kudu/common/row_operations-test.cc
@@ -21,6 +21,7 @@
#include <cstdlib>
#include <initializer_list>
#include <memory>
+#include <optional>
#include <ostream>
#include <string>
#include <type_traits>
@@ -47,12 +48,15 @@
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
+using std::nullopt;
+using std::optional;
using std::shared_ptr;
using std::string;
using std::vector;
using strings::Substitute;
DECLARE_int32(max_cell_size_bytes);
+DECLARE_uint32(array_cell_max_elem_num);
namespace kudu {
@@ -948,6 +952,98 @@ void CheckSplitExceedCellLimit(const Schema& client_schema,
NO_FATALS(CheckExceedCellLimit(client_schema, col_values, op_type,
expect_status, expect_msg));
}
}
+
+// Encodes a single row operation of type 'op_type' for 'schema', decodes it
+// back with RowOperationsPBDecoder, and checks the per-row result status.
+// Every key column of 'schema' must be of INT32 type. Every non-key array
+// column is set to an array of 'elem_num' non-null elements. Non-key
+// non-array columns are left unset (they are expected to be nullable).
+// For DELETE and DELETE_IGNORE, only the key columns are set.
+//
+// expected_status: status code expected for the decoded row operation
+// expected_msg: if set, a substring expected in the row operation's status
+void CheckArrayMaxElemNumLimit(const Schema& schema,
+                               size_t elem_num,
+                               RowOperationsPB::Type op_type,
+                               const Status& expected_status = Status::OK(),
+                               const optional<string>& expected_msg = nullopt) {
+  switch (op_type) {
+    case RowOperationsPB::INSERT:
+    case RowOperationsPB::UPDATE:
+    case RowOperationsPB::DELETE:
+    case RowOperationsPB::UPSERT:
+    case RowOperationsPB::INSERT_IGNORE:
+    case RowOperationsPB::UPDATE_IGNORE:
+    case RowOperationsPB::DELETE_IGNORE:
+    case RowOperationsPB::UPSERT_IGNORE:
+      break;
+    default:
+      LOG(FATAL) << "unsupported op_type: " << RowOperationsPB::Type_Name(op_type);
+      break;  // unreachable
+  }
+
+  KuduPartialRow row(&schema);
+  for (size_t i = 0; i < schema.num_key_columns(); ++i) {
+    const ColumnSchema& cs = schema.column(i);
+    switch (cs.type_info()->type()) {
+      case INT32:
+        ASSERT_OK(row.SetInt32(i, 0));
+        break;
+      default:
+        LOG(FATAL) << "non-INT32 key column";
+        break;  // unreachable
+    }
+  }
+  for (size_t i = schema.num_key_columns(); i < schema.num_columns(); ++i) {
+    if (op_type == RowOperationsPB::DELETE || op_type == RowOperationsPB::DELETE_IGNORE) {
+      // DELETE should not have a value for non-key column.
+      break;
+    }
+    const ColumnSchema& cs = schema.column(i);
+    if (!cs.type_info()->is_array()) {
+      // Testing only array cells, so any non-key non-array column supposed to
+      // be nullable and is not set here.
+      continue;
+    }
+
+    const TypeInfo* elem_type_info = GetArrayElementTypeInfo(*cs.type_info());
+    ASSERT_NE(nullptr, elem_type_info);
+    switch (elem_type_info->type()) {
+      case BOOL:
+        ASSERT_OK(row.SetArrayBool(
+            i, vector<bool>(elem_num, true), vector<bool>(elem_num, true)));
+        break;
+      case INT32:
+        ASSERT_OK(row.SetArrayInt32(
+            i, vector<int32_t>(elem_num, 0), vector<bool>(elem_num, true)));
+        break;
+      case DOUBLE:
+        ASSERT_OK(row.SetArrayDouble(
+            i, vector<double>(elem_num, 0), vector<bool>(elem_num, true)));
+        break;
+      case STRING:
+        ASSERT_OK(row.SetArrayString(
+            i, vector<Slice>(elem_num, Slice()), vector<bool>(elem_num, true)));
+        break;
+      default:
+        LOG(FATAL) << Substitute("not implemented for arrays of type $0",
+                                 DataType_Name(elem_type_info->type()));
+        break;  // unreachable
+    }
+  }
+
+  RowOperationsPB pb;
+  RowOperationsPBEncoder(&pb).Add(op_type, row);
+
+  Arena arena(1024 * 1024);
+  Schema result_schema = schema.CopyWithColumnIds();
+  RowOperationsPBDecoder decoder(&pb, &schema, &result_schema, &arena);
+
+  vector<DecodedRowOperation> ops;
+  const auto s = decoder.DecodeOperations<WRITE_OPS>(&ops);
+  ASSERT_OK(s);
+  // Exactly one row was encoded above. Without this check, an empty 'ops'
+  // would make the loop below assert nothing, and the test would pass
+  // vacuously.
+  ASSERT_EQ(1, ops.size());
+
+  for (const auto& op : ops) {
+    SCOPED_TRACE(Substitute("op_type $0", RowOperationsPB::Type_Name(op_type)));
+    ASSERT_EQ(expected_status.CodeAsString(), op.result.CodeAsString())
+        << op.result.message().ToString();
+    if (expected_msg.has_value()) {
+      ASSERT_STR_CONTAINS(op.result.ToString(), expected_msg.value());
+    }
+  }
+}
+
} // anonymous namespace
// Decodes a split row using RowOperationsPBDecoder with
DecoderMode::SPLIT_ROWS under
@@ -1016,6 +1112,66 @@ TEST_F(RowOperationsTest, ExceedCellLimit) {
}
}
+// Make sure it's possible to update the --array_cell_max_elem_num setting at
+// runtime, and that it effectively limits the number of elements in an array
+// cell as expected.
+TEST_F(RowOperationsTest, ArrayCellElementNumberLimit) {
+  for (const DataType elem_type : { BOOL, INT32, DOUBLE, STRING }) {
+    SCOPED_TRACE(Substitute("array element type $0", DataType_Name(elem_type)));
+    const Schema schema(
+        {
+          ColumnSchema("key", INT32),
+          ColumnSchemaBuilder()
+              .name("arr")
+              .type(elem_type)
+              .nullable(true)
+              .array(true)
+              .Build(),
+        },
+        /*key_columns*/1);
+    for (auto op_type : { RowOperationsPB::INSERT,
+                          RowOperationsPB::INSERT_IGNORE,
+                          RowOperationsPB::UPSERT,
+                          RowOperationsPB::UPSERT_IGNORE,
+                          RowOperationsPB::UPDATE,
+                          RowOperationsPB::UPDATE_IGNORE, }) {
+      FLAGS_max_cell_size_bytes = 64 * 1024;
+      FLAGS_array_cell_max_elem_num = 100;
+      NO_FATALS(CheckArrayMaxElemNumLimit(schema, 100, op_type));
+      NO_FATALS(CheckArrayMaxElemNumLimit(schema, 101, op_type,
+          Status::InvalidArgument({}),
+          "too many array elements for column 'arr' (101, maximum is 100)"));
+      FLAGS_array_cell_max_elem_num = 2048;
+      NO_FATALS(CheckArrayMaxElemNumLimit(schema, 2000, op_type));
+      NO_FATALS(CheckArrayMaxElemNumLimit(schema, 8192, op_type,
+          Status::InvalidArgument({}),
+          "too many array elements for column 'arr' (8192, maximum is 2048)"));
+
+      // Run with boundary settings of --array_cell_max_elem_num: 0 and 64K.
+      FLAGS_array_cell_max_elem_num = 0;
+      NO_FATALS(CheckArrayMaxElemNumLimit(schema, 0, op_type));
+      NO_FATALS(CheckArrayMaxElemNumLimit(schema, 1, op_type,
+          Status::InvalidArgument({}),
+          "too many array elements for column 'arr' (1, maximum is 0)"));
+
+      // The limit on the cell size is enforced before the limit on the maximum
+      // number of elements in an array.
+      FLAGS_max_cell_size_bytes = 8192;
+      FLAGS_array_cell_max_elem_num = 64 * 1024;
+      NO_FATALS(CheckArrayMaxElemNumLimit(schema, 64 * 1024, op_type,
+          Status::InvalidArgument({}),
+          "value too large for column 'arr'"));
+
+      // Set the cell size limit very high to make sure it's not hit even with
+      // 64K elements in an array.
+      FLAGS_max_cell_size_bytes = 1 * 1024 * 1024;
+      NO_FATALS(CheckArrayMaxElemNumLimit(schema, 64 * 1024, op_type));
+      NO_FATALS(CheckArrayMaxElemNumLimit(schema, 64 * 1024 + 1, op_type,
+          Status::InvalidArgument({}),
+          "too many array elements for column 'arr' (65537, maximum is 65536)"));
+    }
+  }
+}
+
TEST_F(RowOperationsTest, SchemasDoNotMatch) {
Schema client_schema({ ColumnSchema("key", INT32),
ColumnSchema("int_val", INT32) },
diff --git a/src/kudu/common/row_operations.cc b/src/kudu/common/row_operations.cc
index 074a7959e..3059cb9d6 100644
--- a/src/kudu/common/row_operations.cc
+++ b/src/kudu/common/row_operations.cc
@@ -26,6 +26,7 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
+#include "kudu/common/array_cell_view.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/row.h"
@@ -52,6 +53,25 @@ DEFINE_int32(max_cell_size_bytes, 64 * 1024,
"in errors.");
TAG_FLAG(max_cell_size_bytes, unsafe);
+DEFINE_uint32(array_cell_max_elem_num, 1024,
+              "The maximum number of elements in any individual cell of array "
+              "type column in a table. Attempting to store arrays of a length "
+              "greater than this will result in errors.");
+TAG_FLAG(array_cell_max_elem_num, advanced);
+TAG_FLAG(array_cell_max_elem_num, runtime);
+
+// Validate that --array_cell_max_elem_num is not greater than
+// ArrayCellMetadataView::kArrayMaxElemNum, the element-count bound that
+// ArrayCellMetadataView uses to configure its flatbuffers verifier.
+// Any value in [0, kArrayMaxElemNum] is accepted. A greater value is rejected
+// and an error message naming the flag is logged.
+static bool ValidateArrayCellMaxElemNum(const char* flagname, uint32_t value) {
+  if (value > kudu::ArrayCellMetadataView::kArrayMaxElemNum) {
+    LOG(ERROR) << Substitute(
+        "'$0' set to invalid value $1; must not be greater than $2",
+        flagname, value, kudu::ArrayCellMetadataView::kArrayMaxElemNum);
+    return false;
+  }
+  return true;
+}
+DEFINE_validator(array_cell_max_elem_num, &ValidateArrayCellMaxElemNum);
+
namespace kudu {
string DecodedRowOperation::ToString(const Schema& schema) const {
@@ -290,6 +310,20 @@ Status RowOperationsPBDecoder::GetColumnSlice(const ColumnSchema& col,
// validate subsequent columns and rows.
}
*slice = Slice(&pb_->indirect_data()[offset_in_indirect],
ptr_slice->size());
+ if (col.is_array()) {
+ // Apply restrictions specific to array columns.
+ ArrayCellMetadataView view(slice->data(), slice->size());
+ const auto s = view.Init();
+ if (PREDICT_FALSE(!s.ok())) {
+ return Status::Corruption(
+ Substitute("column '$0': corrupted array cell data", col.name()));
+ }
+ if (PREDICT_FALSE(view.elem_num() > FLAGS_array_cell_max_elem_num)) {
+ *row_status = Status::InvalidArgument(Substitute(
+ "too many array elements for column '$0' ($1, maximum is $2)",
+ col.name(), view.elem_num(), FLAGS_array_cell_max_elem_num));
+ }
+ }
}
src_.remove_prefix(size);
return Status::OK();