alamb commented on code in PR #9021:
URL: https://github.com/apache/arrow-rs/pull/9021#discussion_r2729495830


##########
arrow-json/src/reader/mod.rs:
##########
@@ -373,6 +386,95 @@ impl<R: BufRead> RecordBatchReader for Reader<R> {
     }
 }
 
+/// A trait to create custom decoders for specific data types.
+///
+/// This allows overriding the default decoders for specific data types,
+/// or adding new decoders for custom data types.
+///
+/// # Examples
+///
+/// ```
+/// use arrow_json::{ArrayDecoder, DecoderFactory, TapeElement, Tape, 
ReaderBuilder, StructMode};
+/// use arrow_schema::ArrowError;
+/// use arrow_schema::{DataType, Field, Fields, Schema};
+/// use arrow_array::cast::AsArray;
+/// use arrow_array::Array;
+/// use arrow_array::builder::StringBuilder;
+/// use arrow_data::ArrayData;
+/// use std::sync::Arc;
+///
+/// struct IncorrectStringAsNullDecoder {}
+///
+/// impl ArrayDecoder for IncorrectStringAsNullDecoder {
+///     fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> 
Result<ArrayData, ArrowError> {
+///         let mut builder = StringBuilder::new();
+///         for p in pos {
+///             match tape.get(*p) {
+///                 TapeElement::String(idx) => {
+///                     builder.append_value(tape.get_string(idx));
+///                 }
+///                 _ => builder.append_null(),
+///             }
+///         }
+///         Ok(builder.finish().into_data())
+///     }
+/// }
+///
+/// #[derive(Debug)]
+/// struct IncorrectStringAsNullDecoderFactory;
+///
+/// impl DecoderFactory for IncorrectStringAsNullDecoderFactory {
+///     fn make_default_decoder<'a>(
+///         &self,
+///         _field: Option<FieldRef>,
+///         data_type: DataType,
+///         _coerce_primitive: bool,
+///         _strict_mode: bool,
+///         _is_nullable: bool,
+///         _struct_mode: StructMode,
+///     ) -> Result<Option<Box<dyn ArrayDecoder>>, ArrowError> {
+///         match data_type {
+///             DataType::Utf8 => 
Ok(Some(Box::new(IncorrectStringAsNullDecoder {}))),
+///             _ => Ok(None),
+///         }
+///     }
+/// }
+///
+/// let json = r#"
+/// {"a": "a"}
+/// {"a": 12}
+/// "#;
+/// let batch = 
ReaderBuilder::new(Arc::new(Schema::new(Fields::from(vec![Field::new(
+///     "a",
+///     DataType::Utf8,
+///     true,
+/// )]))))
+/// .with_decoder_factory(Arc::new(IncorrectStringAsNullDecoderFactory))
+/// .build(json.as_bytes())
+/// .unwrap()
+/// .next()
+/// .unwrap()
+/// .unwrap();
+///
+/// let values = batch.column(0).as_string::<i32>();
+/// assert_eq!(values.len(), 2);
+/// assert_eq!(values.value(0), "a");
+/// assert!(values.is_null(1));
+/// ```
+pub trait DecoderFactory: std::fmt::Debug + Send + Sync {

Review Comment:
   I think the key to making this easier to work with would be to register a 
decoder by path rather than by type -- so instead of registering a custom 
decoder for a particular type, you would register it for a particular path
   
   For example, if you are decoding a top-level schema, you would register it 
by field name or schema position:
   ```
   { 
     foo: Int64, 
     bar: Float64, 
     baz: <custom decoder> 
   }
   ```
   
   We would have to figure out nested types somehow though 🤔 



##########
parquet-variant-compute/src/decoder.rs:
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{VariantArrayBuilder, VariantType};
+use arrow_array::{Array, StructArray};
+use arrow_data::ArrayData;
+use arrow_json::{DecoderFactory, StructMode};
+use arrow_schema::extension::ExtensionType;
+use arrow_schema::{ArrowError, DataType, FieldRef};
+use parquet_variant::{ObjectFieldBuilder, Variant, VariantBuilderExt};
+
+use arrow_json::reader::ArrayDecoder;
+use arrow_json::reader::{Tape, TapeElement};
+
+/// An [`ArrayDecoder`] implementation that decodes JSON values into a Variant 
array.
+///
+/// This decoder converts JSON tape elements (parsed JSON tokens) into Parquet 
Variant
+/// format, preserving the full structure of arbitrary JSON including nested 
objects,
+/// arrays, and primitive types.
+///
+/// This decoder is typically used indirectly via 
[`VariantArrayDecoderFactory`] when
+/// reading JSON data into Variant columns.
+#[derive(Default)]
+pub struct VariantArrayDecoder;
+
+impl ArrayDecoder for VariantArrayDecoder {
+    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, 
ArrowError> {
+        let mut array_builder = VariantArrayBuilder::new(pos.len());
+        for p in pos {
+            variant_from_tape_element(&mut array_builder, *p, tape)?;
+        }
+        let variant_struct_array = StructArray::from(array_builder.build());
+        Ok(variant_struct_array.into_data())
+    }
+}
+
+/// A [`DecoderFactory`] that creates [`VariantArrayDecoder`] instances for 
Variant-typed fields.
+///
+/// This factory integrates with the Arrow JSON reader to automatically decode 
JSON values
+/// into Variant arrays when the target field is registered as a 
[`VariantType`] extension type.
+///
+/// # Example
+///
+/// ```ignore
+/// use arrow_json::reader::ReaderBuilder;
+/// use arrow_json::StructMode;
+/// use std::sync::Arc;
+///
+/// let builder = ReaderBuilder::new(Arc::new(schema));
+/// let reader = builder
+///     .with_struct_mode(StructMode::ObjectOnly)
+///     .with_decoder_factory(Arc::new(VariantArrayDecoderFactory))
+///     .build(json_input)?;
+/// ```
+#[derive(Default, Debug)]
+#[allow(unused)]
+pub struct VariantArrayDecoderFactory;
+
+impl DecoderFactory for VariantArrayDecoderFactory {
+    fn make_custom_decoder<'a>(
+        &self,
+        field: Option<FieldRef>,
+        _data_type: DataType,
+        _coerce_primitive: bool,
+        _strict_mode: bool,
+        _is_nullable: bool,
+        _struct_mode: StructMode,
+    ) -> Result<Option<Box<dyn ArrayDecoder>>, ArrowError> {
+        if let Some(field) = field
+            && field.extension_type_name() == Some(VariantType::NAME)
+        {
+            return Ok(Some(Box::new(VariantArrayDecoder)));
+        }
+        Ok(None)
+    }
+}
+
+fn variant_from_tape_element(
+    builder: &mut impl VariantBuilderExt,
+    mut p: u32,
+    tape: &Tape,
+) -> Result<u32, ArrowError> {
+    match tape.get(p) {
+        TapeElement::StartObject(end_idx) => {
+            let mut object_builder = builder.try_new_object()?;
+            p += 1;
+            while p < end_idx {
+                // Read field name
+                let field_name = match tape.get(p) {
+                    TapeElement::String(s) => tape.get_string(s),
+                    _ => return Err(tape.error(p, "field name")),
+                };
+
+                let mut field_builder = ObjectFieldBuilder::new(field_name, 
&mut object_builder);
+                p = tape.next(p, "field value")?;
+                p = variant_from_tape_element(&mut field_builder, p, tape)?;
+            }
+            object_builder.finish();
+        }
+        TapeElement::EndObject(_u32) => {
+            return Err(ArrowError::JsonError(
+                "unexpected end of object".to_string(),
+            ));
+        }
+        TapeElement::StartList(end_idx) => {
+            let mut list_builder = builder.try_new_list()?;
+            p += 1;
+            while p < end_idx {
+                p = variant_from_tape_element(&mut list_builder, p, tape)?;
+            }
+            list_builder.finish();
+        }
+        TapeElement::EndList(_u32) => {
+            return Err(ArrowError::JsonError("unexpected end of 
list".to_string()));
+        }
+        TapeElement::String(idx) => builder.append_value(tape.get_string(idx)),
+        TapeElement::Number(idx) => {
+            let s = tape.get_string(idx);
+            builder.append_value(parse_number(s)?)
+        }
+        TapeElement::I64(i) => {
+            return Err(ArrowError::JsonError(format!(
+                "I64 tape element not supported: {i}"
+            )));
+        }
+        TapeElement::I32(i) => {
+            return Err(ArrowError::JsonError(format!(
+                "I32 tape element not supported: {i}"
+            )));
+        }
+        TapeElement::F64(f) => {
+            return Err(ArrowError::JsonError(format!(
+                "F64 tape element not supported: {f}"
+            )));
+        }
+        TapeElement::F32(f) => {
+            return Err(ArrowError::JsonError(format!(
+                "F32 tape element not supported: {f}"
+            )));
+        }
+        TapeElement::True => builder.append_value(true),
+        TapeElement::False => builder.append_value(false),
+        TapeElement::Null => builder.append_value(Variant::Null),
+    }
+    p += 1;
+    Ok(p)
+}
+
+fn parse_number<'a, 'b>(s: &'a str) -> Result<Variant<'a, 'b>, ArrowError> {
+    if let Ok(v) = lexical_core::parse(s.as_bytes()) {

Review Comment:
   it is supposedly faster



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to