This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 7fe01bb432c Allow overriding the inferred parquet schema root (#5814)
7fe01bb432c is described below
commit 7fe01bb432c0d190312158b706bda5971ca7d883
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed May 29 12:17:38 2024 +0100
Allow overriding the inferred parquet schema root (#5814)
---
parquet/src/arrow/arrow_writer/mod.rs | 19 +++++++++++++++++--
parquet/src/arrow/schema/mod.rs | 12 +++++++++---
parquet/src/bin/parquet-fromcsv.rs | 26 +++++++++++++++-----------
3 files changed, 41 insertions(+), 16 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 60feda69e84..fd3f9591718 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -31,7 +31,8 @@ use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};
use super::schema::{
- add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema, decimal_length_from_precision,
+ add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
+ arrow_to_parquet_schema_with_root, decimal_length_from_precision,
};
use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
@@ -160,7 +161,10 @@ impl<W: Write + Send> ArrowWriter<W> {
arrow_schema: SchemaRef,
options: ArrowWriterOptions,
) -> Result<Self> {
- let schema = arrow_to_parquet_schema(&arrow_schema)?;
+ let schema = match options.schema_root {
+ Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s)?,
+ None => arrow_to_parquet_schema(&arrow_schema)?,
+ };
let mut props = options.properties;
if !options.skip_arrow_metadata {
// add serialized arrow schema
@@ -323,6 +327,7 @@ impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
pub struct ArrowWriterOptions {
properties: WriterProperties,
skip_arrow_metadata: bool,
+ schema_root: Option<String>,
}
impl ArrowWriterOptions {
@@ -346,6 +351,16 @@ impl ArrowWriterOptions {
..self
}
}
+
+ /// Overrides the name of the root parquet schema element
+ ///
+ /// Defaults to `"arrow_schema"`
+ pub fn with_schema_root(self, name: String) -> Self {
+ Self {
+ schema_root: Some(name),
+ ..self
+ }
+ }
}
/// A single column chunk produced by [`ArrowColumnWriter`]
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 37f368b203a..8c583eebac5 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -223,15 +223,21 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut
}
/// Convert arrow schema to parquet schema
+///
+/// The name of the root schema element defaults to `"arrow_schema"`, this can be
+/// overridden with [`arrow_to_parquet_schema_with_root`]
pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
+ arrow_to_parquet_schema_with_root(schema, "arrow_schema")
+}
+
+/// Convert arrow schema to parquet schema specifying the name of the root schema element
+pub fn arrow_to_parquet_schema_with_root(schema: &Schema, root: &str) -> Result<SchemaDescriptor> {
let fields = schema
.fields()
.iter()
.map(|field| arrow_to_parquet_type(field).map(Arc::new))
.collect::<Result<_>>()?;
- let group = Type::group_type_builder("arrow_schema")
- .with_fields(fields)
- .build()?;
+ let group = Type::group_type_builder(root).with_fields(fields).build()?;
Ok(SchemaDescriptor::new(Arc::new(group)))
}
diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs
index 445409610a8..140a367675f 100644
--- a/parquet/src/bin/parquet-fromcsv.rs
+++ b/parquet/src/bin/parquet-fromcsv.rs
@@ -81,6 +81,7 @@ use std::{
use arrow_csv::ReaderBuilder;
use arrow_schema::{ArrowError, Schema};
use clap::{Parser, ValueEnum};
+use parquet::arrow::arrow_writer::ArrowWriterOptions;
use parquet::{
arrow::{parquet_to_arrow_schema, ArrowWriter},
basic::Compression,
@@ -333,13 +334,6 @@ fn configure_reader_builder(args: &Args, arrow_schema: Arc<Schema>) -> ReaderBui
builder
}
-fn arrow_schema_from_string(schema: &str) -> Result<Arc<Schema>, ParquetFromCsvError> {
- let schema = Arc::new(parse_message_type(schema)?);
- let desc = SchemaDescriptor::new(schema);
- let arrow_schema = Arc::new(parquet_to_arrow_schema(&desc, None)?);
- Ok(arrow_schema)
-}
-
fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> {
let schema = read_to_string(args.schema_path()).map_err(|e| {
ParquetFromCsvError::with_context(
@@ -347,7 +341,9 @@ fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> {
&format!("Failed to open schema file {:#?}", args.schema_path()),
)
})?;
- let arrow_schema = arrow_schema_from_string(&schema)?;
+ let parquet_schema = Arc::new(parse_message_type(&schema)?);
+ let desc = SchemaDescriptor::new(parquet_schema);
+ let arrow_schema = Arc::new(parquet_to_arrow_schema(&desc, None)?);
// create output parquet writer
let parquet_file = File::create(&args.output_file).map_err(|e| {
@@ -357,9 +353,12 @@ fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> {
)
})?;
- let writer_properties = Some(configure_writer_properties(args));
+ let options = ArrowWriterOptions::new()
+ .with_properties(configure_writer_properties(args))
+ .with_schema_root(desc.name().to_string());
+
let mut arrow_writer =
- ArrowWriter::try_new(parquet_file, arrow_schema.clone(), writer_properties)
+ ArrowWriter::try_new_with_options(parquet_file, arrow_schema.clone(), options)
.map_err(|e| ParquetFromCsvError::with_context(e, "Failed to create ArrowWriter"))?;
// open input file
@@ -426,6 +425,7 @@ mod tests {
use clap::{CommandFactory, Parser};
use flate2::write::GzEncoder;
use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
+ use parquet::file::reader::{FileReader, SerializedFileReader};
use snap::write::FrameEncoder;
use tempfile::NamedTempFile;
@@ -647,7 +647,7 @@ mod tests {
fn test_convert_compressed_csv_to_parquet(csv_compression: Compression) {
let schema = NamedTempFile::new().unwrap();
- let schema_text = r"message schema {
+ let schema_text = r"message my_amazing_schema {
optional int32 id;
optional binary name (STRING);
}";
@@ -728,6 +728,10 @@ mod tests {
help: None,
};
convert_csv_to_parquet(&args).unwrap();
+
+ let file = SerializedFileReader::new(output_parquet.into_file()).unwrap();
+ let schema_name = file.metadata().file_metadata().schema().name();
+ assert_eq!(schema_name, "my_amazing_schema");
}
#[test]