mbrobbel commented on code in PR #8274:
URL: https://github.com/apache/arrow-rs/pull/8274#discussion_r2329419436


##########
arrow-avro/src/schema.rs:
##########
@@ -453,6 +427,49 @@ impl AvroSchema {
     pub fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
         build_canonical(schema, None)
     }
+
+    /// Build Avro JSON from an Arrow [`ArrowSchema`], applying the given null‑union order.
+    ///
+    /// If the input Arrow schema already contains Avro JSON in
+    /// [`SCHEMA_METADATA_KEY`], that JSON is returned verbatim to preserve
+    ///  the exact header encoding alignment; otherwise, a new JSON is generated
+    /// honoring `null_union_order` at **all nullable sites**.
+    pub fn from_arrow_with_options(
+        schema: &ArrowSchema,
+        null_order: Option<Nullability>,
+    ) -> Result<AvroSchema, ArrowError> {
+        if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
+            return Ok(AvroSchema::new(json.clone()));
+        }
+        let order = null_order.unwrap_or_default();
+        let mut name_gen = NameGenerator::default();
+        let fields_json = schema
+            .fields()
+            .iter()
+            .map(|f| arrow_field_to_avro(f, &mut name_gen, order))
+            .collect::<Result<Vec<_>, _>>()?;
+        let record_name = schema
+            .metadata
+            .get(AVRO_NAME_METADATA_KEY)
+            .map_or("topLevelRecord", |s| s.as_str());
+        let mut record = JsonMap::with_capacity(schema.metadata.len() + 4);
+        record.insert("type".into(), Value::String("record".into()));
+        record.insert(
+            "name".into(),
+            Value::String(sanitise_avro_name(record_name)),
+        );
+        if let Some(ns) = schema.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
+            record.insert("namespace".into(), Value::String(ns.clone()));
+        }
+        if let Some(doc) = schema.metadata.get(AVRO_DOC_METADATA_KEY) {
+            record.insert("doc".into(), Value::String(doc.clone()));
+        }
+        record.insert("fields".into(), Value::Array(fields_json));
+        extend_with_passthrough_metadata(&mut record, &schema.metadata);
+        let json_string = serde_json::to_string(&Value::Object(record))
+            .map_err(|e| ArrowError::SchemaError(format!("Serialising Avro JSON failed: {e}")))?;

Review Comment:
   ```suggestion
                .map_err(|e| ArrowError::SchemaError(format!("Serializing Avro JSON failed: {e}")))?;
   ```
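
For reviewers who want to try the new constructor locally, here is a minimal usage sketch. Only the signature and the fallback behaviour shown in the hunk above are taken as given; the `arrow_avro::schema` import path and the example field are assumptions.

```rust
use arrow_avro::schema::AvroSchema; // assumed public path for the new API
use arrow_schema::{ArrowError, DataType, Field, Schema as ArrowSchema};

fn to_avro(schema: &ArrowSchema) -> Result<AvroSchema, ArrowError> {
    // Passing `None` falls back to the default null-union order
    // (`null_order.unwrap_or_default()` in the diff), i.e. null-first unions.
    AvroSchema::from_arrow_with_options(schema, None)
}

fn main() -> Result<(), ArrowError> {
    // No SCHEMA_METADATA_KEY entry here, so fresh Avro JSON is generated;
    // if the key were present, its value would be returned verbatim instead.
    let schema = ArrowSchema::new(vec![Field::new("id", DataType::Int64, true)]);
    let _avro = to_avro(&schema)?;
    Ok(())
}
```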



##########
arrow-avro/src/schema.rs:
##########
@@ -453,6 +427,49 @@ impl AvroSchema {
     pub fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
         build_canonical(schema, None)
     }
+
+    /// Build Avro JSON from an Arrow [`ArrowSchema`], applying the given null‑union order.
+    ///
+    /// If the input Arrow schema already contains Avro JSON in
+    /// [`SCHEMA_METADATA_KEY`], that JSON is returned verbatim to preserve
+    ///  the exact header encoding alignment; otherwise, a new JSON is generated

Review Comment:
   ```suggestion
        /// the exact header encoding alignment; otherwise, a new JSON is generated
   ```



##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -83,181 +74,416 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool) -> Result<(), ArrowErr
 /// Branch index is 0-based per Avro unions:
 /// - Null-first (default): null => 0, value => 1
 /// - Null-second (Impala): value => 0, null => 1
-#[inline]
-fn write_optional_branch<W: Write + ?Sized>(
-    writer: &mut W,
+fn write_optional_index<W: Write + ?Sized>(
+    out: &mut W,
     is_null: bool,
-    impala_mode: bool,
+    null_order: Nullability,
 ) -> Result<(), ArrowError> {
-    let branch = if impala_mode == is_null { 1 } else { 0 };
-    write_int(writer, branch)
+    let byte = union_value_branch_byte(null_order, is_null);
+    out.write_all(&[byte])
+        .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"), e))
 }
 
-/// Encode a `RecordBatch` in Avro binary format using **default options**.
-pub fn encode_record_batch<W: Write>(batch: &RecordBatch, out: &mut W) -> Result<(), ArrowError> {
-    encode_record_batch_with_options(batch, out, &EncoderOptions::default())
+#[derive(Debug, Clone)]
+enum NullState {
+    NonNullable,
+    NullableNoNulls {
+        union_value_byte: u8,
+    },
+    Nullable {
+        nulls: NullBuffer,
+        null_order: Nullability,
+    },
 }
 
-/// Encode a `RecordBatch` with explicit `EncoderOptions`.
-pub fn encode_record_batch_with_options<W: Write>(
-    batch: &RecordBatch,
-    out: &mut W,
-    opts: &EncoderOptions,
-) -> Result<(), ArrowError> {
-    let mut encoders = batch
-        .schema()
-        .fields()
-        .iter()
-        .zip(batch.columns())
-        .map(|(field, array)| Ok((field.is_nullable(), make_encoder(array.as_ref())?)))
-        .collect::<Result<Vec<_>, ArrowError>>()?;
-    (0..batch.num_rows()).try_for_each(|row| {
-        encoders.iter_mut().try_for_each(|(is_nullable, enc)| {
-            if *is_nullable {
-                let is_null = enc.is_null(row);
-                write_optional_branch(out, is_null, opts.impala_mode)?;
-                if is_null {
-                    return Ok(());
+/// Arrow to Avro FieldEncoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Carries the per-site nullability **state** as a single enum that enforces invariants
+pub struct FieldEncoder<'a> {
+    encoder: Encoder<'a>,
+    null_state: NullState,
+}
+
+impl<'a> FieldEncoder<'a> {
+    fn make_encoder(
+        array: &'a dyn Array,
+        field: &Field,
+        plan: &FieldPlan,
+        nullability: Option<Nullability>,
+    ) -> Result<Self, ArrowError> {
+        let encoder = match plan {
+            FieldPlan::Struct { encoders } => {
+                let arr = array
+                    .as_any()
+                    .downcast_ref::<StructArray>()
+                    .ok_or_else(|| ArrowError::SchemaError("Expected StructArray".into()))?;
+                Encoder::Struct(Box::new(StructEncoder::try_new(arr, encoders)?))
+            }
+            FieldPlan::List {
+                items_nullability,
+                item_plan,
+            } => match array.data_type() {
+                DataType::List(_) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<ListArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected ListArray".into()))?;
+                    Encoder::List(Box::new(ListEncoder32::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
+                DataType::LargeList(_) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<LargeListArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected LargeListArray".into()))?;
+                    Encoder::LargeList(Box::new(ListEncoder64::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
+                other => {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Avro array site requires Arrow List/LargeList, found: {other:?}"
+                    )))
+                }
+            },
+            FieldPlan::Scalar => match array.data_type() {
+                DataType::Boolean => Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+                DataType::Utf8 => {
+                    Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+                }
+                DataType::LargeUtf8 => {
+                    Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+                }
+                DataType::Int32 => Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+                DataType::Int64 => Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+                DataType::Float32 => {
+                    Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+                }
+                DataType::Float64 => {
+                    Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+                }
+                DataType::Binary => Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+                DataType::LargeBinary => {
+                    Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
                 }
+                DataType::Timestamp(TimeUnit::Microsecond, _) => Encoder::Timestamp(LongEncoder(
+                    array.as_primitive::<TimestampMicrosecondType>(),
+                )),
+                other => {
+                    return Err(ArrowError::NotYetImplemented(format!(
+                        "Avro scalar type not yet supported: {other:?}"
+                    )));
+                }
+            },
+            other => {
+                return Err(ArrowError::NotYetImplemented(
+                    "Avro writer: {other:?} not yet supported".into(),
+                ));

Review Comment:
   ```suggestion
                   return Err(ArrowError::NotYetImplemented(format!(
                       "Avro writer: {other:?} not yet supported".into(),
                   )));
   ```
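
The new `write_optional_index` delegates the branch choice to `union_value_branch_byte`, which is not shown in this hunk. Below is a self-contained sketch of the mapping that call is expected to implement, following the doc comment above (null-first: null => 0, value => 1; null-second/Impala: value => 0, null => 1) and the fact that Avro zig-zag-encodes the union branch index. The enum and helper here are local stand-ins, not the crate's actual definitions.

```rust
/// Local stand-in for arrow-avro's null-union ordering; variant names are assumptions.
#[derive(Clone, Copy)]
enum Nullability {
    NullFirst,  // union is ["null", T]
    NullSecond, // union is [T, "null"] (Impala-style)
}

/// Sketch of the single-byte branch encoding `write_optional_index` writes.
fn union_value_branch_byte(null_order: Nullability, is_null: bool) -> u8 {
    let branch: u8 = match (null_order, is_null) {
        (Nullability::NullFirst, true) | (Nullability::NullSecond, false) => 0,
        _ => 1,
    };
    // Avro writes the branch index as a zig-zag varint, so for the small
    // non-negative branches 0 and 1 the encoded byte is simply `branch << 1`.
    branch << 1
}

fn main() {
    assert_eq!(union_value_branch_byte(Nullability::NullFirst, true), 0x00); // null  => branch 0
    assert_eq!(union_value_branch_byte(Nullability::NullFirst, false), 0x02); // value => branch 1
    assert_eq!(union_value_branch_byte(Nullability::NullSecond, false), 0x00); // value => branch 0
    assert_eq!(union_value_branch_byte(Nullability::NullSecond, true), 0x02); // null  => branch 1
}
```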


