mbrobbel commented on code in PR #8274:
URL: https://github.com/apache/arrow-rs/pull/8274#discussion_r2329419436
##########
arrow-avro/src/schema.rs:
##########
@@ -453,6 +427,49 @@ impl AvroSchema {
pub fn generate_canonical_form(schema: &Schema) -> Result<String,
ArrowError> {
build_canonical(schema, None)
}
+
+ /// Build Avro JSON from an Arrow [`ArrowSchema`], applying the given
null‑union order.
+ ///
+ /// If the input Arrow schema already contains Avro JSON in
+ /// [`SCHEMA_METADATA_KEY`], that JSON is returned verbatim to preserve
+ /// the exact header encoding alignment; otherwise, a new JSON is
generated
+ /// honoring `null_union_order` at **all nullable sites**.
+ pub fn from_arrow_with_options(
+ schema: &ArrowSchema,
+ null_order: Option<Nullability>,
+ ) -> Result<AvroSchema, ArrowError> {
+ if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
+ return Ok(AvroSchema::new(json.clone()));
+ }
+ let order = null_order.unwrap_or_default();
+ let mut name_gen = NameGenerator::default();
+ let fields_json = schema
+ .fields()
+ .iter()
+ .map(|f| arrow_field_to_avro(f, &mut name_gen, order))
+ .collect::<Result<Vec<_>, _>>()?;
+ let record_name = schema
+ .metadata
+ .get(AVRO_NAME_METADATA_KEY)
+ .map_or("topLevelRecord", |s| s.as_str());
+ let mut record = JsonMap::with_capacity(schema.metadata.len() + 4);
+ record.insert("type".into(), Value::String("record".into()));
+ record.insert(
+ "name".into(),
+ Value::String(sanitise_avro_name(record_name)),
+ );
+ if let Some(ns) = schema.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
+ record.insert("namespace".into(), Value::String(ns.clone()));
+ }
+ if let Some(doc) = schema.metadata.get(AVRO_DOC_METADATA_KEY) {
+ record.insert("doc".into(), Value::String(doc.clone()));
+ }
+ record.insert("fields".into(), Value::Array(fields_json));
+ extend_with_passthrough_metadata(&mut record, &schema.metadata);
+ let json_string = serde_json::to_string(&Value::Object(record))
+ .map_err(|e| ArrowError::SchemaError(format!("Serialising Avro
JSON failed: {e}")))?;
Review Comment:
```suggestion
.map_err(|e| ArrowError::SchemaError(format!("Serializing Avro
JSON failed: {e}")))?;
```
##########
arrow-avro/src/schema.rs:
##########
@@ -453,6 +427,49 @@ impl AvroSchema {
pub fn generate_canonical_form(schema: &Schema) -> Result<String,
ArrowError> {
build_canonical(schema, None)
}
+
+ /// Build Avro JSON from an Arrow [`ArrowSchema`], applying the given
null‑union order.
+ ///
+ /// If the input Arrow schema already contains Avro JSON in
+ /// [`SCHEMA_METADATA_KEY`], that JSON is returned verbatim to preserve
+ /// the exact header encoding alignment; otherwise, a new JSON is
generated
Review Comment:
```suggestion
/// the exact header encoding alignment; otherwise, a new JSON is
generated
```
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -83,181 +74,416 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool)
-> Result<(), ArrowErr
/// Branch index is 0-based per Avro unions:
/// - Null-first (default): null => 0, value => 1
/// - Null-second (Impala): value => 0, null => 1
-#[inline]
-fn write_optional_branch<W: Write + ?Sized>(
- writer: &mut W,
+fn write_optional_index<W: Write + ?Sized>(
+ out: &mut W,
is_null: bool,
- impala_mode: bool,
+ null_order: Nullability,
) -> Result<(), ArrowError> {
- let branch = if impala_mode == is_null { 1 } else { 0 };
- write_int(writer, branch)
+ let byte = union_value_branch_byte(null_order, is_null);
+ out.write_all(&[byte])
+ .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"),
e))
}
-/// Encode a `RecordBatch` in Avro binary format using **default options**.
-pub fn encode_record_batch<W: Write>(batch: &RecordBatch, out: &mut W) ->
Result<(), ArrowError> {
- encode_record_batch_with_options(batch, out, &EncoderOptions::default())
+#[derive(Debug, Clone)]
+enum NullState {
+ NonNullable,
+ NullableNoNulls {
+ union_value_byte: u8,
+ },
+ Nullable {
+ nulls: NullBuffer,
+ null_order: Nullability,
+ },
}
-/// Encode a `RecordBatch` with explicit `EncoderOptions`.
-pub fn encode_record_batch_with_options<W: Write>(
- batch: &RecordBatch,
- out: &mut W,
- opts: &EncoderOptions,
-) -> Result<(), ArrowError> {
- let mut encoders = batch
- .schema()
- .fields()
- .iter()
- .zip(batch.columns())
- .map(|(field, array)| Ok((field.is_nullable(),
make_encoder(array.as_ref())?)))
- .collect::<Result<Vec<_>, ArrowError>>()?;
- (0..batch.num_rows()).try_for_each(|row| {
- encoders.iter_mut().try_for_each(|(is_nullable, enc)| {
- if *is_nullable {
- let is_null = enc.is_null(row);
- write_optional_branch(out, is_null, opts.impala_mode)?;
- if is_null {
- return Ok(());
+/// Arrow to Avro FieldEncoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Carries the per-site nullability **state** as a single enum that
enforces invariants
+pub struct FieldEncoder<'a> {
+ encoder: Encoder<'a>,
+ null_state: NullState,
+}
+
+impl<'a> FieldEncoder<'a> {
+ fn make_encoder(
+ array: &'a dyn Array,
+ field: &Field,
+ plan: &FieldPlan,
+ nullability: Option<Nullability>,
+ ) -> Result<Self, ArrowError> {
+ let encoder = match plan {
+ FieldPlan::Struct { encoders } => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
StructArray".into()))?;
+ Encoder::Struct(Box::new(StructEncoder::try_new(arr,
encoders)?))
+ }
+ FieldPlan::List {
+ items_nullability,
+ item_plan,
+ } => match array.data_type() {
+ DataType::List(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
ListArray".into()))?;
+ Encoder::List(Box::new(ListEncoder32::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ DataType::LargeList(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<LargeListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
LargeListArray".into()))?;
+ Encoder::LargeList(Box::new(ListEncoder64::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+ "Avro array site requires Arrow List/LargeList, found:
{other:?}"
+ )))
+ }
+ },
+ FieldPlan::Scalar => match array.data_type() {
+ DataType::Boolean =>
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+ DataType::Utf8 => {
+
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+ }
+ DataType::LargeUtf8 => {
+
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+ }
+ DataType::Int32 =>
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+ DataType::Int64 =>
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+ DataType::Float32 => {
+
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+ }
+ DataType::Float64 => {
+
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+ }
+ DataType::Binary =>
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+ DataType::LargeBinary => {
+
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
}
+ DataType::Timestamp(TimeUnit::Microsecond, _) =>
Encoder::Timestamp(LongEncoder(
+ array.as_primitive::<TimestampMicrosecondType>(),
+ )),
+ other => {
+ return Err(ArrowError::NotYetImplemented(format!(
+ "Avro scalar type not yet supported: {other:?}"
+ )));
+ }
+ },
+ other => {
+ return Err(ArrowError::NotYetImplemented(
+ "Avro writer: {other:?} not yet supported".into(),
+ ));
Review Comment:
```suggestion
return Err(ArrowError::NotYetImplemented(format!(
"Avro writer: {other:?} not yet supported"
)));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org