alamb commented on code in PR #7390: URL: https://github.com/apache/arrow-datafusion/pull/7390#discussion_r1304631144
########## datafusion/common/src/file_options/csv_writer.rs: ########## @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Options related to how csv files should be written + +use std::str::FromStr; + +use arrow::csv::WriterBuilder; + +use crate::{ + config::ConfigOptions, + error::{DataFusionError, Result}, + parsers::CompressionTypeVariant, +}; + +use super::StatementOptions; + +/// Options for writing CSV files +#[derive(Clone, Debug)] +pub struct CsvWriterOptions { + /// Struct from the arrow crate which contains all csv writing related settings + pub writer_options: WriterBuilder, + /// Compression to apply after ArrowWriter serializes RecordBatches. + /// This compression is applied by DataFusion not the ArrowWriter itself. + pub compression: CompressionTypeVariant, + /// Indicates weather WriterBuilder.has_header() is set to true. + /// This is duplicative as WriterBuilder also stores this information. + /// However, WriterBuilder does not allow public read access to the + /// has_header parameter. + pub has_header: bool, + // TODO: expose a way to read has_header in arrow create +} + +impl TryFrom<(&ConfigOptions, &StatementOptions)> for CsvWriterOptions { + type Error = DataFusionError; + + fn try_from(value: (&ConfigOptions, &StatementOptions)) -> Result<Self> { Review Comment: this is cool ########## datafusion/common/src/file_options/arrow_writer.rs: ########## @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Options related to how json files should be written Review Comment: ```suggestion //! Options related to how Arrow files should be written ``` ########## datafusion/common/src/file_options/mod.rs: ########## @@ -0,0 +1,269 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Options related to how files should be written + +pub mod arrow_writer; +pub mod avro_writer; +pub mod csv_writer; +pub mod file_type; +pub mod json_writer; +pub mod parquet_writer; +pub(crate) mod parse_utils; + +use std::{collections::HashMap, path::Path, str::FromStr}; + +use crate::{ + config::ConfigOptions, file_options::parse_utils::parse_boolean_string, + DataFusionError, FileType, Result, +}; + +use self::{ + arrow_writer::ArrowWriterOptions, avro_writer::AvroWriterOptions, + csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions, + parquet_writer::ParquetWriterOptions, +}; + +/// Represents a single arbitrary setting in a +/// [StatementOptions] where OptionTuple.0 determines +/// the specific setting to be modified and OptionTuple.1 +/// determines the value which should be applied +pub type OptionTuple = (String, String); + +/// Represents arbitrary tuples of options passed as String +/// tuples from SQL statements. As in the following statement: +/// COPY ... TO ... (setting1 value1, setting2 value2, ...) +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct StatementOptions { + options: Vec<OptionTuple>, +} + +/// Useful for conversion from external tables which use Hashmap<String, String> +impl From<&HashMap<String, String>> for StatementOptions { + fn from(value: &HashMap<String, String>) -> Self { + Self { + options: value + .iter() + .map(|(k, v)| (k.to_owned(), v.to_owned())) + .collect::<Vec<OptionTuple>>(), + } + } +} + +impl StatementOptions { + pub fn new(options: Vec<OptionTuple>) -> Self { + Self { options } + } + + pub fn into_inner(self) -> Vec<OptionTuple> { + self.options + } + + /// Scans for option and if it exists removes it and attempts to parse as a boolean + /// Returns none if it does not exist. + pub fn get_bool_option(&mut self, find: &str) -> Result<Option<bool>> { Review Comment: I have seen similar APIs called `take_...` as a way to hint that they modify the underlying value. Thus perhaps we could call this and the other functions following that convention: ```suggestion pub fn take_bool_option(&mut self, find: &str) -> Result<Option<bool>> { ``` ########## datafusion/common/src/file_options/mod.rs: ########## @@ -0,0 +1,269 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Options related to how files should be written + +pub mod arrow_writer; +pub mod avro_writer; +pub mod csv_writer; +pub mod file_type; +pub mod json_writer; +pub mod parquet_writer; +pub(crate) mod parse_utils; + +use std::{collections::HashMap, path::Path, str::FromStr}; + +use crate::{ + config::ConfigOptions, file_options::parse_utils::parse_boolean_string, + DataFusionError, FileType, Result, +}; + +use self::{ + arrow_writer::ArrowWriterOptions, avro_writer::AvroWriterOptions, + csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions, + parquet_writer::ParquetWriterOptions, +}; + +/// Represents a single arbitrary setting in a +/// [StatementOptions] where OptionTuple.0 determines +/// the specific setting to be modified and OptionTuple.1 +/// determines the value which should be applied +pub type OptionTuple = (String, String); + +/// Represents arbitrary tuples of options passed as String +/// tuples from SQL statements. As in the following statement: +/// COPY ... TO ... (setting1 value1, setting2 value2, ...) +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct StatementOptions { + options: Vec<OptionTuple>, +} + +/// Useful for conversion from external tables which use Hashmap<String, String> +impl From<&HashMap<String, String>> for StatementOptions { + fn from(value: &HashMap<String, String>) -> Self { + Self { + options: value + .iter() + .map(|(k, v)| (k.to_owned(), v.to_owned())) + .collect::<Vec<OptionTuple>>(), + } + } +} + +impl StatementOptions { + pub fn new(options: Vec<OptionTuple>) -> Self { + Self { options } + } + + pub fn into_inner(self) -> Vec<OptionTuple> { + self.options + } + + /// Scans for option and if it exists removes it and attempts to parse as a boolean + /// Returns none if it does not exist. + pub fn get_bool_option(&mut self, find: &str) -> Result<Option<bool>> { + let maybe_option = self.scan_and_remove_option(find); + maybe_option + .map(|(_, v)| parse_boolean_string(find, v)) + .transpose() + } + + /// Scans for option and if it exists removes it and returns it + /// Returns none if it does not exist + pub fn get_str_option(&mut self, find: &str) -> Option<String> { + let maybe_option = self.scan_and_remove_option(find); + maybe_option.map(|(_, v)| v) + } + + /// Infers the file_type given a target and arbitrary options. + /// If the options contain an explicit "format" option, that will be used. + /// Otherwise, attempt to infer file_type from the extension of target. + /// Finally, return an error if unable to determine the file_type + /// If found, format is removed from the options list. + pub fn try_infer_file_type(&mut self, target: &str) -> Result<FileType> { + let explicit_format = self.scan_and_remove_option("format"); + let format = match explicit_format { + Some(s) => FileType::from_str(s.1.as_str()), + None => { + // try to infer file format from file extension + let extension: &str = &Path::new(target) + .extension() + .ok_or(DataFusionError::InvalidOption( + "Format not explicitly set and unable to get file extension!" + .to_string(), + ))? + .to_str() + .ok_or(DataFusionError::InvalidOption( + "Format not explicitly set and failed to parse file extension!" + .to_string(), + ))? + .to_lowercase(); + + FileType::from_str(extension) + } + }?; + + Ok(format) + } + + /// Finds an option in StatementOptions if exists, removes and returns it + /// along with the vec of remaining options. 
+ fn scan_and_remove_option(&mut self, find: &str) -> Option<OptionTuple> { + let idx = self + .options + .iter() + .position(|(k, _)| k.to_lowercase() == find.to_lowercase()); + match idx { + Some(i) => Some(self.options.swap_remove(i)), + None => None, + } + } +} + +/// This type contains all options needed to initialize a particular +/// RecordBatchWriter type. Each element in the enum contains a thin wrapper +/// around a "writer builder" type (e.g. arrow::csv::WriterBuilder) +/// plus any DataFusion specific writing options (e.g. CSV compression) +#[derive(Clone, Debug)] +pub enum FileTypeWriterOptions { + Parquet(ParquetWriterOptions), + CSV(CsvWriterOptions), + JSON(JsonWriterOptions), + Avro(AvroWriterOptions), + Arrow(ArrowWriterOptions), +} + +impl FileTypeWriterOptions { + /// Constructs a FileTypeWriterOptions given a FileType to be written + /// and arbitrary String tuple options. May return an error if any + /// string setting is unrecognized or unsupported. + pub fn build( + file_type: &FileType, + config_defaults: &ConfigOptions, + statement_options: &StatementOptions, + ) -> Result<Self> { + let options = (config_defaults, statement_options); + + let file_type_write_options = match file_type { + FileType::PARQUET => { + FileTypeWriterOptions::Parquet(ParquetWriterOptions::try_from(options)?) + } + FileType::CSV => { + FileTypeWriterOptions::CSV(CsvWriterOptions::try_from(options)?) + } + FileType::JSON => { + FileTypeWriterOptions::JSON(JsonWriterOptions::try_from(options)?) + } + FileType::AVRO => { + FileTypeWriterOptions::Avro(AvroWriterOptions::try_from(options)?) + } + FileType::ARROW => { + FileTypeWriterOptions::Arrow(ArrowWriterOptions::try_from(options)?) + } + }; + + Ok(file_type_write_options) + } + + /// Constructs a FileTypeWriterOptions from session defaults only. + pub fn build_default( + file_type: &FileType, + config_defaults: &ConfigOptions, + ) -> Result<Self> { + let empty_statement = StatementOptions::new(vec![]); + let options = (config_defaults, &empty_statement); + + let file_type_write_options = match file_type { + FileType::PARQUET => { + FileTypeWriterOptions::Parquet(ParquetWriterOptions::try_from(options)?) + } + FileType::CSV => { + FileTypeWriterOptions::CSV(CsvWriterOptions::try_from(options)?) + } + FileType::JSON => { + FileTypeWriterOptions::JSON(JsonWriterOptions::try_from(options)?) + } + FileType::AVRO => { + FileTypeWriterOptions::Avro(AvroWriterOptions::try_from(options)?) + } + FileType::ARROW => { + FileTypeWriterOptions::Arrow(ArrowWriterOptions::try_from(options)?) + } + }; + + Ok(file_type_write_options) + } + + /// Tries to extract ParquetWriterOptions from this FileTypeWriterOptions enum. + /// Returns an error if a different type from parquet is set. + pub fn try_into_parquet(&self) -> Result<&ParquetWriterOptions> { + match self { + FileTypeWriterOptions::Parquet(opt) => Ok(opt), + _ => Err(DataFusionError::Internal( + "Expected parquet options but found options for a different FileType!" Review Comment: I think it would help usability if these errors contained the name of the other type. Also should these be `DataFusionError::InvalidOption`?
`DataFusionError::Internal` is supposed to signal a bug ```suggestion "Expected parquet options but found options for: {}", self.name() ``` The same comment applies to the other `try_into*` functions as well ########## datafusion/common/src/file_options/parse_utils.rs: ########## @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Functions for parsing arbitrary passed strings to valid file_option settings + +use parquet::{ + basic::{BrotliLevel, GzipLevel, ZstdLevel}, + file::properties::{EnabledStatistics, WriterVersion}, +}; + +use crate::{DataFusionError, Result}; + +/// Converts a String option to a bool, or returns an error if not a valid bool string. +pub(crate) fn parse_boolean_string(option: &str, value: String) -> Result<bool> { + match value.to_lowercase().as_str() { + "true" => Ok(true), + "false" => Ok(false), + _ => Err(DataFusionError::InvalidOption(format!( + "Unsupported value {value} for option {option}! \ + Valid values are true or false!" + ))), + } +} + +/// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding +pub(crate) fn parse_encoding_string( Review Comment: It would be great to move this stuff upstream into a `FromStr` implementation in the parquet crate. I can perhaps file a ticket to do so. ########## datafusion/core/src/datasource/physical_plan/mod.rs: ########## @@ -94,12 +93,14 @@ pub struct FileSinkConfig { pub table_partition_cols: Vec<(String, DataType)>, /// A writer mode that determines how data is written to the file pub writer_mode: FileWriterMode, - /// If false, it is assumed there is a single table_path which is a file to which all data should be written + /// If true, it is assumed there is a single table_path which is a file to which all data should be written /// regardless of input partitioning. Otherwise, each table path is assumed to be a directory /// to which each output partition is written to its own output file. - pub per_thread_output: bool, + pub single_file_output: bool, Review Comment: I agree the new name is much nicer ########## datafusion/common/src/error.rs: ########## @@ -73,6 +73,9 @@ pub enum DataFusionError { /// This error happens whenever a plan is not valid. Examples include /// impossible casts. Plan(String), + /// This error happens when an invalid or unsupported option is passed + /// in a SQL statement + InvalidOption(String), Review Comment: I agree none of the existing errors really fits. What about calling this `Configuration` as I think it could be used more generally when some configuration failed validation (not just setting options via SQL)? 
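For illustration: a minimal sketch of what the suggested `Configuration` variant and a caller might look like. The variant is only proposed in the comment above, and the `validate_compression` helper plus the accepted values are hypothetical.

```rust
#[derive(Debug)]
pub enum DataFusionError {
    // ... existing variants elided ...
    /// Raised when configuration fails validation, whether the value came
    /// from a SQL statement option or from session-level config
    Configuration(String),
}

// Hypothetical helper showing how the variant might be constructed during
// option validation
fn validate_compression(value: &str) -> Result<(), DataFusionError> {
    match value.to_lowercase().as_str() {
        "gzip" | "bzip2" | "xz" | "zstd" | "uncompressed" => Ok(()),
        other => Err(DataFusionError::Configuration(format!(
            "Unsupported compression {other} for this file type"
        ))),
    }
}
```

Compared to `Internal`, a dedicated variant would make it clear that the user supplied the bad value rather than DataFusion hitting a bug.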
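On the earlier `parse_encoding_string` comment: a rough sketch of what an upstream `FromStr` impl for `parquet::basic::Encoding` might look like. It would have to live in the parquet crate itself (orphan rule); the `String` error type and the spellings shown are placeholders mirroring DataFusion's parser, not an existing parquet API.

```rust
use std::str::FromStr;

use crate::basic::Encoding; // i.e. when placed inside the parquet crate

impl FromStr for Encoding {
    // A real impl would presumably use parquet's own error type;
    // String is a stand-in here
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "plain" => Ok(Encoding::PLAIN),
            "plain_dictionary" => Ok(Encoding::PLAIN_DICTIONARY),
            "rle" => Ok(Encoding::RLE),
            "rle_dictionary" => Ok(Encoding::RLE_DICTIONARY),
            "delta_binary_packed" => Ok(Encoding::DELTA_BINARY_PACKED),
            // ... remaining variants elided ...
            other => Err(format!("Unknown or unsupported encoding: {other}")),
        }
    }
}
```

With such an impl, DataFusion's parse_utils helper could shrink to roughly `value.parse::<Encoding>()`.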
########## datafusion/common/src/file_options/csv_writer.rs: ########## @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Options related to how csv files should be written + +use std::str::FromStr; + +use arrow::csv::WriterBuilder; + +use crate::{ + config::ConfigOptions, + error::{DataFusionError, Result}, + parsers::CompressionTypeVariant, +}; + +use super::StatementOptions; + +/// Options for writing CSV files +#[derive(Clone, Debug)] +pub struct CsvWriterOptions { + /// Struct from the arrow crate which contains all csv writing related settings + pub writer_options: WriterBuilder, + /// Compression to apply after ArrowWriter serializes RecordBatches. + /// This compression is applied by DataFusion not the ArrowWriter itself. + pub compression: CompressionTypeVariant, + /// Indicates weather WriterBuilder.has_header() is set to true. Review Comment: ```suggestion /// Indicates whether WriterBuilder.has_header() is set to true. ``` ########## datafusion/common/src/file_options/csv_writer.rs: ########## @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Options related to how csv files should be written + +use std::str::FromStr; + +use arrow::csv::WriterBuilder; + +use crate::{ + config::ConfigOptions, + error::{DataFusionError, Result}, + parsers::CompressionTypeVariant, +}; + +use super::StatementOptions; + +/// Options for writing CSV files +#[derive(Clone, Debug)] +pub struct CsvWriterOptions { + /// Struct from the arrow crate which contains all csv writing related settings + pub writer_options: WriterBuilder, + /// Compression to apply after ArrowWriter serializes RecordBatches. + /// This compression is applied by DataFusion not the ArrowWriter itself. + pub compression: CompressionTypeVariant, + /// Indicates weather WriterBuilder.has_header() is set to true. + /// This is duplicative as WriterBuilder also stores this information. + /// However, WriterBuilder does not allow public read access to the + /// has_header parameter. 
+ pub has_header: bool, + // TODO: expose a way to read has_header in arrow create Review Comment: I filed https://github.com/apache/arrow-rs/issues/4735 ```suggestion // TODO: expose a way to read has_header in arrow create // https://github.com/apache/arrow-rs/issues/4735 ``` ########## datafusion/core/src/datasource/physical_plan/csv.rs: ########## @@ -1191,6 +1191,8 @@ mod tests { Field::new("c2", DataType::UInt64, false), ])); + println!("{out_dir}"); Review Comment: I think this might be left over? Since it is part of a test I think it is fine to leave in, just thought I would point it out ########## datafusion/common/src/file_options/parquet_writer.rs: ########## @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Options related to how parquet files should be written + +use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; + +use crate::{ + config::ConfigOptions, + file_options::parse_utils::{ + parse_compression_string, parse_encoding_string, parse_statistics_string, + parse_version_string, + }, + DataFusionError, Result, +}; + +use super::StatementOptions; + +/// Options for writing parquet files +#[derive(Clone, Debug)] +pub struct ParquetWriterOptions { + pub writer_options: WriterProperties, +} + +impl ParquetWriterOptions { + pub fn writer_options(&self) -> &WriterProperties { + &self.writer_options + } +} + +/// Constructs a default Parquet WriterPropertiesBuilder using +/// Session level ConfigOptions to initialize settings +fn default_builder(options: &ConfigOptions) -> Result<WriterPropertiesBuilder> { + let parquet_context = &options.execution.parquet; Review Comment: nit: calling this `parquet_options` might be clearer
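Since the quoted diff cuts off right after `let parquet_context = &options.execution.parquet;`, here is a hedged sketch of how such a `default_builder` might seed the builder from session config, with the `parquet_options` rename applied. The config fields copied (`created_by`, `max_row_group_size`, `write_batch_size`, `data_pagesize_limit`) are assumptions about the `datafusion.execution.parquet` namespace, not necessarily the set this PR maps.

```rust
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};

use crate::{config::ConfigOptions, Result};

// Sketch only: which session options get forwarded, and their exact field
// names, are assumptions. The real function presumably also parses the
// compression/encoding/statistics/version strings, hence the Result.
fn default_builder(options: &ConfigOptions) -> Result<WriterPropertiesBuilder> {
    // reviewer nit applied: `parquet_options` rather than `parquet_context`
    let parquet_options = &options.execution.parquet;

    Ok(WriterProperties::builder()
        .set_created_by(parquet_options.created_by.clone())
        .set_max_row_group_size(parquet_options.max_row_group_size)
        .set_write_batch_size(parquet_options.write_batch_size)
        .set_data_page_size_limit(parquet_options.data_pagesize_limit))
}
```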
