This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fe01bb432c Allow overriding the inferred parquet scheme root (#5814)
7fe01bb432c is described below

commit 7fe01bb432c0d190312158b706bda5971ca7d883
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed May 29 12:17:38 2024 +0100

    Allow overriding the inferred parquet scheme root (#5814)
---
 parquet/src/arrow/arrow_writer/mod.rs | 19 +++++++++++++++++--
 parquet/src/arrow/schema/mod.rs       | 12 +++++++++---
 parquet/src/bin/parquet-fromcsv.rs    | 26 +++++++++++++++-----------
 3 files changed, 41 insertions(+), 16 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/mod.rs 
b/parquet/src/arrow/arrow_writer/mod.rs
index 60feda69e84..fd3f9591718 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -31,7 +31,8 @@ use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
 use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, 
SchemaRef};
 
 use super::schema::{
-    add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema, 
decimal_length_from_precision,
+    add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
+    arrow_to_parquet_schema_with_root, decimal_length_from_precision,
 };
 
 use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
@@ -160,7 +161,10 @@ impl<W: Write + Send> ArrowWriter<W> {
         arrow_schema: SchemaRef,
         options: ArrowWriterOptions,
     ) -> Result<Self> {
-        let schema = arrow_to_parquet_schema(&arrow_schema)?;
+        let schema = match options.schema_root {
+            Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s)?,
+            None => arrow_to_parquet_schema(&arrow_schema)?,
+        };
         let mut props = options.properties;
         if !options.skip_arrow_metadata {
             // add serialized arrow schema
@@ -323,6 +327,7 @@ impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
 pub struct ArrowWriterOptions {
     properties: WriterProperties,
     skip_arrow_metadata: bool,
+    schema_root: Option<String>,
 }
 
 impl ArrowWriterOptions {
@@ -346,6 +351,16 @@ impl ArrowWriterOptions {
             ..self
         }
     }
+
+    /// Overrides the name of the root parquet schema element
+    ///
+    /// Defaults to `"arrow_schema"`
+    pub fn with_schema_root(self, name: String) -> Self {
+        Self {
+            schema_root: Some(name),
+            ..self
+        }
+    }
 }
 
 /// A single column chunk produced by [`ArrowColumnWriter`]
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 37f368b203a..8c583eebac5 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -223,15 +223,21 @@ pub(crate) fn 
add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut
 }
 
 /// Convert arrow schema to parquet schema
+///
+/// The name of the root schema element defaults to `"arrow_schema"`, this can 
be
+/// overridden with [`arrow_to_parquet_schema_with_root`]
 pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
+    arrow_to_parquet_schema_with_root(schema, "arrow_schema")
+}
+
+/// Convert arrow schema to parquet schema specifying the name of the root 
schema element
+pub fn arrow_to_parquet_schema_with_root(schema: &Schema, root: &str) -> 
Result<SchemaDescriptor> {
     let fields = schema
         .fields()
         .iter()
         .map(|field| arrow_to_parquet_type(field).map(Arc::new))
         .collect::<Result<_>>()?;
-    let group = Type::group_type_builder("arrow_schema")
-        .with_fields(fields)
-        .build()?;
+    let group = Type::group_type_builder(root).with_fields(fields).build()?;
     Ok(SchemaDescriptor::new(Arc::new(group)))
 }
 
diff --git a/parquet/src/bin/parquet-fromcsv.rs 
b/parquet/src/bin/parquet-fromcsv.rs
index 445409610a8..140a367675f 100644
--- a/parquet/src/bin/parquet-fromcsv.rs
+++ b/parquet/src/bin/parquet-fromcsv.rs
@@ -81,6 +81,7 @@ use std::{
 use arrow_csv::ReaderBuilder;
 use arrow_schema::{ArrowError, Schema};
 use clap::{Parser, ValueEnum};
+use parquet::arrow::arrow_writer::ArrowWriterOptions;
 use parquet::{
     arrow::{parquet_to_arrow_schema, ArrowWriter},
     basic::Compression,
@@ -333,13 +334,6 @@ fn configure_reader_builder(args: &Args, arrow_schema: 
Arc<Schema>) -> ReaderBui
     builder
 }
 
-fn arrow_schema_from_string(schema: &str) -> Result<Arc<Schema>, 
ParquetFromCsvError> {
-    let schema = Arc::new(parse_message_type(schema)?);
-    let desc = SchemaDescriptor::new(schema);
-    let arrow_schema = Arc::new(parquet_to_arrow_schema(&desc, None)?);
-    Ok(arrow_schema)
-}
-
 fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> {
     let schema = read_to_string(args.schema_path()).map_err(|e| {
         ParquetFromCsvError::with_context(
@@ -347,7 +341,9 @@ fn convert_csv_to_parquet(args: &Args) -> Result<(), 
ParquetFromCsvError> {
             &format!("Failed to open schema file {:#?}", args.schema_path()),
         )
     })?;
-    let arrow_schema = arrow_schema_from_string(&schema)?;
+    let parquet_schema = Arc::new(parse_message_type(&schema)?);
+    let desc = SchemaDescriptor::new(parquet_schema);
+    let arrow_schema = Arc::new(parquet_to_arrow_schema(&desc, None)?);
 
     // create output parquet writer
     let parquet_file = File::create(&args.output_file).map_err(|e| {
@@ -357,9 +353,12 @@ fn convert_csv_to_parquet(args: &Args) -> Result<(), 
ParquetFromCsvError> {
         )
     })?;
 
-    let writer_properties = Some(configure_writer_properties(args));
+    let options = ArrowWriterOptions::new()
+        .with_properties(configure_writer_properties(args))
+        .with_schema_root(desc.name().to_string());
+
     let mut arrow_writer =
-        ArrowWriter::try_new(parquet_file, arrow_schema.clone(), 
writer_properties)
+        ArrowWriter::try_new_with_options(parquet_file, arrow_schema.clone(), 
options)
             .map_err(|e| ParquetFromCsvError::with_context(e, "Failed to 
create ArrowWriter"))?;
 
     // open input file
@@ -426,6 +425,7 @@ mod tests {
     use clap::{CommandFactory, Parser};
     use flate2::write::GzEncoder;
     use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
+    use parquet::file::reader::{FileReader, SerializedFileReader};
     use snap::write::FrameEncoder;
     use tempfile::NamedTempFile;
 
@@ -647,7 +647,7 @@ mod tests {
 
     fn test_convert_compressed_csv_to_parquet(csv_compression: Compression) {
         let schema = NamedTempFile::new().unwrap();
-        let schema_text = r"message schema {
+        let schema_text = r"message my_amazing_schema {
             optional int32 id;
             optional binary name (STRING);
         }";
@@ -728,6 +728,10 @@ mod tests {
             help: None,
         };
         convert_csv_to_parquet(&args).unwrap();
+
+        let file = 
SerializedFileReader::new(output_parquet.into_file()).unwrap();
+        let schema_name = file.metadata().file_metadata().schema().name();
+        assert_eq!(schema_name, "my_amazing_schema");
     }
 
     #[test]

Reply via email to