liurenjie1024 commented on code in PR #1824:
URL: https://github.com/apache/iceberg-rust/pull/1824#discussion_r2584562956


##########
crates/iceberg/src/arrow/record_batch_transformer.rs:
##########
@@ -539,86 +619,224 @@ impl RecordBatchTransformer {
         prim_lit: &Option<PrimitiveLiteral>,
         num_rows: usize,
     ) -> Result<ArrayRef> {
-        Ok(match (target_type, prim_lit) {
-            (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => {
-                Arc::new(BooleanArray::from(vec![*value; num_rows]))
-            }
-            (DataType::Boolean, None) => {
-                let vals: Vec<Option<bool>> = vec![None; num_rows];
-                Arc::new(BooleanArray::from(vals))
-            }
-            (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => {
-                Arc::new(Int32Array::from(vec![*value; num_rows]))
-            }
-            (DataType::Int32, None) => {
-                let vals: Vec<Option<i32>> = vec![None; num_rows];
-                Arc::new(Int32Array::from(vals))
-            }
-            (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => {
-                Arc::new(Date32Array::from(vec![*value; num_rows]))
-            }
-            (DataType::Date32, None) => {
-                let vals: Vec<Option<i32>> = vec![None; num_rows];
-                Arc::new(Date32Array::from(vals))
-            }
-            (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => {
-                Arc::new(Int64Array::from(vec![*value; num_rows]))
-            }
-            (DataType::Int64, None) => {
-                let vals: Vec<Option<i64>> = vec![None; num_rows];
-                Arc::new(Int64Array::from(vals))
-            }
-            (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => {
-                Arc::new(Float32Array::from(vec![value.0; num_rows]))
-            }
-            (DataType::Float32, None) => {
-                let vals: Vec<Option<f32>> = vec![None; num_rows];
-                Arc::new(Float32Array::from(vals))
-            }
-            (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => {
-                Arc::new(Float64Array::from(vec![value.0; num_rows]))
-            }
-            (DataType::Float64, None) => {
-                let vals: Vec<Option<f64>> = vec![None; num_rows];
-                Arc::new(Float64Array::from(vals))
-            }
-            (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => {
-                Arc::new(StringArray::from(vec![value.clone(); num_rows]))
-            }
-            (DataType::Utf8, None) => {
-                let vals: Vec<Option<String>> = vec![None; num_rows];
-                Arc::new(StringArray::from(vals))
-            }
-            (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => {
-                Arc::new(BinaryArray::from_vec(vec![value; num_rows]))
-            }
-            (DataType::Binary, None) => {
-                let vals: Vec<Option<&[u8]>> = vec![None; num_rows];
-                Arc::new(BinaryArray::from_opt_vec(vals))
-            }
-            (DataType::Struct(fields), None) => {
-                // Create a StructArray filled with nulls. Per Iceberg spec, 
optional struct fields
-                // default to null when added to the schema. We defer non-null 
default struct values
-                // and leave them as not implemented yet.
-                let null_arrays: Vec<ArrayRef> = fields
-                    .iter()
-                    .map(|field| Self::create_column(field.data_type(), &None, 
num_rows))
-                    .collect::<Result<Vec<_>>>()?;
-
-                Arc::new(StructArray::new(
-                    fields.clone(),
-                    null_arrays,
-                    Some(NullBuffer::new_null(num_rows)),
+        // Check if this is a RunEndEncoded type (for constant fields)
+        if let DataType::RunEndEncoded(_, values_field) = target_type {
+            // Helper to create a Run-End Encoded array
+            let create_ree_array = |values_array: ArrayRef| -> 
Result<ArrayRef> {
+                let run_ends = if num_rows == 0 {
+                    Int32Array::from(Vec::<i32>::new())
+                } else {
+                    Int32Array::from(vec![num_rows as i32])
+                };
+                Ok(Arc::new(
+                    RunArray::try_new(&run_ends, &values_array).map_err(|e| {
+                        Error::new(
+                            ErrorKind::Unexpected,
+                            "Failed to create RunArray for constant value",
+                        )
+                        .with_source(e)
+                    })?,
                 ))
-            }
-            (DataType::Null, _) => Arc::new(NullArray::new(num_rows)),
-            (dt, _) => {
-                return Err(Error::new(
-                    ErrorKind::Unexpected,
-                    format!("unexpected target column type {}", dt),
-                ));
-            }
-        })
+            };
+
+            // Create the values array based on the literal value
+            let values_array: ArrayRef = match (values_field.data_type(), 
prim_lit) {
+                (DataType::Boolean, Some(PrimitiveLiteral::Boolean(v))) => {

Review Comment:
   nit: Do we need to duplicate so much with another branch?  Can we move this 
to the value.rs under arrow module?



##########
crates/iceberg/src/arrow/schema.rs:
##########
@@ -1019,6 +1019,57 @@ impl TryFrom<&crate::spec::Schema> for ArrowSchema {
     }
 }
 
+/// Converts a Datum (Iceberg type + primitive literal) to its corresponding 
Arrow DataType
+/// with Run-End Encoding (REE).
+///
+/// This function is used for constant fields in record batches, where all 
values are the same.
+/// Run-End Encoding provides efficient storage for such constant columns.
+///
+/// # Arguments
+/// * `datum` - The Datum to convert, which contains both type and value 
information
+///
+/// # Returns
+/// Arrow DataType with Run-End Encoding applied
+///
+/// # Example
+/// ```ignore

Review Comment:
   I think it's safe to run this test?



##########
crates/iceberg/src/arrow/record_batch_transformer.rs:
##########
@@ -53,20 +54,59 @@ use crate::{Error, ErrorKind, Result};
 fn constants_map(
     partition_spec: &PartitionSpec,
     partition_data: &Struct,
-) -> HashMap<i32, PrimitiveLiteral> {
+    schema: &IcebergSchema,
+) -> Result<HashMap<i32, Datum>> {
     let mut constants = HashMap::new();
 
     for (pos, field) in partition_spec.fields().iter().enumerate() {
         // Only identity transforms should use constant values from partition 
metadata
         if matches!(field.transform, Transform::Identity) {
+            // Get the field from schema to extract its type
+            let iceberg_field = 
schema.field_by_id(field.source_id).ok_or(Error::new(
+                ErrorKind::Unexpected,
+                format!("Field {} not found in schema", field.source_id),
+            ))?;
+
+            // Ensure the field type is primitive
+            let prim_type = match &*iceberg_field.field_type {
+                crate::spec::Type::Primitive(prim_type) => prim_type,
+                _ => {
+                    return Err(Error::new(
+                        ErrorKind::Unexpected,
+                        format!(
+                            "Partition field {} has non-primitive type {:?}",
+                            field.source_id, iceberg_field.field_type
+                        ),
+                    ));
+                }
+            };
+
             // Get the partition value for this field
-            if let Some(Literal::Primitive(value)) = &partition_data[pos] {
-                constants.insert(field.source_id, value.clone());
+            // Handle both None (null) and Some(Literal::Primitive) cases
+            match &partition_data[pos] {
+                None => {

Review Comment:
   It seems we need to change `Datum` to support null value.  We don't need to 
do it now, please file an issue to track it and add a TODO here.



##########
crates/iceberg/src/arrow/record_batch_transformer.rs:
##########
@@ -311,22 +359,48 @@ impl RecordBatchTransformer {
         let fields: Result<Vec<_>> = projected_iceberg_field_ids
             .iter()
             .map(|field_id| {
-                Ok(field_id_to_mapped_schema_map
-                    .get(field_id)
-                    .ok_or(Error::new(ErrorKind::Unexpected, "field not 
found"))?
-                    .0
-                    .clone())
+                // Check if this is a constant field
+                if constant_fields.contains_key(field_id) {
+                    // For metadata/virtual fields (like _file), get name from 
metadata_columns
+                    // For partition fields, get name from schema (they exist 
in schema)
+                    if let Ok(iceberg_field) = get_metadata_field(*field_id) {
+                        // This is a metadata/virtual field - convert Iceberg 
field to Arrow
+                        let arrow_type =
+                            
datum_to_arrow_type_with_ree(constant_fields.get(field_id).unwrap());
+                        let arrow_field =
+                            Field::new(&iceberg_field.name, arrow_type, 
!iceberg_field.required)
+                                .with_metadata(HashMap::from([(
+                                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                                    iceberg_field.id.to_string(),
+                                )]));
+                        Ok(Arc::new(arrow_field))
+                    } else {
+                        // This is a partition constant field (exists in 
schema but uses constant value)
+                        let field = &field_id_to_mapped_schema_map
+                            .get(field_id)
+                            .ok_or(Error::new(ErrorKind::Unexpected, "field 
not found"))?
+                            .0;
+                        let datum = constant_fields.get(field_id).unwrap();

Review Comment:
   nit: To aovid unwrap here, we could use `constant_fields.get().map()`



##########
crates/iceberg/src/arrow/record_batch_transformer.rs:
##########
@@ -53,20 +54,59 @@ use crate::{Error, ErrorKind, Result};
 fn constants_map(
     partition_spec: &PartitionSpec,
     partition_data: &Struct,
-) -> HashMap<i32, PrimitiveLiteral> {
+    schema: &IcebergSchema,
+) -> Result<HashMap<i32, Datum>> {
     let mut constants = HashMap::new();
 
     for (pos, field) in partition_spec.fields().iter().enumerate() {
         // Only identity transforms should use constant values from partition 
metadata
         if matches!(field.transform, Transform::Identity) {
+            // Get the field from schema to extract its type
+            let iceberg_field = 
schema.field_by_id(field.source_id).ok_or(Error::new(
+                ErrorKind::Unexpected,
+                format!("Field {} not found in schema", field.source_id),
+            ))?;
+
+            // Ensure the field type is primitive
+            let prim_type = match &*iceberg_field.field_type {
+                crate::spec::Type::Primitive(prim_type) => prim_type,
+                _ => {
+                    return Err(Error::new(
+                        ErrorKind::Unexpected,
+                        format!(
+                            "Partition field {} has non-primitive type {:?}",
+                            field.source_id, iceberg_field.field_type
+                        ),
+                    ));
+                }
+            };
+
             // Get the partition value for this field
-            if let Some(Literal::Primitive(value)) = &partition_data[pos] {
-                constants.insert(field.source_id, value.clone());
+            // Handle both None (null) and Some(Literal::Primitive) cases
+            match &partition_data[pos] {
+                None => {

Review Comment:
   I think this incorrect, in this case we should return `None`, and other case 
we should return `Some(Datum)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to