This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 6670bd66b KUDU-1261 Add array type support to Python client
6670bd66b is described below
commit 6670bd66bfefaf4c2e916602c2f2c07e1acd1078
Author: Marton Greber <[email protected]>
AuthorDate: Tue Sep 30 23:17:17 2025 +0200
KUDU-1261 Add array type support to Python client
Implement full array type support for Python client including:
- Array column definition via array_type() and nested_type()
- Read operations (Row.get_array) with schema introspection
- Write operations (PartialRow._set_array) for insert/update/upsert
- Support for 12 element types: int8/16/32/64, float, double, bool,
string, binary, varchar, unixtime_micros, date
- NULL element handling via Python None
C++ API bindings:
- GetArray*/SetArray* methods for all supported types
- Type-safe schema introspection via KuduNestedTypeDescriptor
Testing:
- 6 integration tests (test_array_datatype.py)
- 3 schema tests (test_schema.py)
- Coverage for CRUD operations, empty arrays, NULL elements
TODO for follow-up:
- Decimal arrays: blocked on C++ API int128 overload support
(existing decimal uses int128, but SetArrayDecimal only has
int32/int64 overloads - needs API design review)
Change-Id: I2329c7466534bd4961860c05b600e7d4b4a11507
Reviewed-on: http://gerrit.cloudera.org:8080/23485
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Abhishek Chennaka <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
python/kudu/__init__.py | 1 +
python/kudu/client.pxd | 4 +
python/kudu/client.pyx | 340 +++++++++++++++++++++++++++++++
python/kudu/libkudu_client.pxd | 138 +++++++++++++
python/kudu/schema.pyx | 135 +++++++++++-
python/kudu/tests/test_array_datatype.py | 315 ++++++++++++++++++++++++++++
python/kudu/tests/test_schema.py | 153 ++++++++++++++
7 files changed, 1084 insertions(+), 2 deletions(-)
diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py
index f932f6bcc..e6d3a4bf2 100644
--- a/python/kudu/__init__.py
+++ b/python/kudu/__init__.py
@@ -47,6 +47,7 @@ from kudu.schema import (int8, int16, int32, int64, string_
as string, # noqa
date,
KuduType,
SchemaBuilder, ColumnSpec, Schema, ColumnSchema,
+ array_type,
COMPRESSION_DEFAULT,
COMPRESSION_NONE,
COMPRESSION_SNAPPY,
diff --git a/python/kudu/client.pxd b/python/kudu/client.pxd
index f28a0ce23..e146f98f2 100644
--- a/python/kudu/client.pxd
+++ b/python/kudu/client.pxd
@@ -53,6 +53,10 @@ cdef class PartialRow:
cpdef set_loc_null(self, int i)
+ cdef DataType _get_array_element_type(self, int col_idx)
+
+ cdef _set_array(self, int i, value)
+
cdef add_to_session(self, Session s)
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index 861346b22..16e2ab3e0 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -1915,10 +1915,161 @@ cdef class Row:
return frombytes(self.get_varchar(i))
elif t == KUDU_DATE:
return self.get_date(i)
+ elif t == KUDU_NESTED:
+ return self.get_array(i)
else:
raise TypeError("Cannot get kudu type <{0}>"
.format(_type_names[t]))
+ cdef DataType _get_scan_array_element_type(self, int i):
+ """
+ Extract array element type using projection schema (Python wrapper).
+ """
+ if self.parent.projection_schema is None:
+ raise TypeError("No projection schema available")
+
+ # Use the same safe pattern as PartialRow._get_array_element_type
+ col = self.parent.projection_schema[i]
+ return self._extract_element_type_from_column(col)
+
+ cdef DataType _extract_element_type_from_column(self, ColumnSchema col):
+ """
+ Helper to extract element type from a Python ColumnSchema wrapper.
+ """
+ cdef:
+ const KuduNestedTypeDescriptor* nested_desc
+ const KuduArrayTypeDescriptor* array_desc
+
+ # Safe access via Python wrapper (col owns the C++ object)
+ nested_desc = col.schema.nested_type()
+
+ if nested_desc == NULL:
+ raise TypeError("Column '{0}' is not a nested
type".format(col.name))
+
+ if not nested_desc.is_array():
+ raise TypeError("Column '{0}' is nested but not an
array".format(col.name))
+
+ array_desc = nested_desc.array()
+ if array_desc == NULL:
+ raise TypeError("Column '{0}' array descriptor is
NULL".format(col.name))
+
+ return array_desc.type()
+
+ cdef list _get_array_by_type(self, int i, DataType elem_type):
+ """
+ Get array using the known element type (from schema introspection).
+ """
+ cdef:
+ vector[c_bool] cpp_data_bool
+ vector[int8_t] cpp_data_int8
+ vector[int16_t] cpp_data_int16
+ vector[int32_t] cpp_data_int32
+ vector[int64_t] cpp_data_int64
+ vector[float] cpp_data_float
+ vector[double] cpp_data_double
+ vector[Slice] cpp_data_slice
+ vector[c_bool] cpp_validity
+ size_t j
+ list result
+
+ if elem_type == KUDU_INT8:
+ check_status(self.row.GetArrayInt8(i, &cpp_data_int8,
&cpp_validity))
+ result = []
+ for j in range(cpp_data_int8.size()):
+ result.append(cpp_data_int8[j] if cpp_validity[j] else None)
+ return result
+
+ elif elem_type == KUDU_INT16:
+ check_status(self.row.GetArrayInt16(i, &cpp_data_int16,
&cpp_validity))
+ result = []
+ for j in range(cpp_data_int16.size()):
+ result.append(cpp_data_int16[j] if cpp_validity[j] else None)
+ return result
+
+ elif elem_type == KUDU_INT32:
+ check_status(self.row.GetArrayInt32(i, &cpp_data_int32,
&cpp_validity))
+ result = []
+ for j in range(cpp_data_int32.size()):
+ result.append(cpp_data_int32[j] if cpp_validity[j] else None)
+ return result
+
+ elif elem_type == KUDU_INT64:
+ check_status(self.row.GetArrayInt64(i, &cpp_data_int64,
&cpp_validity))
+ result = []
+ for j in range(cpp_data_int64.size()):
+ result.append(cpp_data_int64[j] if cpp_validity[j] else None)
+ return result
+
+ elif elem_type == KUDU_FLOAT:
+ check_status(self.row.GetArrayFloat(i, &cpp_data_float,
&cpp_validity))
+ result = []
+ for j in range(cpp_data_float.size()):
+ result.append(cpp_data_float[j] if cpp_validity[j] else None)
+ return result
+
+ elif elem_type == KUDU_DOUBLE:
+ check_status(self.row.GetArrayDouble(i, &cpp_data_double,
&cpp_validity))
+ result = []
+ for j in range(cpp_data_double.size()):
+ result.append(cpp_data_double[j] if cpp_validity[j] else None)
+ return result
+
+ elif elem_type == KUDU_BOOL:
+ check_status(self.row.GetArrayBool(i, &cpp_data_bool,
&cpp_validity))
+ result = []
+ for j in range(cpp_data_bool.size()):
+ result.append(bool(cpp_data_bool[j]) if cpp_validity[j] else
None)
+ return result
+
+ elif elem_type == KUDU_STRING:
+ check_status(self.row.GetArrayString(i, &cpp_data_slice,
&cpp_validity))
+ result = []
+ for j in range(cpp_data_slice.size()):
+ result.append(frombytes(cpp_data_slice[j].ToString()) if
cpp_validity[j] else None)
+ return result
+
+ elif elem_type == KUDU_BINARY:
+ check_status(self.row.GetArrayBinary(i, &cpp_data_slice,
&cpp_validity))
+ result = []
+ for j in range(cpp_data_slice.size()):
+
result.append(cpp_data_slice[j].data()[:cpp_data_slice[j].size()] if
cpp_validity[j] else None)
+ return result
+
+ elif elem_type == KUDU_VARCHAR:
+ check_status(self.row.GetArrayVarchar(i, &cpp_data_slice,
&cpp_validity))
+ result = []
+ for j in range(cpp_data_slice.size()):
+ result.append(frombytes(cpp_data_slice[j].ToString()) if
cpp_validity[j] else None)
+ return result
+
+ elif elem_type == KUDU_UNIXTIME_MICROS:
+ check_status(self.row.GetArrayUnixTimeMicros(i, &cpp_data_int64,
&cpp_validity))
+ result = []
+ for j in range(cpp_data_int64.size()):
+ result.append(from_unixtime_micros(cpp_data_int64[j]) if
cpp_validity[j] else None)
+ return result
+
+ elif elem_type == KUDU_DATE:
+ check_status(self.row.GetArrayDate(i, &cpp_data_int32,
&cpp_validity))
+ result = []
+ for j in range(cpp_data_int32.size()):
+ result.append(unix_epoch_days_to_date(cpp_data_int32[j]) if
cpp_validity[j] else None)
+ return result
+
+ else:
+ raise TypeError("Unsupported array element type {0}".format(
+ _type_names.get(elem_type, elem_type)))
+
+ cdef get_array(self, int i):
+ """
+ Get the array value from column i.
+ Returns a Python list with None for NULL elements.
+ """
+ cdef DataType elem_type
+
+ elem_type = self._get_scan_array_element_type(i)
+ return self._get_array_by_type(i, elem_type)
+
cdef inline bint is_null(self, int i):
return self.row.IsNull(i)
@@ -1931,6 +2082,9 @@ cdef class RowBatch:
cdef:
KuduScanBatch batch
+ # Python Schema wrapper for array type introspection
+ Schema projection_schema
+
def __len__(self):
return self.batch.NumRows()
@@ -2368,6 +2522,8 @@ cdef class Scanner:
cdef RowBatch batch = RowBatch()
check_status(self.scanner.NextBatch(&batch.batch))
+ # Pass the projection schema to enable schema introspection in Row
+ batch.projection_schema = self.get_projection_schema()
return batch
def set_cache_blocks(self, cache_blocks):
@@ -3129,9 +3285,193 @@ cdef class PartialRow:
check_status(self.row.SetUnscaledDecimal(i,
<int128_t>to_unscaled_decimal(value)))
ELSE:
raise KuduException("The decimal type is not supported when
GCC version is < 4.6.0" % self)
+ elif t == KUDU_NESTED:
+ self._set_array(i, value)
else:
raise TypeError("Cannot set kudu type
<{0}>.".format(_type_names[t]))
+ cdef DataType _get_array_element_type(self, int col_idx):
+ """
+ Extract array element type from schema for column at index col_idx.
+ Returns the DataType of the array elements.
+ """
+ cdef:
+ ColumnSchema py_col
+ const KuduNestedTypeDescriptor* nested_desc
+ const KuduArrayTypeDescriptor* array_desc
+
+ # Use Python wrapper which properly manages the C++ KuduColumnSchema
+ py_col = self.schema[col_idx]
+
+ # Access nested_type through the pointer (py_col.schema is
KuduColumnSchema*)
+ nested_desc = py_col.schema.nested_type()
+
+ if nested_desc == NULL:
+ raise TypeError("Column '{0}' is not a nested
type".format(py_col.name))
+
+ if not nested_desc.is_array():
+ raise TypeError("Column '{0}' is nested but not an
array".format(py_col.name))
+
+ array_desc = nested_desc.array()
+ if array_desc == NULL:
+ raise TypeError("Column '{0}' array descriptor is
NULL".format(py_col.name))
+
+ return array_desc.type()
+
+ cdef _set_array(self, int i, value):
+ """
+ Set the array value for column i.
+
+ The value should be a list or tuple. Elements can be None to indicate
NULL.
+ """
+ cdef:
+ vector[c_bool] cpp_values_bool
+ vector[int8_t] cpp_values_int8
+ vector[int16_t] cpp_values_int16
+ vector[int32_t] cpp_values_int32
+ vector[int64_t] cpp_values_int64
+ vector[float] cpp_values_float
+ vector[double] cpp_values_double
+ vector[Slice] cpp_values_slice
+ vector[c_bool] cpp_validity
+ Slice slc
+ bytes encoded_str
+ DataType elem_type
+
+ if not isinstance(value, (list, tuple)):
+ raise TypeError("Array values must be a list or tuple, got
{0}".format(type(value)))
+
+ for elem in value:
+ cpp_validity.push_back(elem is not None)
+
+ # Get element type from schema via copy constructor (safe)
+ elem_type = self._get_array_element_type(i)
+
+ if elem_type == KUDU_BOOL:
+ for elem in value:
+ if elem is None:
+ # Dummy value for NULL
+ cpp_values_bool.push_back(False)
+ else:
+ cpp_values_bool.push_back(<c_bool>elem)
+ check_status(self.row.SetArrayBool(i, cpp_values_bool,
cpp_validity))
+
+ elif elem_type == KUDU_INT8:
+ for elem in value:
+ if elem is None:
+ cpp_values_int8.push_back(0)
+ else:
+ cpp_values_int8.push_back(<int8_t>elem)
+ check_status(self.row.SetArrayInt8(i, cpp_values_int8,
cpp_validity))
+
+ elif elem_type == KUDU_INT16:
+ for elem in value:
+ if elem is None:
+ cpp_values_int16.push_back(0)
+ else:
+ cpp_values_int16.push_back(<int16_t>elem)
+ check_status(self.row.SetArrayInt16(i, cpp_values_int16,
cpp_validity))
+
+ elif elem_type == KUDU_INT32:
+ for elem in value:
+ if elem is None:
+ cpp_values_int32.push_back(0)
+ else:
+ cpp_values_int32.push_back(<int32_t>elem)
+ check_status(self.row.SetArrayInt32(i, cpp_values_int32,
cpp_validity))
+
+ elif elem_type == KUDU_INT64:
+ for elem in value:
+ if elem is None:
+ cpp_values_int64.push_back(0)
+ else:
+ cpp_values_int64.push_back(<int64_t>elem)
+ check_status(self.row.SetArrayInt64(i, cpp_values_int64,
cpp_validity))
+
+ elif elem_type == KUDU_FLOAT:
+ for elem in value:
+ if elem is None:
+ cpp_values_float.push_back(0.0)
+ else:
+ cpp_values_float.push_back(<float>elem)
+ check_status(self.row.SetArrayFloat(i, cpp_values_float,
cpp_validity))
+
+ elif elem_type == KUDU_DOUBLE:
+ for elem in value:
+ if elem is None:
+ cpp_values_double.push_back(0.0)
+ else:
+ cpp_values_double.push_back(<double>elem)
+ check_status(self.row.SetArrayDouble(i, cpp_values_double,
cpp_validity))
+
+ elif elem_type == KUDU_STRING:
+ encoded_strings = []
+ for elem in value:
+ if elem is None:
+ slc = Slice(<char*>"", 0)
+ cpp_values_slice.push_back(slc)
+ encoded_strings.append(b"")
+ else:
+ if isinstance(elem, unicode):
+ encoded_str = elem.encode('utf8')
+ else:
+ encoded_str = elem
+ encoded_strings.append(encoded_str)
+ slc = Slice(<char*>encoded_str, len(encoded_str))
+ cpp_values_slice.push_back(slc)
+ check_status(self.row.SetArrayString(i, cpp_values_slice,
cpp_validity))
+
+ elif elem_type == KUDU_BINARY:
+ binary_data = []
+ for elem in value:
+ if elem is None:
+ slc = Slice(<char*>"", 0)
+ cpp_values_slice.push_back(slc)
+ binary_data.append(b"")
+ else:
+ if isinstance(elem, unicode):
+ raise TypeError("Unicode objects must be explicitly
encoded before storing in Binary array.")
+ binary_data.append(elem)
+ slc = Slice(<char*>elem, len(elem))
+ cpp_values_slice.push_back(slc)
+ check_status(self.row.SetArrayBinary(i, cpp_values_slice,
cpp_validity))
+
+ elif elem_type == KUDU_VARCHAR:
+ encoded_strings = []
+ for elem in value:
+ if elem is None:
+ slc = Slice(<char*>"", 0)
+ cpp_values_slice.push_back(slc)
+ encoded_strings.append(b"")
+ else:
+ if isinstance(elem, unicode):
+ encoded_str = elem.encode('utf8')
+ else:
+ encoded_str = elem
+ encoded_strings.append(encoded_str)
+ slc = Slice(<char*>encoded_str, len(encoded_str))
+ cpp_values_slice.push_back(slc)
+ check_status(self.row.SetArrayVarchar(i, cpp_values_slice,
cpp_validity))
+
+ elif elem_type == KUDU_UNIXTIME_MICROS:
+ for elem in value:
+ if elem is None:
+ cpp_values_int64.push_back(0)
+ else:
+
cpp_values_int64.push_back(<int64_t>to_unixtime_micros(elem))
+ check_status(self.row.SetArrayUnixTimeMicros(i, cpp_values_int64,
cpp_validity))
+
+ elif elem_type == KUDU_DATE:
+ for elem in value:
+ if elem is None:
+ cpp_values_int32.push_back(0)
+ else:
+ val = date_to_unix_epoch_days(elem)
+ cpp_values_int32.push_back(<int32_t>val)
+ check_status(self.row.SetArrayDate(i, cpp_values_int32,
cpp_validity))
+ else:
+ raise TypeError("Unsupported array element type:
{0}".format(_type_names.get(elem_type, elem_type)))
+
cpdef set_field_null(self, key):
pass
diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd
index 4fd96ded3..6f1fd5f23 100644
--- a/python/kudu/libkudu_client.pxd
+++ b/python/kudu/libkudu_client.pxd
@@ -127,6 +127,7 @@ cdef extern from "kudu/client/schema.h" namespace
"kudu::client" nogil:
KUDU_DECIMAL " kudu::client::KuduColumnSchema::DECIMAL"
KUDU_VARCHAR " kudu::client::KuduColumnSchema::VARCHAR"
KUDU_DATE " kudu::client::KuduColumnSchema::DATE"
+ KUDU_NESTED " kudu::client::KuduColumnSchema::NESTED"
enum EncodingType"
kudu::client::KuduColumnStorageAttributes::EncodingType":
EncodingType_AUTO "
kudu::client::KuduColumnStorageAttributes::AUTO_ENCODING"
@@ -163,6 +164,16 @@ cdef extern from "kudu/client/schema.h" namespace
"kudu::client" nogil:
c_bool Equals(KuduColumnTypeAttributes& other)
void CopyFrom(KuduColumnTypeAttributes& other)
+ cdef cppclass KuduArrayTypeDescriptor"
kudu::client::KuduColumnSchema::KuduArrayTypeDescriptor":
+ KuduArrayTypeDescriptor(DataType element_type)
+ DataType type()
+
+ cdef cppclass KuduNestedTypeDescriptor"
kudu::client::KuduColumnSchema::KuduNestedTypeDescriptor":
+ KuduNestedTypeDescriptor(const KuduArrayTypeDescriptor& desc)
+ KuduNestedTypeDescriptor(const KuduNestedTypeDescriptor& other)
+ c_bool is_array()
+ const KuduArrayTypeDescriptor* array()
+
cdef cppclass KuduColumnSchema:
KuduColumnSchema(const KuduColumnSchema& other)
KuduColumnSchema(const string& name, DataType type)
@@ -176,6 +187,7 @@ cdef extern from "kudu/client/schema.h" namespace
"kudu::client" nogil:
c_bool is_immutable()
KuduColumnTypeAttributes type_attributes()
string& comment()
+ const KuduNestedTypeDescriptor* nested_type()
c_bool Equals(KuduColumnSchema& other)
void CopyFrom(KuduColumnSchema& other)
@@ -217,6 +229,8 @@ cdef extern from "kudu/client/schema.h" namespace
"kudu::client" nogil:
KuduColumnSpec* Scale(int8_t scale);
KuduColumnSpec* Length(uint16_t length);
+ KuduColumnSpec* NestedType(const KuduNestedTypeDescriptor& type_info)
+
KuduColumnSpec* RenameTo(const string& new_name)
KuduColumnSpec* Comment(const string& comment)
@@ -285,6 +299,67 @@ cdef extern from "kudu/client/scan_batch.h" namespace
"kudu::client" nogil:
Status GetDate(Slice& col_name, int32_t* val)
Status GetDate(int col_idx, int32_t* val)
+ # Array getters
+ Status GetArrayBool(const Slice& col_name, vector[c_bool]* data,
+ vector[c_bool]* validity)
+ Status GetArrayBool(int col_idx, vector[c_bool]* data,
+ vector[c_bool]* validity)
+
+ Status GetArrayInt8(const Slice& col_name, vector[int8_t]* data,
+ vector[c_bool]* validity)
+ Status GetArrayInt8(int col_idx, vector[int8_t]* data,
+ vector[c_bool]* validity)
+
+ Status GetArrayInt16(const Slice& col_name, vector[int16_t]* data,
+ vector[c_bool]* validity)
+ Status GetArrayInt16(int col_idx, vector[int16_t]* data,
+ vector[c_bool]* validity)
+
+ Status GetArrayInt32(const Slice& col_name, vector[int32_t]* data,
+ vector[c_bool]* validity)
+ Status GetArrayInt32(int col_idx, vector[int32_t]* data,
+ vector[c_bool]* validity)
+
+ Status GetArrayInt64(const Slice& col_name, vector[int64_t]* data,
+ vector[c_bool]* validity)
+ Status GetArrayInt64(int col_idx, vector[int64_t]* data,
+ vector[c_bool]* validity)
+
+ Status GetArrayFloat(const Slice& col_name, vector[float]* data,
+ vector[c_bool]* validity)
+ Status GetArrayFloat(int col_idx, vector[float]* data,
+ vector[c_bool]* validity)
+
+ Status GetArrayDouble(const Slice& col_name, vector[double]* data,
+ vector[c_bool]* validity)
+ Status GetArrayDouble(int col_idx, vector[double]* data,
+ vector[c_bool]* validity)
+
+ Status GetArrayString(const Slice& col_name, vector[Slice]* data,
+ vector[c_bool]* validity)
+ Status GetArrayString(int col_idx, vector[Slice]* data,
+ vector[c_bool]* validity)
+
+ Status GetArrayBinary(const Slice& col_name, vector[Slice]* data,
+ vector[c_bool]* validity)
+ Status GetArrayBinary(int col_idx, vector[Slice]* data,
+ vector[c_bool]* validity)
+
+ Status GetArrayVarchar(const Slice& col_name, vector[Slice]* data,
+ vector[c_bool]* validity)
+ Status GetArrayVarchar(int col_idx, vector[Slice]* data,
+ vector[c_bool]* validity)
+
+ Status GetArrayUnixTimeMicros(const Slice& col_name, vector[int64_t]*
data,
+ vector[c_bool]* validity)
+ Status GetArrayUnixTimeMicros(int col_idx, vector[int64_t]* data,
+ vector[c_bool]* validity)
+
+ Status GetArrayDate(const Slice& col_name, vector[int32_t]* data,
+ vector[c_bool]* validity)
+ Status GetArrayDate(int col_idx, vector[int32_t]* data,
+ vector[c_bool]* validity)
+
const void* cell(int col_idx)
string ToString()
@@ -392,6 +467,69 @@ cdef extern from "kudu/common/partial_row.h" namespace
"kudu" nogil:
Status Unset(Slice& col_name)
Status Unset(int col_idx)
+ #----------------------------------------------------------------------
+ # Array Setters
+
+ Status SetArrayBool(const Slice& col_name, const vector[c_bool]& val,
+ const vector[c_bool]& validity)
+ Status SetArrayBool(int col_idx, const vector[c_bool]& val,
+ const vector[c_bool]& validity)
+
+ Status SetArrayInt8(const Slice& col_name, const vector[int8_t]& val,
+ const vector[c_bool]& validity)
+ Status SetArrayInt8(int col_idx, const vector[int8_t]& val,
+ const vector[c_bool]& validity)
+
+ Status SetArrayInt16(const Slice& col_name, const vector[int16_t]& val,
+ const vector[c_bool]& validity)
+ Status SetArrayInt16(int col_idx, const vector[int16_t]& val,
+ const vector[c_bool]& validity)
+
+ Status SetArrayInt32(const Slice& col_name, const vector[int32_t]& val,
+ const vector[c_bool]& validity)
+ Status SetArrayInt32(int col_idx, const vector[int32_t]& val,
+ const vector[c_bool]& validity)
+
+ Status SetArrayInt64(const Slice& col_name, const vector[int64_t]& val,
+ const vector[c_bool]& validity)
+ Status SetArrayInt64(int col_idx, const vector[int64_t]& val,
+ const vector[c_bool]& validity)
+
+ Status SetArrayFloat(const Slice& col_name, const vector[float]& val,
+ const vector[c_bool]& validity)
+ Status SetArrayFloat(int col_idx, const vector[float]& val,
+ const vector[c_bool]& validity)
+
+ Status SetArrayDouble(const Slice& col_name, const vector[double]& val,
+ const vector[c_bool]& validity)
+ Status SetArrayDouble(int col_idx, const vector[double]& val,
+ const vector[c_bool]& validity)
+
+ Status SetArrayString(const Slice& col_name, const vector[Slice]& val,
+ const vector[c_bool]& validity)
+ Status SetArrayString(int col_idx, const vector[Slice]& val,
+ const vector[c_bool]& validity)
+
+ Status SetArrayBinary(const Slice& col_name, const vector[Slice]& val,
+ const vector[c_bool]& validity)
+ Status SetArrayBinary(int col_idx, const vector[Slice]& val,
+ const vector[c_bool]& validity)
+
+ Status SetArrayVarchar(const Slice& col_name, const vector[Slice]& val,
+ const vector[c_bool]& validity)
+ Status SetArrayVarchar(int col_idx, const vector[Slice]& val,
+ const vector[c_bool]& validity)
+
+ Status SetArrayUnixTimeMicros(const Slice& col_name, const
vector[int64_t]& val,
+ const vector[c_bool]& validity)
+ Status SetArrayUnixTimeMicros(int col_idx, const vector[int64_t]& val,
+ const vector[c_bool]& validity)
+
+ Status SetArrayDate(const Slice& col_name, const vector[int32_t]& val,
+ const vector[c_bool]& validity)
+ Status SetArrayDate(int col_idx, const vector[int32_t]& val,
+ const vector[c_bool]& validity)
+
#----------------------------------------------------------------------
# Getters
diff --git a/python/kudu/schema.pyx b/python/kudu/schema.pyx
index 00394e2b4..bc98dfe32 100644
--- a/python/kudu/schema.pyx
+++ b/python/kudu/schema.pyx
@@ -51,6 +51,7 @@ DATE = KUDU_DATE
BINARY = KUDU_BINARY
DECIMAL = KUDU_DECIMAL
+NESTED = KUDU_NESTED
cdef dict _reverse_dict(d):
return dict((v, k) for k, v in d.items())
@@ -128,6 +129,7 @@ unixtime_micros = KuduType(KUDU_UNIXTIME_MICROS)
decimal = KuduType(KUDU_DECIMAL)
varchar = KuduType(KUDU_VARCHAR)
date = KuduType(KUDU_DATE)
+nested = KuduType(KUDU_NESTED)
cdef dict _type_names = {
@@ -143,7 +145,8 @@ cdef dict _type_names = {
UNIXTIME_MICROS: 'unixtime_micros',
DECIMAL: 'decimal',
VARCHAR: 'varchar',
- DATE: 'date'
+ DATE: 'date',
+ NESTED: 'nested'
}
@@ -162,7 +165,8 @@ cdef dict _type_to_obj = {
UNIXTIME_MICROS: unixtime_micros,
DECIMAL: decimal,
VARCHAR: varchar,
- DATE: date
+ DATE: date,
+ NESTED: nested
}
@@ -176,6 +180,104 @@ cdef KuduType to_data_type(object obj):
else:
raise ValueError('Invalid type: {0}'.format(obj))
+
+cdef class ArrayTypeDescriptor:
+ """
+ Descriptor for array data types.
+ Specifies the element type for array columns.
+ """
+ cdef:
+ KuduArrayTypeDescriptor* descriptor
+
+ def __cinit__(self, element_type):
+ """
+ Create an array type descriptor.
+
+ Parameters
+ ----------
+ element_type : KuduType or type constant
+ The type of elements in the array (e.g., kudu.int64, kudu.string)
+ """
+ cdef KuduType k_type = to_data_type(element_type)
+ self.descriptor = new KuduArrayTypeDescriptor(k_type.type)
+
+ def __dealloc__(self):
+ if self.descriptor != NULL:
+ del self.descriptor
+
+ property element_type:
+ def __get__(self):
+ """Get the array element type"""
+ return self.descriptor.type()
+
+ def __repr__(self):
+ return 'ArrayTypeDescriptor(element_type=%s)' %
_type_names.get(self.element_type, 'unknown')
+
+
+cdef class NestedTypeDescriptor:
+ """
+ Descriptor for nested (non-scalar) data types.
+ Currently only supports arrays.
+ """
+ cdef:
+ KuduNestedTypeDescriptor* descriptor
+
+ def __cinit__(self, array_descriptor):
+ """
+ Create a nested type descriptor from an array descriptor.
+
+ Parameters
+ ----------
+ array_descriptor : ArrayTypeDescriptor
+ The array type descriptor
+ """
+ if not isinstance(array_descriptor, ArrayTypeDescriptor):
+ raise TypeError("Expected ArrayTypeDescriptor, got %s" %
type(array_descriptor))
+
+ cdef ArrayTypeDescriptor arr_desc =
<ArrayTypeDescriptor>array_descriptor
+ self.descriptor = new
KuduNestedTypeDescriptor(deref(arr_desc.descriptor))
+
+ def __dealloc__(self):
+ if self.descriptor != NULL:
+ del self.descriptor
+
+ def is_array(self):
+ """Returns True if this is an array type"""
+ return self.descriptor.is_array()
+
+ def __repr__(self):
+ if self.is_array():
+ return 'NestedTypeDescriptor(type=array)'
+ return 'NestedTypeDescriptor(type=unknown)'
+
+
+def array_type(element_type):
+ """
+ Helper function to create an array type descriptor.
+
+ Parameters
+ ----------
+ element_type : KuduType or type constant
+ The type of elements in the array (e.g., kudu.int64, kudu.string)
+
+ Returns
+ -------
+ descriptor : NestedTypeDescriptor
+ A nested type descriptor for an array with the specified element type
+
+ Examples
+ --------
+ >>> import kudu
+ >>> # Create a table with an array of INT64 values
+ >>> builder = kudu.schema_builder()
+ >>> builder.add_column('key', kudu.int32, nullable=False).primary_key()
+ >>> builder.add_column('values').nested_type(kudu.array_type(kudu.int64))
+ >>> schema = builder.build()
+ """
+ array_desc = ArrayTypeDescriptor(element_type)
+ return NestedTypeDescriptor(array_desc)
+
+
cdef cppclass KuduColumnTypeAttributes:
KuduColumnTypeAttributes()
KuduColumnTypeAttributes(const KuduColumnTypeAttributes& other)
@@ -422,6 +524,35 @@ cdef class ColumnSpec:
self.spec.Length(length)
return self
+ def nested_type(self, type_descriptor):
+ """
+ Set nested type information for this column.
+
+ Use this method to create array columns. The type_descriptor should be
+ created using the array_type() helper function.
+
+ Parameters
+ ----------
+ type_descriptor : NestedTypeDescriptor
+ The nested type descriptor, typically created via array_type()
+
+ Returns
+ -------
+ self
+
+ Examples
+ --------
+ >>> import kudu
+ >>> builder = kudu.schema_builder()
+ >>>
builder.add_column('my_array').nested_type(kudu.array_type(kudu.int64))
+ """
+ if not isinstance(type_descriptor, NestedTypeDescriptor):
+ raise TypeError("Expected NestedTypeDescriptor, got %s" %
type(type_descriptor))
+
+ cdef NestedTypeDescriptor nested_desc =
<NestedTypeDescriptor>type_descriptor
+ self.spec.NestedType(deref(nested_desc.descriptor))
+ return self
+
def primary_key(self):
"""
Make this column a primary key. If you use this method, it will be the
diff --git a/python/kudu/tests/test_array_datatype.py
b/python/kudu/tests/test_array_datatype.py
new file mode 100644
index 000000000..31485808f
--- /dev/null
+++ b/python/kudu/tests/test_array_datatype.py
@@ -0,0 +1,315 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import kudu
+from kudu.compat import CompatUnitTest
+from kudu.tests.common import KuduTestBase
+from kudu.client import Partitioning
+import datetime
+from pytz import utc
+
+class TestArrayDataTypeIntegration(KuduTestBase, CompatUnitTest):
+
+ # All array types supported by Python client
+ SUPPORTED_ARRAY_TYPES = [
+ ('int8', kudu.int8),
+ ('int16', kudu.int16),
+ ('int32', kudu.int32),
+ ('int64', kudu.int64),
+ ('float', kudu.float_),
+ ('double', kudu.double),
+ ('bool', kudu.bool),
+ ('string', kudu.string),
+ ('binary', kudu.binary),
+ ('unixtime_micros', kudu.unixtime_micros),
+ ('date', kudu.date),
+ ]
+
+ # Types that require special parameters
+ SPECIAL_PARAM_TYPES = [
+ ('varchar', kudu.varchar, {'length': 50}),
+ # TODO: Decimal arrays out of scope for this patch
+ # - C++ API has overloaded methods (int32/int64) but no int128 version
+ # - Regular decimals use int128, creating API inconsistency
+ # - Will be addressed in future patch with proper int128 support
+ # ('decimal', kudu.decimal, {'precision': 8, 'scale': 2}),
+ ]
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestArrayDataTypeIntegration, cls).setUpClass()
+
+ builder = kudu.schema_builder()
+ builder.add_column('id', kudu.int32, nullable=False).primary_key()
+
+ # Add basic array types
+ for type_name, kudu_type in cls.SUPPORTED_ARRAY_TYPES:
+ col_name = 'arr_' + type_name
+
builder.add_column(col_name).nested_type(kudu.array_type(kudu_type))
+
+ # Add special parameter types
+ for type_name, kudu_type, params in cls.SPECIAL_PARAM_TYPES:
+ col_name = 'arr_' + type_name
+ col_spec =
builder.add_column(col_name).nested_type(kudu.array_type(kudu_type))
+ if 'length' in params:
+ col_spec.length(params['length'])
+ if 'precision' in params:
+ col_spec.precision(params['precision'])
+ if 'scale' in params:
+ col_spec.scale(params['scale'])
+
+ cls.array_schema = builder.build()
+ cls.partitioning = Partitioning().set_range_partition_columns(['id'])
+
+ def setUp(self):
+ self.table_name = 'array-datatype-test-table'
+
+ if self.client.table_exists(self.table_name):
+ self.client.delete_table(self.table_name)
+
+ self.client.create_table(self.table_name, self.array_schema,
self.partitioning)
+
+ def tearDown(self):
+ if self.client.table_exists(self.table_name):
+ self.client.delete_table(self.table_name)
+
+ def _get_test_data_for_all_types(self, num_values=2, include_null=True,
base_value=1):
+ """
+ Generate test data for all array types.
+ Args:
+ num_values: Number of non-NULL values to generate
+ include_null: Whether to include a NULL value at the end
+ base_value: Base value to use for generating data
+ """
+ from pytz import utc
+
+ values = []
+ for i in range(num_values):
+ values.append(base_value + i)
+ if include_null:
+ values.append(None)
+ return [
+ ('arr_int8', [v if v is None else int(v % 100) for v in values]),
+ ('arr_int16', [v if v is None else int(v * 10) for v in values]),
+ ('arr_int32', [v if v is None else int(v * 100) for v in values]),
+ ('arr_int64', [v if v is None else int(v * 1000) for v in values]),
+ ('arr_float', [v if v is None else float(v + 0.5) for v in
values]),
+ ('arr_double', [v if v is None else float(v + 0.1) for v in
values]),
+ ('arr_bool', [v if v is None else bool(v % 2) for v in values]),
+ ('arr_string', [v if v is None else 'text{0}'.format(v) for v in
values]),
+ ('arr_binary', [v if v is None else 'data{0}'.format(v).encode()
for v in values]),
+ ('arr_unixtime_micros', [v if v is None else
datetime.datetime(2020, 1, min(v, 28), tzinfo=utc) for v in values]),
+ ('arr_date', [v if v is None else datetime.date(2020, 1, min(v,
28)) for v in values]),
+ ('arr_varchar', [v if v is None else 'varchar{0}'.format(v) for v
in values]),
+ ]
+
+ def test_insert_all_array_types(self):
+ table = self.client.table(self.table_name)
+ session = self.client.new_session()
+
+ insert = table.new_insert()
+ insert['id'] = 1
+
+ test_data = self._get_test_data_for_all_types()
+ for col_name, data in test_data:
+ insert[col_name] = data
+
+ session.apply(insert)
+ session.flush()
+
+ scanner = table.scanner()
+ scanner.add_predicate(table['id'] == 1)
+ scanner.open()
+ tuples = scanner.read_all_tuples()
+
+ self.assertEqual(len(tuples), 1)
+ row = tuples[0]
+ self.assertEqual(row[0], 1)
+
+ for idx, (col_name, expected_data) in enumerate(test_data, start=1):
+ actual_data = row[idx]
+ self.assertEqual(actual_data, expected_data,
+ "Array {0} data mismatch".format(col_name))
+
+ def test_update_all_array_types(self):
+ table = self.client.table(self.table_name)
+ session = self.client.new_session()
+
+ insert = table.new_insert()
+ insert['id'] = 1
+ initial_data = self._get_test_data_for_all_types()
+ for col_name, data in initial_data:
+ insert[col_name] = data
+ session.apply(insert)
+ session.flush()
+
+ update = table.new_update()
+ update['id'] = 1
+
+ updated_data = self._get_test_data_for_all_types(num_values=2,
include_null=True, base_value=10)
+ for col_name, data in updated_data:
+ update[col_name] = data
+
+ session.apply(update)
+ session.flush()
+
+ scanner = table.scanner()
+ scanner.add_predicate(table['id'] == 1)
+ scanner.open()
+ tuples = scanner.read_all_tuples()
+
+ self.assertEqual(len(tuples), 1)
+ row = tuples[0]
+
+ for idx, (col_name, expected_data) in enumerate(updated_data, start=1):
+ actual_data = row[idx]
+ self.assertEqual(actual_data, expected_data,
+ "Updated array {0} data mismatch".format(col_name))
+
+ def test_upsert_all_array_types(self):
+ table = self.client.table(self.table_name)
+ session = self.client.new_session()
+
+ upsert = table.new_upsert()
+ upsert['id'] = 1
+
+ upsert_data = self._get_test_data_for_all_types()
+ for col_name, data in upsert_data:
+ upsert[col_name] = data
+
+ session.apply(upsert)
+ session.flush()
+
+ scanner = table.scanner()
+ scanner.add_predicate(table['id'] == 1)
+ scanner.open()
+ tuples = scanner.read_all_tuples()
+
+ self.assertEqual(len(tuples), 1)
+ row = tuples[0]
+
+ for idx, (col_name, expected_data) in enumerate(upsert_data, start=1):
+ actual_data = row[idx]
+ self.assertEqual(actual_data, expected_data,
+ "Upserted array {0} data mismatch".format(col_name))
+
+ def test_delete_with_arrays(self):
+ table = self.client.table(self.table_name)
+ session = self.client.new_session()
+
+ for row_id in [1, 2, 3]:
+ insert = table.new_insert()
+ insert['id'] = row_id
+ insert['arr_int64'] = [row_id * 10, row_id * 20]
+ insert['arr_string'] = ['row', str(row_id)]
+ insert['arr_double'] = [row_id * 1.1]
+ insert['arr_bool'] = [True]
+ remaining_types = ['int8', 'int16', 'int32', 'float', 'binary',
'unixtime_micros', 'date']
+ for type_name in remaining_types:
+ col_name = 'arr_' + type_name
+ insert[col_name] = []
+ insert['arr_varchar'] = []
+ # TODO: Add decimal arrays once Cython overloading issue is
resolved
+ # insert['arr_decimal'] = []
+ session.apply(insert)
+ session.flush()
+
+ scanner = table.scanner()
+ scanner.open()
+ all_tuples = scanner.read_all_tuples()
+ self.assertEqual(len(all_tuples), 3)
+
+ delete = table.new_delete()
+ delete['id'] = 2
+ session.apply(delete)
+ session.flush()
+
+ scanner = table.scanner()
+ scanner.open()
+ remaining_tuples = scanner.read_all_tuples()
+ self.assertEqual(len(remaining_tuples), 2)
+
+ remaining_ids = [row[0] for row in remaining_tuples]
+ self.assertIn(1, remaining_ids)
+ self.assertIn(3, remaining_ids)
+ self.assertNotIn(2, remaining_ids)
+
+ def test_insert_and_scan_empty_arrays_all_types(self):
+ table = self.client.table(self.table_name)
+ session = self.client.new_session()
+
+ insert = table.new_insert()
+ insert['id'] = 1
+
+ for type_name, kudu_type in self.SUPPORTED_ARRAY_TYPES:
+ col_name = 'arr_' + type_name
+ insert[col_name] = []
+ insert['arr_varchar'] = []
+
+ session.apply(insert)
+ session.flush()
+
+ scanner = table.scanner()
+ scanner.add_predicate(table['id'] == 1)
+ scanner.open()
+ tuples = scanner.read_all_tuples()
+
+ self.assertEqual(len(tuples), 1)
+ row = tuples[0]
+ self.assertEqual(row[0], 1) # id
+
+ for idx, (type_name, kudu_type) in
enumerate(self.SUPPORTED_ARRAY_TYPES, start=1):
+ if type_name != 'bool':
+ self.assertEqual(row[idx], [], "arr_{0} should be
empty".format(type_name))
+
+ special_start_idx = len(self.SUPPORTED_ARRAY_TYPES) + 1
+ for offset, (type_name, kudu_type, params) in
enumerate(self.SPECIAL_PARAM_TYPES):
+ col_idx = special_start_idx + offset
+ self.assertEqual(row[col_idx], [], "arr_{0} should be
empty".format(type_name))
+
+ def test_scan_multiple_rows_all_array_types(self):
+ table = self.client.table(self.table_name)
+ session = self.client.new_session()
+
+ test_rows = {
+ 1: self._get_test_data_for_all_types(num_values=2,
include_null=True, base_value=1),
+ 2: self._get_test_data_for_all_types(num_values=1,
include_null=False, base_value=200),
+ 3: self._get_test_data_for_all_types(num_values=3,
include_null=False, base_value=50),
+ }
+
+ for row_id, test_data in test_rows.items():
+ insert = table.new_insert()
+ insert['id'] = row_id
+ for col_name, data in test_data:
+ insert[col_name] = data
+ session.apply(insert)
+
+ session.flush()
+
+ scanner = table.scanner()
+ scanner.open()
+ all_tuples = scanner.read_all_tuples()
+ self.assertEqual(len(all_tuples), 3)
+
+ for row in all_tuples:
+ row_id = row[0]
+ self.assertIn(row_id, test_rows, "Unexpected row ID:
{0}".format(row_id))
+ expected_data = test_rows[row_id]
+ for idx, (col_name, expected_array) in enumerate(expected_data,
start=1):
+ actual_array = row[idx]
+ self.assertEqual(actual_array, expected_array,
+ "Row {0} {1} mismatch".format(row_id, col_name))
diff --git a/python/kudu/tests/test_schema.py b/python/kudu/tests/test_schema.py
index 2f010a982..6204da72c 100644
--- a/python/kudu/tests/test_schema.py
+++ b/python/kudu/tests/test_schema.py
@@ -23,6 +23,7 @@ from kudu.errors import KuduInvalidArgument
import kudu
from kudu.schema import Schema
+import datetime
class TestSchema(CompatUnitTest):
@@ -546,3 +547,155 @@ class TestSchema(CompatUnitTest):
result = repr(self.schema[0])
expected = 'ColumnSchema(name=one, type=int32, nullable=False)'
self.assertEqual(result, expected)
+
+
+class TestArrayDataTypeSchema(CompatUnitTest):
+
+ SUPPORTED_ARRAY_TYPES = [
+ ('int8', kudu.int8),
+ ('int16', kudu.int16),
+ ('int32', kudu.int32),
+ ('int64', kudu.int64),
+ ('float', kudu.float_),
+ ('double', kudu.double),
+ ('bool', kudu.bool),
+ ('string', kudu.string),
+ ('binary', kudu.binary),
+ ('unixtime_micros', kudu.unixtime_micros),
+ ('date', kudu.date),
+ ]
+
+ SPECIAL_PARAM_TYPES = [
+ ('varchar', kudu.varchar, {'length': 50})
+ ]
+
+ def test_array_type_descriptors_all_types(self):
+ for type_name, kudu_type in self.SUPPORTED_ARRAY_TYPES:
+ arr = kudu.array_type(kudu_type)
+ self.assertIsNotNone(arr, "Failed to create array type for
{0}".format(type_name))
+ self.assertTrue(arr.is_array(), "Array type {0} not
recognized".format(type_name))
+ self.assertEqual(str(arr), 'NestedTypeDescriptor(type=array)')
+
+ for type_name, kudu_type, params in self.SPECIAL_PARAM_TYPES:
+ arr = kudu.array_type(kudu_type)
+ self.assertIsNotNone(arr, "Failed to create array type for
{0}".format(type_name))
+ self.assertTrue(arr.is_array(), "Array type {0} not
recognized".format(type_name))
+ self.assertEqual(str(arr), 'NestedTypeDescriptor(type=array)')
+
+ def test_comprehensive_array_schema(self):
+ builder = kudu.schema_builder()
+
+ builder.add_column('id', kudu.int32, nullable=False).primary_key()
+ builder.add_column('name', kudu.string)
+ builder.add_column('age', kudu.int32)
+
+ for type_name, kudu_type in self.SUPPORTED_ARRAY_TYPES:
+ col_name = 'arr_' + type_name
+
builder.add_column(col_name).nested_type(kudu.array_type(kudu_type))
+
+ for type_name, kudu_type, params in self.SPECIAL_PARAM_TYPES:
+ col_name = 'arr_' + type_name
+ col_spec =
builder.add_column(col_name).nested_type(kudu.array_type(kudu_type))
+ if 'length' in params:
+ col_spec.length(params['length'])
+ if 'precision' in params:
+ col_spec.precision(params['precision'])
+ if 'scale' in params:
+ col_spec.scale(params['scale'])
+
+ schema = builder.build()
+
+ # Verify schema structure
+ # 3 scalar + 11 basic arrays + 1 special array (varchar)
+ self.assertEqual(len(schema), 15)
+
+ self.assertEqual(schema[0].name, 'id')
+ self.assertEqual(schema[0].type.name, 'int32')
+ self.assertFalse(schema[0].nullable) # Primary key
+
+ self.assertEqual(schema[1].name, 'name')
+ self.assertEqual(schema[1].type.name, 'string')
+
+ self.assertEqual(schema[2].name, 'age')
+ self.assertEqual(schema[2].type.name, 'int32')
+
+ # Verify all array columns have nested type and are nullable by default
+ # Start at index 3 (after scalars)
+ for idx, (type_name, kudu_type) in
enumerate(self.SUPPORTED_ARRAY_TYPES, start=3):
+ col = schema[idx]
+ expected_name = 'arr_' + type_name
+ self.assertEqual(col.name, expected_name)
+ self.assertEqual(col.type.name, 'nested')
+ # Arrays nullable by default
+ self.assertTrue(col.nullable)
+
+ # Verify special parameter types
+ varchar_col = schema[14]
+ self.assertEqual(varchar_col.name, 'arr_varchar')
+ self.assertEqual(varchar_col.type.name, 'nested')
+
+ def test_array_schema_introspection_and_writing(self):
+ builder = kudu.schema_builder()
+ builder.add_column('key', kudu.int32, nullable=False).primary_key()
+
+ for type_name, kudu_type in self.SUPPORTED_ARRAY_TYPES:
+ col_name = 'arr_' + type_name
+
builder.add_column(col_name).nested_type(kudu.array_type(kudu_type))
+
+ for type_name, kudu_type, params in self.SPECIAL_PARAM_TYPES:
+ col_name = 'arr_' + type_name
+ col_spec =
builder.add_column(col_name).nested_type(kudu.array_type(kudu_type))
+ if 'length' in params:
+ col_spec.length(params['length'])
+ if 'precision' in params:
+ col_spec.precision(params['precision'])
+ if 'scale' in params:
+ col_spec.scale(params['scale'])
+
+ schema = builder.build()
+
+ row = schema.new_row()
+ row['key'] = 1
+
+ test_data = [
+ ('arr_int8', [1, 2, None]),
+ ('arr_int16', [10, 20, None]),
+ ('arr_int32', [100, 200, None]),
+ ('arr_int64', [1000, 2000, None]),
+ ('arr_float', [1.5, 2.5, None]),
+ ('arr_double', [1.1, 2.2, None]),
+ ('arr_bool', [True, False, None]),
+ ('arr_string', ['hello', 'world', None]),
+ ('arr_binary', [b'data1', b'data2', None]),
+ ('arr_unixtime_micros', [datetime.datetime(2020, 1, 1),
datetime.datetime(2020, 1, 2), None]),
+ ('arr_date', [datetime.date(2020, 1, 1), datetime.date(2020, 1,
2), None]),
+ ('arr_varchar', ['short', 'text', None]),
+ ]
+
+ for col_name, data in test_data:
+ row[col_name] = data
+
+ self.assertIsNotNone(row)
+
+ row2 = schema.new_row()
+ row2['key'] = 2
+
+ empty_test_data = [
+ ('arr_int8', []),
+ ('arr_int16', []),
+ ('arr_int32', []),
+ ('arr_int64', []),
+ ('arr_float', []),
+ ('arr_double', []),
+ ('arr_bool', []),
+ ('arr_string', []),
+ ('arr_binary', []),
+ ('arr_unixtime_micros', []),
+ ('arr_date', []),
+ ('arr_varchar', []),
+ ]
+
+ for col_name, data in empty_test_data:
+ row2[col_name] = data
+
+ self.assertIsNotNone(row2)