liurenjie1024 commented on code in PR #1824:
URL: https://github.com/apache/iceberg-rust/pull/1824#discussion_r2584562956
##########
crates/iceberg/src/arrow/record_batch_transformer.rs:
##########
@@ -539,86 +619,224 @@ impl RecordBatchTransformer {
prim_lit: &Option<PrimitiveLiteral>,
num_rows: usize,
) -> Result<ArrayRef> {
- Ok(match (target_type, prim_lit) {
- (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => {
- Arc::new(BooleanArray::from(vec![*value; num_rows]))
- }
- (DataType::Boolean, None) => {
- let vals: Vec<Option<bool>> = vec![None; num_rows];
- Arc::new(BooleanArray::from(vals))
- }
- (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => {
- Arc::new(Int32Array::from(vec![*value; num_rows]))
- }
- (DataType::Int32, None) => {
- let vals: Vec<Option<i32>> = vec![None; num_rows];
- Arc::new(Int32Array::from(vals))
- }
- (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => {
- Arc::new(Date32Array::from(vec![*value; num_rows]))
- }
- (DataType::Date32, None) => {
- let vals: Vec<Option<i32>> = vec![None; num_rows];
- Arc::new(Date32Array::from(vals))
- }
- (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => {
- Arc::new(Int64Array::from(vec![*value; num_rows]))
- }
- (DataType::Int64, None) => {
- let vals: Vec<Option<i64>> = vec![None; num_rows];
- Arc::new(Int64Array::from(vals))
- }
- (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => {
- Arc::new(Float32Array::from(vec![value.0; num_rows]))
- }
- (DataType::Float32, None) => {
- let vals: Vec<Option<f32>> = vec![None; num_rows];
- Arc::new(Float32Array::from(vals))
- }
- (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => {
- Arc::new(Float64Array::from(vec![value.0; num_rows]))
- }
- (DataType::Float64, None) => {
- let vals: Vec<Option<f64>> = vec![None; num_rows];
- Arc::new(Float64Array::from(vals))
- }
- (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => {
- Arc::new(StringArray::from(vec![value.clone(); num_rows]))
- }
- (DataType::Utf8, None) => {
- let vals: Vec<Option<String>> = vec![None; num_rows];
- Arc::new(StringArray::from(vals))
- }
- (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => {
- Arc::new(BinaryArray::from_vec(vec![value; num_rows]))
- }
- (DataType::Binary, None) => {
- let vals: Vec<Option<&[u8]>> = vec![None; num_rows];
- Arc::new(BinaryArray::from_opt_vec(vals))
- }
- (DataType::Struct(fields), None) => {
- // Create a StructArray filled with nulls. Per Iceberg spec,
optional struct fields
- // default to null when added to the schema. We defer non-null
default struct values
- // and leave them as not implemented yet.
- let null_arrays: Vec<ArrayRef> = fields
- .iter()
- .map(|field| Self::create_column(field.data_type(), &None,
num_rows))
- .collect::<Result<Vec<_>>>()?;
-
- Arc::new(StructArray::new(
- fields.clone(),
- null_arrays,
- Some(NullBuffer::new_null(num_rows)),
+ // Check if this is a RunEndEncoded type (for constant fields)
+ if let DataType::RunEndEncoded(_, values_field) = target_type {
+ // Helper to create a Run-End Encoded array
+ let create_ree_array = |values_array: ArrayRef| ->
Result<ArrayRef> {
+ let run_ends = if num_rows == 0 {
+ Int32Array::from(Vec::<i32>::new())
+ } else {
+ Int32Array::from(vec![num_rows as i32])
+ };
+ Ok(Arc::new(
+ RunArray::try_new(&run_ends, &values_array).map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "Failed to create RunArray for constant value",
+ )
+ .with_source(e)
+ })?,
))
- }
- (DataType::Null, _) => Arc::new(NullArray::new(num_rows)),
- (dt, _) => {
- return Err(Error::new(
- ErrorKind::Unexpected,
- format!("unexpected target column type {}", dt),
- ));
- }
- })
+ };
+
+ // Create the values array based on the literal value
+ let values_array: ArrayRef = match (values_field.data_type(),
prim_lit) {
+ (DataType::Boolean, Some(PrimitiveLiteral::Boolean(v))) => {
Review Comment:
nit: Do we need to duplicate so much logic in another branch? Can we move this
helper into value.rs under the arrow module?
##########
crates/iceberg/src/arrow/schema.rs:
##########
@@ -1019,6 +1019,57 @@ impl TryFrom<&crate::spec::Schema> for ArrowSchema {
}
}
+/// Converts a Datum (Iceberg type + primitive literal) to its corresponding
Arrow DataType
+/// with Run-End Encoding (REE).
+///
+/// This function is used for constant fields in record batches, where all
values are the same.
+/// Run-End Encoding provides efficient storage for such constant columns.
+///
+/// # Arguments
+/// * `datum` - The Datum to convert, which contains both type and value
information
+///
+/// # Returns
+/// Arrow DataType with Run-End Encoding applied
+///
+/// # Example
+/// ```ignore
Review Comment:
I think it's safe to actually run this doc example — can we drop the `ignore` attribute?
##########
crates/iceberg/src/arrow/record_batch_transformer.rs:
##########
@@ -53,20 +54,59 @@ use crate::{Error, ErrorKind, Result};
fn constants_map(
partition_spec: &PartitionSpec,
partition_data: &Struct,
-) -> HashMap<i32, PrimitiveLiteral> {
+ schema: &IcebergSchema,
+) -> Result<HashMap<i32, Datum>> {
let mut constants = HashMap::new();
for (pos, field) in partition_spec.fields().iter().enumerate() {
// Only identity transforms should use constant values from partition
metadata
if matches!(field.transform, Transform::Identity) {
+ // Get the field from schema to extract its type
+ let iceberg_field =
schema.field_by_id(field.source_id).ok_or(Error::new(
+ ErrorKind::Unexpected,
+ format!("Field {} not found in schema", field.source_id),
+ ))?;
+
+ // Ensure the field type is primitive
+ let prim_type = match &*iceberg_field.field_type {
+ crate::spec::Type::Primitive(prim_type) => prim_type,
+ _ => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ format!(
+ "Partition field {} has non-primitive type {:?}",
+ field.source_id, iceberg_field.field_type
+ ),
+ ));
+ }
+ };
+
// Get the partition value for this field
- if let Some(Literal::Primitive(value)) = &partition_data[pos] {
- constants.insert(field.source_id, value.clone());
+ // Handle both None (null) and Some(Literal::Primitive) cases
+ match &partition_data[pos] {
+ None => {
Review Comment:
It seems we need to change `Datum` to support null values. We don't need to
do that now; please file an issue to track it and add a TODO here.
##########
crates/iceberg/src/arrow/record_batch_transformer.rs:
##########
@@ -311,22 +359,48 @@ impl RecordBatchTransformer {
let fields: Result<Vec<_>> = projected_iceberg_field_ids
.iter()
.map(|field_id| {
- Ok(field_id_to_mapped_schema_map
- .get(field_id)
- .ok_or(Error::new(ErrorKind::Unexpected, "field not
found"))?
- .0
- .clone())
+ // Check if this is a constant field
+ if constant_fields.contains_key(field_id) {
+ // For metadata/virtual fields (like _file), get name from
metadata_columns
+ // For partition fields, get name from schema (they exist
in schema)
+ if let Ok(iceberg_field) = get_metadata_field(*field_id) {
+ // This is a metadata/virtual field - convert Iceberg
field to Arrow
+ let arrow_type =
+
datum_to_arrow_type_with_ree(constant_fields.get(field_id).unwrap());
+ let arrow_field =
+ Field::new(&iceberg_field.name, arrow_type,
!iceberg_field.required)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ iceberg_field.id.to_string(),
+ )]));
+ Ok(Arc::new(arrow_field))
+ } else {
+ // This is a partition constant field (exists in
schema but uses constant value)
+ let field = &field_id_to_mapped_schema_map
+ .get(field_id)
+ .ok_or(Error::new(ErrorKind::Unexpected, "field
not found"))?
+ .0;
+ let datum = constant_fields.get(field_id).unwrap();
Review Comment:
nit: To avoid the `unwrap` here, we could use `constant_fields.get().map()`
##########
crates/iceberg/src/arrow/record_batch_transformer.rs:
##########
@@ -53,20 +54,59 @@ use crate::{Error, ErrorKind, Result};
fn constants_map(
partition_spec: &PartitionSpec,
partition_data: &Struct,
-) -> HashMap<i32, PrimitiveLiteral> {
+ schema: &IcebergSchema,
+) -> Result<HashMap<i32, Datum>> {
let mut constants = HashMap::new();
for (pos, field) in partition_spec.fields().iter().enumerate() {
// Only identity transforms should use constant values from partition
metadata
if matches!(field.transform, Transform::Identity) {
+ // Get the field from schema to extract its type
+ let iceberg_field =
schema.field_by_id(field.source_id).ok_or(Error::new(
+ ErrorKind::Unexpected,
+ format!("Field {} not found in schema", field.source_id),
+ ))?;
+
+ // Ensure the field type is primitive
+ let prim_type = match &*iceberg_field.field_type {
+ crate::spec::Type::Primitive(prim_type) => prim_type,
+ _ => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ format!(
+ "Partition field {} has non-primitive type {:?}",
+ field.source_id, iceberg_field.field_type
+ ),
+ ));
+ }
+ };
+
// Get the partition value for this field
- if let Some(Literal::Primitive(value)) = &partition_data[pos] {
- constants.insert(field.source_id, value.clone());
+ // Handle both None (null) and Some(Literal::Primitive) cases
+ match &partition_data[pos] {
+ None => {
Review Comment:
I think this is incorrect: in this case we should return `None`, and in the other
case we should return `Some(Datum)`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]