leekeiabstraction commented on code in PR #474:
URL: https://github.com/apache/fluss-rust/pull/474#discussion_r3037118618
##########
bindings/python/test/test_log_table.py:
##########
@@ -755,3 +754,244 @@ def _poll_arrow_ids(scanner, expected_count,
timeout_s=10):
if arrow_table.num_rows > 0:
all_ids.extend(arrow_table.column("id").to_pylist())
return all_ids
+
+
+async def test_append_and_scan_with_array(connection, admin):
+ """Test appending and scanning with array columns."""
+ table_path = fluss.TablePath("fluss", "py_test_append_and_scan_with_array")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ pa_schema = pa.schema(
+ [
+ pa.field("id", pa.int32()),
+ pa.field("tags", pa.list_(pa.string())),
+ pa.field("scores", pa.list_(pa.int32())),
+ ]
+ )
+ schema = fluss.Schema(pa_schema)
+ table_descriptor = fluss.TableDescriptor(schema)
+ await admin.create_table(table_path, table_descriptor,
ignore_if_exists=False)
+
+ table = await connection.get_table(table_path)
+ append_writer = table.new_append().create_writer()
+
+ # Batch 1: Testing both standard and large lists
+ batch1 = pa.RecordBatch.from_arrays(
+ [
+ pa.array([1, 2], type=pa.int32()),
+ pa.array([["a", "b"], ["c"]], type=pa.list_(pa.string())),
+ pa.array([[10, 20], [30]], type=pa.list_(pa.int32())),
+ ],
+ schema=pa_schema,
+ )
+ append_writer.write_arrow_batch(batch1)
+
+ # Batch 2: Testing null values inside arrays and null arrays
+ batch2 = pa.RecordBatch.from_arrays(
+ [
+ pa.array([3, 4, 5, 6], type=pa.int32()),
+ pa.array([["d", None], None, [], [None]],
type=pa.list_(pa.string())),
+ pa.array([[40, 50], [60], None, []], type=pa.list_(pa.int32())),
+ ],
+ schema=pa_schema,
+ )
+ append_writer.write_arrow_batch(batch2)
+ await append_writer.flush()
+
+ # Verify via LogScanner (record-by-record)
+ scanner = await table.new_scan().create_log_scanner()
+ scanner.subscribe_buckets({0: fluss.EARLIEST_OFFSET})
+ records = _poll_records(scanner, expected_count=6)
+
+ assert len(records) == 6
+ records.sort(key=lambda r: r.row["id"])
+
+ # Verify Batch 1
+ assert records[0].row["tags"] == ["a", "b"]
+ assert records[0].row["scores"] == [10, 20]
+ assert records[1].row["tags"] == ["c"]
+ assert records[1].row["scores"] == [30]
+
+ # Verify Batch 2
+ assert records[2].row["tags"] == ["d", None]
+ assert records[2].row["scores"] == [40, 50]
+ assert records[3].row["tags"] is None
+ assert records[3].row["scores"] == [60]
+ assert records[4].row["tags"] == []
+ assert records[4].row["scores"] is None
+ assert records[5].row["tags"] == [None]
+ assert records[5].row["scores"] == []
+
+ # Verify via to_arrow (batch-based)
+ scanner2 = await table.new_scan().create_record_batch_log_scanner()
+ scanner2.subscribe_buckets({0: fluss.EARLIEST_OFFSET})
+ result_table = scanner2.to_arrow()
+
+ assert result_table.num_rows == 6
+ assert result_table.column("tags").to_pylist() == [
+ ["a", "b"],
+ ["c"],
+ ["d", None],
+ None,
+ [],
+ [None],
+ ]
+ assert result_table.column("scores").to_pylist() == [
+ [10, 20],
+ [30],
+ [40, 50],
+ [60],
+ None,
+ [],
+ ]
+
+
[email protected](reason="Server currently only accepts ListVector.
FixedSizeList causes IPC mismatch until server supports it.")
Review Comment:
Do we need the same skip for LargeList? Or should we test (skipped) both LargeList
and FixedSizeList here?
##########
crates/fluss/src/row/column_writer.rs:
##########
@@ -550,15 +576,507 @@ impl ColumnWriter {
)?);
Ok(())
}
+ TypedWriter::List {
+ element_type,
+ builder,
+ } => {
+ let array = row.get_array(pos)?;
+ let values_builder = builder.values();
+ for i in 0..array.size() {
+ append_element_to_builder(values_builder, &array, i,
element_type)?;
+ }
+ builder.append(true);
+ Ok(())
+ }
+ }
+ }
+}
+
+fn create_builder(
+ fluss_type: &DataType,
+ arrow_type: &ArrowDataType,
+ capacity: usize,
+) -> Result<Box<dyn ArrayBuilder>> {
+ match fluss_type {
+ DataType::Boolean(_) =>
Ok(Box::new(BooleanBuilder::with_capacity(capacity))),
+ DataType::TinyInt(_) =>
Ok(Box::new(Int8Builder::with_capacity(capacity))),
+ DataType::SmallInt(_) =>
Ok(Box::new(Int16Builder::with_capacity(capacity))),
+ DataType::Int(_) =>
Ok(Box::new(Int32Builder::with_capacity(capacity))),
+ DataType::BigInt(_) =>
Ok(Box::new(Int64Builder::with_capacity(capacity))),
+ DataType::Float(_) =>
Ok(Box::new(Float32Builder::with_capacity(capacity))),
+ DataType::Double(_) =>
Ok(Box::new(Float64Builder::with_capacity(capacity))),
+ DataType::Char(_) | DataType::String(_) =>
Ok(Box::new(StringBuilder::with_capacity(
+ capacity,
+ capacity.saturating_mul(VARIABLE_WIDTH_AVG_BYTES),
+ ))),
+ DataType::Bytes(_) => Ok(Box::new(BinaryBuilder::with_capacity(
+ capacity,
+ capacity.saturating_mul(VARIABLE_WIDTH_AVG_BYTES),
+ ))),
+ DataType::Binary(t) => {
+ let arrow_len: i32 = t.length().try_into().map_err(|_|
Error::IllegalArgument {
+ message: format!(
+ "Binary length {} exceeds Arrow's maximum (i32::MAX)",
+ t.length()
+ ),
+ })?;
+ Ok(Box::new(FixedSizeBinaryBuilder::with_capacity(
+ capacity, arrow_len,
+ )))
+ }
+ DataType::Decimal(_) => {
+ let (p, s) = match arrow_type {
+ ArrowDataType::Decimal128(p, s) => (*p, *s),
+ _ => {
+ return Err(Error::IllegalArgument {
+ message: format!(
+ "Expected Decimal128 Arrow type for Decimal, got:
{arrow_type:?}"
+ ),
+ });
+ }
+ };
+ let builder = Decimal128Builder::with_capacity(capacity)
+ .with_precision_and_scale(p, s)
+ .map_err(|e| Error::IllegalArgument {
+ message: format!("Invalid decimal precision {p} or scale
{s}: {e}"),
+ })?;
+ Ok(Box::new(builder))
+ }
+ DataType::Date(_) =>
Ok(Box::new(Date32Builder::with_capacity(capacity))),
+ DataType::Time(_) => match arrow_type {
+ ArrowDataType::Time32(arrow_schema::TimeUnit::Second) => {
+ Ok(Box::new(Time32SecondBuilder::with_capacity(capacity)))
+ }
+ ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
+ Ok(Box::new(Time32MillisecondBuilder::with_capacity(capacity)))
+ }
+ ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
+ Ok(Box::new(Time64MicrosecondBuilder::with_capacity(capacity)))
+ }
+ ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
+ Ok(Box::new(Time64NanosecondBuilder::with_capacity(capacity)))
+ }
+ _ => Err(Error::IllegalArgument {
+ message: format!("Unsupported Arrow type for Time:
{arrow_type:?}"),
+ }),
+ },
+ DataType::Timestamp(_) => match arrow_type {
+ ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, _) => {
+ Ok(Box::new(TimestampSecondBuilder::with_capacity(capacity)))
+ }
+ ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _)
=> Ok(Box::new(
+ TimestampMillisecondBuilder::with_capacity(capacity),
+ )),
+ ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _)
=> Ok(Box::new(
+ TimestampMicrosecondBuilder::with_capacity(capacity),
+ )),
+ ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) =>
Ok(Box::new(
+ TimestampNanosecondBuilder::with_capacity(capacity),
+ )),
+ _ => Err(Error::IllegalArgument {
+ message: format!("Unsupported Arrow type for Timestamp:
{arrow_type:?}"),
+ }),
+ },
+ DataType::TimestampLTz(_) => match arrow_type {
+ ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, _) => {
+ Ok(Box::new(TimestampSecondBuilder::with_capacity(capacity)))
+ }
+ ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _)
=> Ok(Box::new(
+ TimestampMillisecondBuilder::with_capacity(capacity),
+ )),
+ ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _)
=> Ok(Box::new(
+ TimestampMicrosecondBuilder::with_capacity(capacity),
+ )),
+ ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) =>
Ok(Box::new(
+ TimestampNanosecondBuilder::with_capacity(capacity),
+ )),
+ _ => Err(Error::IllegalArgument {
+ message: format!("Unsupported Arrow type for TimestampLTz:
{arrow_type:?}"),
+ }),
+ },
+ DataType::Array(array_type) => {
+ let element_type = array_type.get_element_type();
+ let arrow_element_type = match arrow_type {
+ ArrowDataType::List(field) | ArrowDataType::LargeList(field)
=> field.data_type(),
Review Comment:
Curious — why match only against List / LargeList but not FixedSizeList?
If I understand correctly, there is no code path that would reach LargeList or
FixedSizeList here, so matching against LargeList is defensive but inconsistent,
in that we do not also match against FixedSizeList.
##########
crates/fluss/tests/integration/log_table.rs:
##########
@@ -738,18 +738,18 @@ mod table_test {
row.set_field(14, col_time_ns);
row.set_field(15, col_timestamp_s);
row.set_field(16, col_timestamp_ms);
- row.set_field(17, col_timestamp_us.clone());
- row.set_field(18, col_timestamp_ns.clone());
+ row.set_field(17, col_timestamp_us);
Review Comment:
These seem unrelated to the Array data type support change?
##########
bindings/python/test/test_schema.py:
##########
@@ -35,3 +35,38 @@ def test_get_primary_keys():
assert schema_without_pk.get_primary_keys() == []
+def test_schema_with_array():
Review Comment:
Do we need to update the documentation as well for Array data type support?
##########
crates/fluss/tests/integration/kv_table.rs:
##########
@@ -632,8 +632,8 @@ mod kv_table_test {
let col_smallint = 32767i16;
let col_int = 2147483647i32;
let col_bigint = 9223372036854775807i64;
- let col_float = 3.14f32;
- let col_double = 2.718281828459045f64;
+ let col_float = std::f32::consts::PI;
Review Comment:
These seem unrelated to the Array data type support change?
##########
crates/fluss/src/row/column_writer.rs:
##########
@@ -321,6 +327,26 @@ impl ColumnWriter {
}
}
}
+ DataType::Array(array_type) => {
+ let element_type = array_type.get_element_type();
+ let arrow_element_type = match arrow_type {
+ ArrowDataType::List(field) |
ArrowDataType::LargeList(field) => {
Review Comment:
Curious — why match only against List / LargeList but not FixedSizeList?
If I understand correctly, there is no code path that would reach LargeList or
FixedSizeList here, so matching against LargeList is defensive but inconsistent,
in that we do not also match against FixedSizeList.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]