This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new bbb47a690 Expose page-level arrow reader API (#4298) (#4307)
bbb47a690 is described below

commit bbb47a69023094809b69110c79a8f18d191e54f1
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed May 31 11:49:48 2023 +0100

    Expose page-level arrow reader API (#4298) (#4307)
    
    * Expose page-level arrow reader API (#4298)
    
    * Make scan_ranges public
    
    * Review feedback
---
 parquet/src/arrow/array_reader/builder.rs    | 20 +++++-----
 parquet/src/arrow/array_reader/list_array.rs |  4 +-
 parquet/src/arrow/array_reader/mod.rs        | 26 ++++--------
 parquet/src/arrow/arrow_reader/mod.rs        | 39 +++++++++++++-----
 parquet/src/arrow/arrow_reader/selection.rs  | 11 +++--
 parquet/src/arrow/async_reader/mod.rs        | 16 +++-----
 parquet/src/arrow/mod.rs                     |  3 +-
 parquet/src/arrow/schema/complex.rs          | 12 +++---
 parquet/src/arrow/schema/mod.rs              | 60 +++++++++++++++++++++++-----
 9 files changed, 122 insertions(+), 69 deletions(-)

diff --git a/parquet/src/arrow/array_reader/builder.rs 
b/parquet/src/arrow/array_reader/builder.rs
index 5e0d05e89..bb3f40335 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -24,7 +24,7 @@ use 
crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_
 use crate::arrow::array_reader::{
     make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
     FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
-    PrimitiveArrayReader, RowGroupCollection, StructArrayReader,
+    PrimitiveArrayReader, RowGroups, StructArrayReader,
 };
 use crate::arrow::schema::{ParquetField, ParquetFieldType};
 use crate::arrow::ProjectionMask;
@@ -39,7 +39,7 @@ use crate::schema::types::{ColumnDescriptor, ColumnPath, 
Type};
 pub fn build_array_reader(
     field: Option<&ParquetField>,
     mask: &ProjectionMask,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
 ) -> Result<Box<dyn ArrayReader>> {
     let reader = field
         .and_then(|field| build_reader(field, mask, row_groups).transpose())
@@ -52,7 +52,7 @@ pub fn build_array_reader(
 fn build_reader(
     field: &ParquetField,
     mask: &ProjectionMask,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
 ) -> Result<Option<Box<dyn ArrayReader>>> {
     match field.field_type {
         ParquetFieldType::Primitive { .. } => {
@@ -75,7 +75,7 @@ fn build_reader(
 fn build_map_reader(
     field: &ParquetField,
     mask: &ProjectionMask,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
 ) -> Result<Option<Box<dyn ArrayReader>>> {
     let children = field.children().unwrap();
     assert_eq!(children.len(), 2);
@@ -127,7 +127,7 @@ fn build_list_reader(
     field: &ParquetField,
     mask: &ProjectionMask,
     is_large: bool,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
 ) -> Result<Option<Box<dyn ArrayReader>>> {
     let children = field.children().unwrap();
     assert_eq!(children.len(), 1);
@@ -173,7 +173,7 @@ fn build_list_reader(
 fn build_fixed_size_list_reader(
     field: &ParquetField,
     mask: &ProjectionMask,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
 ) -> Result<Option<Box<dyn ArrayReader>>> {
     let children = field.children().unwrap();
     assert_eq!(children.len(), 1);
@@ -210,7 +210,7 @@ fn build_fixed_size_list_reader(
 fn build_primitive_reader(
     field: &ParquetField,
     mask: &ProjectionMask,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
 ) -> Result<Option<Box<dyn ArrayReader>>> {
     let (col_idx, primitive_type) = match &field.field_type {
         ParquetFieldType::Primitive {
@@ -301,7 +301,7 @@ fn build_primitive_reader(
 fn build_struct_reader(
     field: &ParquetField,
     mask: &ProjectionMask,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
 ) -> Result<Option<Box<dyn ArrayReader>>> {
     let arrow_fields = match &field.arrow_type {
         DataType::Struct(children) => children,
@@ -338,7 +338,7 @@ fn build_struct_reader(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::arrow::schema::parquet_to_array_schema_and_fields;
+    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
     use crate::file::reader::{FileReader, SerializedFileReader};
     use crate::util::test_common::file_util::get_test_file;
     use arrow::datatypes::Field;
@@ -352,7 +352,7 @@ mod tests {
 
         let file_metadata = file_reader.metadata().file_metadata();
         let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
-        let (_, fields) = parquet_to_array_schema_and_fields(
+        let (_, fields) = parquet_to_arrow_schema_and_fields(
             file_metadata.schema_descr(),
             ProjectionMask::all(),
             file_metadata.key_value_metadata(),
diff --git a/parquet/src/arrow/array_reader/list_array.rs 
b/parquet/src/arrow/array_reader/list_array.rs
index 932034417..7c66c5c23 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -250,7 +250,7 @@ mod tests {
     use crate::arrow::array_reader::build_array_reader;
     use crate::arrow::array_reader::list_array::ListArrayReader;
     use crate::arrow::array_reader::test_util::InMemoryArrayReader;
-    use crate::arrow::schema::parquet_to_array_schema_and_fields;
+    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
     use crate::arrow::{parquet_to_arrow_schema, ArrowWriter, ProjectionMask};
     use crate::file::properties::WriterProperties;
     use crate::file::reader::{FileReader, SerializedFileReader};
@@ -566,7 +566,7 @@ mod tests {
         let file_metadata = file_reader.metadata().file_metadata();
         let schema = file_metadata.schema_descr();
         let mask = ProjectionMask::leaves(schema, vec![0]);
-        let (_, fields) = parquet_to_array_schema_and_fields(
+        let (_, fields) = parquet_to_arrow_schema_and_fields(
             schema,
             ProjectionMask::all(),
             file_metadata.key_value_metadata(),
diff --git a/parquet/src/arrow/array_reader/mod.rs 
b/parquet/src/arrow/array_reader/mod.rs
index 823084b43..1e781fb73 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -28,7 +28,6 @@ use crate::arrow::record_reader::GenericRecordReader;
 use crate::column::page::PageIterator;
 use crate::column::reader::decoder::ColumnValueDecoder;
 use crate::file::reader::{FilePageIterator, FileReader};
-use crate::schema::types::SchemaDescPtr;
 
 mod builder;
 mod byte_array;
@@ -100,22 +99,15 @@ pub trait ArrayReader: Send {
 }
 
 /// A collection of row groups
-pub trait RowGroupCollection {
-    /// Get schema of parquet file.
-    fn schema(&self) -> SchemaDescPtr;
-
+pub trait RowGroups {
     /// Get the number of rows in this collection
     fn num_rows(&self) -> usize;
 
-    /// Returns an iterator over the column chunks for particular column
+    /// Returns a [`PageIterator`] for the column chunks with the given leaf 
column index
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
 }
 
-impl RowGroupCollection for Arc<dyn FileReader> {
-    fn schema(&self) -> SchemaDescPtr {
-        self.metadata().file_metadata().schema_descr_ptr()
-    }
-
+impl RowGroups for Arc<dyn FileReader> {
     fn num_rows(&self) -> usize {
         self.metadata().file_metadata().num_rows() as usize
     }
@@ -126,26 +118,22 @@ impl RowGroupCollection for Arc<dyn FileReader> {
     }
 }
 
-pub(crate) struct FileReaderRowGroupCollection {
+pub(crate) struct FileReaderRowGroups {
     /// The underling file reader
     reader: Arc<dyn FileReader>,
     /// Optional list of row group indices to scan
     row_groups: Option<Vec<usize>>,
 }
 
-impl FileReaderRowGroupCollection {
-    /// Creates a new [`RowGroupCollection`] from a `FileReader` and an 
optional
+impl FileReaderRowGroups {
+    /// Creates a new [`RowGroups`] from a `FileReader` and an optional
     /// list of row group indexes to scan
     pub fn new(reader: Arc<dyn FileReader>, row_groups: Option<Vec<usize>>) -> 
Self {
         Self { reader, row_groups }
     }
 }
 
-impl RowGroupCollection for FileReaderRowGroupCollection {
-    fn schema(&self) -> SchemaDescPtr {
-        self.reader.metadata().file_metadata().schema_descr_ptr()
-    }
-
+impl RowGroups for FileReaderRowGroups {
     fn num_rows(&self) -> usize {
         match &self.row_groups {
             None => self.reader.metadata().file_metadata().num_rows() as usize,
diff --git a/parquet/src/arrow/arrow_reader/mod.rs 
b/parquet/src/arrow/arrow_reader/mod.rs
index f3e178bdf..9cb09c9a5 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -26,12 +26,9 @@ use arrow_array::{RecordBatch, RecordBatchReader};
 use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
 use arrow_select::filter::prep_null_mask_filter;
 
-use crate::arrow::array_reader::{
-    build_array_reader, ArrayReader, FileReaderRowGroupCollection, 
RowGroupCollection,
-};
-use crate::arrow::schema::parquet_to_array_schema_and_fields;
-use crate::arrow::schema::ParquetField;
-use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{build_array_reader, ArrayReader, 
FileReaderRowGroups};
+use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
+use crate::arrow::{FieldLevels, ProjectionMask};
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::ParquetMetaData;
 use crate::file::reader::{ChunkReader, SerializedFileReader};
@@ -41,6 +38,7 @@ use crate::schema::types::SchemaDescriptor;
 mod filter;
 mod selection;
 
+pub use crate::arrow::array_reader::RowGroups;
 pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
 pub use selection::{RowSelection, RowSelector};
 
@@ -87,7 +85,7 @@ impl<T> ArrowReaderBuilder<T> {
             false => metadata.file_metadata().key_value_metadata(),
         };
 
-        let (schema, fields) = parquet_to_array_schema_and_fields(
+        let (schema, fields) = parquet_to_arrow_schema_and_fields(
             metadata.file_metadata().schema_descr(),
             ProjectionMask::all(),
             kv_metadata,
@@ -269,8 +267,7 @@ impl<T: ChunkReader + 'static> 
ArrowReaderBuilder<SyncReader<T>> {
     ///
     /// Note: this will eagerly evaluate any `RowFilter` before returning
     pub fn build(self) -> Result<ParquetRecordBatchReader> {
-        let reader =
-            FileReaderRowGroupCollection::new(Arc::new(self.input.0), 
self.row_groups);
+        let reader = FileReaderRowGroups::new(Arc::new(self.input.0), 
self.row_groups);
 
         let mut filter = self.filter;
         let mut selection = self.selection;
@@ -420,6 +417,30 @@ impl ParquetRecordBatchReader {
             .build()
     }
 
+    /// Create a new [`ParquetRecordBatchReader`] from the provided 
[`RowGroups`]
+    ///
+    /// Note: this is a low-level interface see 
[`ParquetRecordBatchReader::try_new`] for a
+    /// higher-level interface for reading parquet data from a file
+    pub fn try_new_with_row_groups(
+        levels: &FieldLevels,
+        row_groups: &dyn RowGroups,
+        batch_size: usize,
+        selection: Option<RowSelection>,
+    ) -> Result<Self> {
+        let array_reader = build_array_reader(
+            levels.levels.as_ref(),
+            &ProjectionMask::all(),
+            row_groups,
+        )?;
+
+        Ok(Self {
+            batch_size,
+            array_reader,
+            schema: Arc::new(Schema::new(levels.fields.clone())),
+            selection: selection.map(|s| s.trim().into()),
+        })
+    }
+
     /// Create a new [`ParquetRecordBatchReader`] that will read at most 
`batch_size` rows at
     /// a time from [`ArrayReader`] based on the configured `selection`. If 
`selection` is `None`
     /// all rows will be returned
diff --git a/parquet/src/arrow/arrow_reader/selection.rs 
b/parquet/src/arrow/arrow_reader/selection.rs
index 76f950620..a558f893c 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -173,9 +173,14 @@ impl RowSelection {
         }
     }
 
-    /// Given an offset index, return the offset ranges for all data pages 
selected by `self`
-    #[cfg(any(test, feature = "async"))]
-    pub(crate) fn scan_ranges(
+    /// Given an offset index, return the byte ranges for all data pages 
selected by `self`
+    ///
+    /// This is useful for determining what byte ranges to fetch from 
underlying storage
+    ///
+    /// Note: this method does not make any effort to combine consecutive 
ranges, nor coalesce
+    /// ranges that are close together. This is instead delegated to the IO 
subsystem to optimise,
+    /// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges)
+    pub fn scan_ranges(
         &self,
         page_locations: &[crate::format::PageLocation],
     ) -> Vec<Range<usize>> {
diff --git a/parquet/src/arrow/async_reader/mod.rs 
b/parquet/src/arrow/async_reader/mod.rs
index c11033eae..f17fb0751 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -94,12 +94,11 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, 
AsyncSeekExt};
 use arrow_array::RecordBatch;
 use arrow_schema::SchemaRef;
 
-use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
+use crate::arrow::array_reader::{build_array_reader, RowGroups};
 use crate::arrow::arrow_reader::{
     apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, 
ArrowReaderOptions,
     ParquetRecordBatchReader, RowFilter, RowSelection,
 };
-use crate::arrow::schema::ParquetField;
 use crate::arrow::ProjectionMask;
 
 use crate::column::page::{PageIterator, PageReader};
@@ -112,14 +111,13 @@ use crate::format::PageLocation;
 
 use crate::file::FOOTER_SIZE;
 
-use crate::schema::types::SchemaDescPtr;
-
 mod metadata;
 pub use metadata::*;
 
 #[cfg(feature = "object_store")]
 mod store;
 
+use crate::arrow::schema::ParquetField;
 #[cfg(feature = "object_store")]
 pub use store::*;
 
@@ -648,11 +646,7 @@ impl<'a> InMemoryRowGroup<'a> {
     }
 }
 
-impl<'a> RowGroupCollection for InMemoryRowGroup<'a> {
-    fn schema(&self) -> SchemaDescPtr {
-        self.metadata.schema_descr_ptr()
-    }
-
+impl<'a> RowGroups for InMemoryRowGroup<'a> {
     fn num_rows(&self) -> usize {
         self.row_count
     }
@@ -756,7 +750,7 @@ mod tests {
     use crate::arrow::arrow_reader::{
         ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
     };
-    use crate::arrow::schema::parquet_to_array_schema_and_fields;
+    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
     use crate::arrow::ArrowWriter;
     use crate::file::footer::parse_metadata;
     use crate::file::page_index::index_reader;
@@ -1401,7 +1395,7 @@ mod tests {
         };
 
         let requests = async_reader.requests.clone();
-        let (_, fields) = parquet_to_array_schema_and_fields(
+        let (_, fields) = parquet_to_arrow_schema_and_fields(
             metadata.file_metadata().schema_descr(),
             ProjectionMask::all(),
             None,
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index e5211ec23..aad4925c7 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -123,7 +123,8 @@ pub use self::async_writer::AsyncArrowWriter;
 use crate::schema::types::SchemaDescriptor;
 
 pub use self::schema::{
-    arrow_to_parquet_schema, parquet_to_arrow_schema, 
parquet_to_arrow_schema_by_columns,
+    arrow_to_parquet_schema, parquet_to_arrow_field_levels, 
parquet_to_arrow_schema,
+    parquet_to_arrow_schema_by_columns, FieldLevels,
 };
 
 /// Schema metadata key used to store serialized Arrow IPC schema
diff --git a/parquet/src/arrow/schema/complex.rs 
b/parquet/src/arrow/schema/complex.rs
index c1699aafc..0d19875d9 100644
--- a/parquet/src/arrow/schema/complex.rs
+++ b/parquet/src/arrow/schema/complex.rs
@@ -24,7 +24,7 @@ use crate::basic::{ConvertedType, Repetition};
 use crate::errors::ParquetError;
 use crate::errors::Result;
 use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
-use arrow_schema::{DataType, Field, Schema, SchemaBuilder};
+use arrow_schema::{DataType, Field, Fields, SchemaBuilder};
 
 fn get_repetition(t: &Type) -> Repetition {
     let info = t.get_basic_info();
@@ -34,7 +34,8 @@ fn get_repetition(t: &Type) -> Repetition {
     }
 }
 
-/// Representation of a parquet file, in terms of arrow schema elements
+/// Representation of a parquet schema element, in terms of arrow schema 
elements
+#[derive(Debug, Clone)]
 pub struct ParquetField {
     /// The level which represents an insertion into the current list
     /// i.e. guaranteed to be > 0 for a list type
@@ -82,6 +83,7 @@ impl ParquetField {
     }
 }
 
+#[derive(Debug, Clone)]
 pub enum ParquetFieldType {
     Primitive {
         /// The index of the column in parquet
@@ -554,13 +556,13 @@ fn convert_field(
 
 /// Computes the [`ParquetField`] for the provided [`SchemaDescriptor`] with 
`leaf_columns` listing
 /// the indexes of leaf columns to project, and `embedded_arrow_schema` the 
optional
-/// [`Schema`] embedded in the parquet metadata
+/// [`Fields`] embedded in the parquet metadata
 ///
 /// Note: This does not support out of order column projection
 pub fn convert_schema(
     schema: &SchemaDescriptor,
     mask: ProjectionMask,
-    embedded_arrow_schema: Option<&Schema>,
+    embedded_arrow_schema: Option<&Fields>,
 ) -> Result<Option<ParquetField>> {
     let mut visitor = Visitor {
         next_col_idx: 0,
@@ -570,7 +572,7 @@ pub fn convert_schema(
     let context = VisitorContext {
         rep_level: 0,
         def_level: 0,
-        data_type: embedded_arrow_schema.map(|s| 
DataType::Struct(s.fields().clone())),
+        data_type: embedded_arrow_schema.map(|fields| 
DataType::Struct(fields.clone())),
     };
 
     visitor.dispatch(&schema.root_schema_ptr(), context)
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index a80d4add3..3b9691044 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -45,7 +45,8 @@ mod primitive;
 use crate::arrow::ProjectionMask;
 pub(crate) use complex::{ParquetField, ParquetFieldType};
 
-/// Convert Parquet schema to Arrow schema including optional metadata.
+/// Convert Parquet schema to Arrow schema including optional metadata
+///
 /// Attempts to decode any existing Arrow schema metadata, falling back
 /// to converting the Parquet schema column-wise
 pub fn parquet_to_arrow_schema(
@@ -66,11 +67,11 @@ pub fn parquet_to_arrow_schema_by_columns(
     mask: ProjectionMask,
     key_value_metadata: Option<&Vec<KeyValue>>,
 ) -> Result<Schema> {
-    Ok(parquet_to_array_schema_and_fields(parquet_schema, mask, 
key_value_metadata)?.0)
+    Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, 
key_value_metadata)?.0)
 }
 
 /// Extracts the arrow metadata
-pub(crate) fn parquet_to_array_schema_and_fields(
+pub(crate) fn parquet_to_arrow_schema_and_fields(
     parquet_schema: &SchemaDescriptor,
     mask: ProjectionMask,
     key_value_metadata: Option<&Vec<KeyValue>>,
@@ -88,15 +89,56 @@ pub(crate) fn parquet_to_array_schema_and_fields(
         });
     }
 
-    match complex::convert_schema(parquet_schema, mask, 
maybe_schema.as_ref())? {
+    let hint = maybe_schema.as_ref().map(|s| s.fields());
+    let field_levels = parquet_to_arrow_field_levels(parquet_schema, mask, 
hint)?;
+    let schema = Schema::new_with_metadata(field_levels.fields, metadata);
+    Ok((schema, field_levels.levels))
+}
+
+/// Schema information necessary to decode a parquet file as arrow [`Fields`]
+///
+/// In particular this stores the dremel-level information necessary to 
correctly
+/// interpret the encoded definition and repetition levels
+///
+/// Note: this is an opaque container intended to be used with lower-level APIs
+/// within this crate
+#[derive(Debug, Clone)]
+pub struct FieldLevels {
+    pub(crate) fields: Fields,
+    pub(crate) levels: Option<ParquetField>,
+}
+
+/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`]
+///
+/// Columns not included within [`ProjectionMask`] will be ignored.
+///
+/// Where a field type in `hint` is compatible with the corresponding parquet 
type in `schema`, it
+/// will be used, otherwise the default arrow type for the given parquet 
column type will be used.
+///
+/// This is to accommodate arrow types that cannot be round-tripped through 
parquet natively.
+/// Depending on the parquet writer, this can lead to a mismatch between a 
file's parquet schema
+/// and its embedded arrow schema. The parquet `schema` must be treated as 
authoritative in such
+/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for 
more information
+///
+/// Note: this is a low-level API, most users will want to make use of the 
higher-level
+/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
+pub fn parquet_to_arrow_field_levels(
+    schema: &SchemaDescriptor,
+    mask: ProjectionMask,
+    hint: Option<&Fields>,
+) -> Result<FieldLevels> {
+    match complex::convert_schema(schema, mask, hint)? {
         Some(field) => match &field.arrow_type {
-            DataType::Struct(fields) => Ok((
-                Schema::new_with_metadata(fields.clone(), metadata),
-                Some(field),
-            )),
+            DataType::Struct(fields) => Ok(FieldLevels {
+                fields: fields.clone(),
+                levels: Some(field),
+            }),
             _ => unreachable!(),
         },
-        None => Ok((Schema::new_with_metadata(Fields::empty(), metadata), 
None)),
+        None => Ok(FieldLevels {
+            fields: Fields::empty(),
+            levels: None,
+        }),
     }
 }
 

Reply via email to