tustvold commented on code in PR #1798: URL: https://github.com/apache/arrow-rs/pull/1798#discussion_r893126800
########## parquet/src/bin/parquet-fromcsv.rs: ########## @@ -0,0 +1,517 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary file to converts csv to Parquet file +//! +//! # Install +//! +//! `parquet-fromcsv` can be installed using `cargo`: +//! +//! ```text +//! cargo install parquet --features=cli +//! ``` +//! +//! After this `parquet-fromcsv` shoud be available: +//! +//! ```text +//! parquet-fromcsv --schema message_schema_for_parquet.txt input.csv output.parquet +//! ``` +//! +//! The binary can also be built from the source code and run as follows: +//! +//! ```text +//! cargo run --features=cli --bin parquet-fromcsv --schema message_schema_for_parquet.txt \ +//! \ input.csv output.parquet +//! ``` +//! +//! # Options +//! +//! ```text +#![doc = include_str!("./parquet-fromcsv-help.txt")] // Update for this file : Run test test_command_help +//! ``` +//! +//! ## Parquet file options +//! +//! - `-b`, `--batch-size` : Batch size for Parquet +//! - `-c`, `--parquet-compression` : Compression option for Parquet, default is SNAPPY +//! - `-s`, `--schema` : path to message schema for generated Parquet file +//! - `-o`, `--output-file` : path to output parquet file +//! +//! ## Input file options +//! +//! - `-i`, `--input-file` : path to input CSV file +//! - `-f`, `--input-format` : dialect for input file, `csv` or `tsv`. +//! - `-d`, `--delimiter : Field delimitor for CSV file, default depends `--input-format` +//! - `-e`, `--escape` : Escape charactor for input file +//! - `-h`, `--has-header` : input has header +//! - `-r`, `--record-terminator` : record terminator charactor for input. default is CRLF +//! - `-q`, `--quote-char` : input quoting charactor +//! + +use std::{ + fs::{read_to_string, File}, + path::{Path, PathBuf}, + sync::Arc, +}; + +use anyhow::{bail, Context, Error, Result}; +use arrow::{csv::ReaderBuilder, datatypes::Schema}; +use clap::{ArgEnum, Parser}; +use parquet::{ + arrow::{parquet_to_arrow_schema, ArrowWriter}, + basic::Compression, + file::properties::WriterProperties, + schema::{parser::parse_message_type, types::SchemaDescriptor}, +}; + +#[derive(Debug, Parser)] +#[clap(author, version, about("Binary to convert csv to Parquet"), long_about=None)] +struct Args { + /// Path to a text file containing a parquet schema definition + #[clap(short, long, help("message schema for output Parquet"))] + schema: PathBuf, + /// input CSV file path + #[clap(short, long, help("input CSV file"))] + input_file: PathBuf, + /// output Parquet file path + #[clap(short, long, help("output Parquet file"))] + output_file: PathBuf, + /// input file format + #[clap( + arg_enum, + short('f'), + long, + help("input file format"), + default_value_t=CsvDialect::CSV + )] + input_format: CsvDialect, + /// batch size + #[clap( + short, + long, + help("batch size"), + default_value_t = 1000, + env = "PARQUET_FROM_CSV_BATCHSIZE" + )] + batch_size: usize, + /// has header line + #[clap(short, long, help("has header"))] + has_header: bool, + /// field delimiter + /// + /// default value: + /// when input_format==CSV: ',' + /// when input_format==TSV: 'TAB' + #[clap(short, long, help("field delimiter"))] + delimiter: Option<char>, + #[clap(arg_enum, short, long, help("record terminator"))] + record_terminator: Option<RecordTerminator>, + #[clap(short, long, help("escape charactor"))] + escape_char: Option<char>, + #[clap(short, long, help("quate charactor"))] + quote_char: Option<char>, + #[clap(short('D'), long, help("double quote"))] + double_quote: Option<bool>, + #[clap(short('c'), long, help("compression mode"), default_value_t=Compression::SNAPPY)] + #[clap(parse(try_from_str =compression_from_str))] + parquet_compression: Compression, +} + +fn compression_from_str(cmp: &str) -> Result<Compression, Error> { + match cmp.to_uppercase().as_str() { + "UNCOMPRESSED" => Ok(Compression::UNCOMPRESSED), + "SNAPPY" => Ok(Compression::SNAPPY), + "GZIP" => Ok(Compression::GZIP), + "LZO" => Ok(Compression::LZO), + "BROTLI" => Ok(Compression::BROTLI), + "LZ4" => Ok(Compression::LZ4), + "ZSTD" => Ok(Compression::ZSTD), + v => bail!("Unknown compression {0} : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD ", v), + } +} + +impl Args { + fn schema_path(&self) -> &Path { + self.schema.as_path() + } + fn get_delimiter(&self) -> u8 { + match self.delimiter { + Some(ch) => ch as u8, + None => match self.input_format { + CsvDialect::CSV => b',', + CsvDialect::TSV => b'\t', + }, + } + } + fn get_terminator(&self) -> Option<u8> { + match self.record_terminator { + Some(RecordTerminator::LF) => Some(0x0a), + Some(RecordTerminator::CR) => Some(0x0d), + Some(RecordTerminator::CRLF) => None, + None => match self.input_format { + CsvDialect::CSV => None, + CsvDialect::TSV => Some(0x0a), + }, + } + } + fn get_escape(&self) -> Option<u8> { + self.escape_char.map(|ch| ch as u8) + } + fn get_quote(&self) -> Option<u8> { + if self.quote_char.is_none() { + match self.input_format { + CsvDialect::CSV => Some(b'\"'), + CsvDialect::TSV => None, + } + } else { + self.quote_char.map(|c| c as u8) + } + } +} + +#[derive(Debug, Clone, Copy, ArgEnum, PartialEq)] +enum CsvDialect { + CSV, + TSV, +} + +#[derive(Debug, Clone, Copy, ArgEnum, PartialEq)] +enum RecordTerminator { + LF, + CRLF, + CR, +} + +fn configure_writer_properties(compression: Compression) -> WriterProperties { + let properties_builder = WriterProperties::builder().set_compression(compression); + properties_builder.build() +} + +fn configure_reader_builder(args: &Args, arrow_schema: Arc<Schema>) -> ReaderBuilder { + fn configure_reader<T, F: Fn(ReaderBuilder, T) -> ReaderBuilder>( + builder: ReaderBuilder, + value: Option<T>, + fun: F, + ) -> ReaderBuilder { + if let Some(val) = value { + fun(builder, val) + } else { + builder + } + } + + let mut builder = ReaderBuilder::new() + .with_schema(arrow_schema) + .with_batch_size(args.batch_size) + .has_header(args.has_header) + .with_delimiter(args.get_delimiter()); + + builder = configure_reader( + builder, + args.get_terminator(), + ReaderBuilder::with_terminator, + ); + builder = configure_reader(builder, args.get_escape(), ReaderBuilder::with_escape); + builder = configure_reader(builder, args.get_quote(), ReaderBuilder::with_quote); + + builder +} + +fn arrow_schema_from_string(schema: &str) -> Result<Arc<Schema>> { + let schema = Arc::new(parse_message_type(&schema)?); + let desc = SchemaDescriptor::new(schema); + let arrow_schema = Arc::new(parquet_to_arrow_schema(&desc, None)?); + Ok(arrow_schema) +} + +fn convert_csv_to_parquet(args: &Args) -> Result<()> { + let schema = read_to_string(args.schema_path()).with_context(|| { + format!("Failed to open schema file {:#?}", args.schema_path()) + })?; + let arrow_schema = arrow_schema_from_string(&schema)?; + + // create output parquet writer + let parquet_file = File::create(&args.output_file).context(format!( + "Failed to create output file {:#?}", + &args.output_file + ))?; + + let writer_properties = Some(configure_writer_properties(args.parquet_compression)); + let mut arrow_writer = + ArrowWriter::try_new(parquet_file, arrow_schema.clone(), writer_properties) + .context("Failed to create ArrowWriter")?; + + // open input file + let input_file = File::open(&args.input_file) + .with_context(|| format!("Failed to open input file {:#?}", &args.input_file))?; + // create input csv reader + let builder = configure_reader_builder(&args, arrow_schema); + let reader = builder.build(input_file)?; + for batch_result in reader { + let batch = batch_result.context("Failed to read RecordBatch from CSV")?; + arrow_writer + .write(&batch) + .context("Failed to write RecordBatch to parquet")?; + } + arrow_writer.close().context("Failed to close parquet")?; + Ok(()) +} + +fn main() -> Result<()> { + let args = Args::parse(); + convert_csv_to_parquet(&args) +} + +#[cfg(test)] +mod tests { + use std::{ + io::{Seek, SeekFrom, Write}, + path::{Path, PathBuf}, + }; + + use super::*; + use anyhow::Result; + use arrow::datatypes::{DataType, Field}; + use clap::{CommandFactory, Parser}; + use tempfile::NamedTempFile; + + #[test] + fn test_command_help() { + let mut cmd = Args::command(); + let dir = std::env::var("CARGO_MANIFEST_DIR").unwrap(); + let mut path_buf = PathBuf::from(dir); + path_buf.push("src"); + path_buf.push("bin"); + path_buf.push("parquet-fromcsv-help.txt"); + let mut help_file = File::create(path_buf).unwrap(); + cmd.write_long_help(&mut help_file); Review Comment: Why not just assert the contents, if it fails it will automatically print what it actually was which can be copied into the file? We use this approach for SQL tests in DataFusion and it works well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org