alamb commented on code in PR #9382:
URL: https://github.com/apache/arrow-datafusion/pull/9382#discussion_r1521134769
##########
datafusion-cli/src/exec.rs:
##########
@@ -258,76 +256,123 @@ async fn create_plan(
// Note that cmd is a mutable reference so that create_external_table
function can remove all
// datafusion-cli specific options before passing through to datafusion.
Otherwise, datafusion
// will raise Configuration errors.
- if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut
plan {
- create_external_table(ctx, cmd).await?;
+ if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
+ register_object_store_and_config_extensions(ctx, &cmd.location,
&cmd.options)
+ .await?;
}
if let LogicalPlan::Copy(copy_to) = &mut plan {
- register_object_store(ctx, copy_to).await?;
+ register_object_store_and_config_extensions(
+ ctx,
+ &copy_to.output_url,
+ &copy_to.options,
+ )
+ .await?;
}
Ok(plan)
}
-async fn register_object_store(
- ctx: &SessionContext,
- copy_to: &mut CopyTo,
-) -> Result<(), DataFusionError> {
- let url = ListingTableUrl::parse(copy_to.output_url.as_str())?;
- let store = get_object_store(
- &ctx.state(),
- &mut HashMap::new(),
- url.scheme(),
- url.as_ref(),
- )
- .await?;
- ctx.runtime_env().register_object_store(url.as_ref(), store);
- Ok(())
-}
-
-async fn create_external_table(
+/// Asynchronously registers an object store and its configuration extensions
Review Comment:
❤️
##########
datafusion/common/src/file_options/csv_writer.rs:
##########
@@ -51,58 +45,32 @@ impl CsvWriterOptions {
}
}
-impl TryFrom<(&ConfigOptions, &StatementOptions)> for CsvWriterOptions {
+impl TryFrom<&CsvOptions> for CsvWriterOptions {
type Error = DataFusionError;
- fn try_from(value: (&ConfigOptions, &StatementOptions)) -> Result<Self> {
- let _configs = value.0;
- let statement_options = value.1;
- let mut builder = WriterBuilder::default();
- let mut compression = CompressionTypeVariant::UNCOMPRESSED;
- for (option, value) in &statement_options.options {
- builder = match option.to_lowercase().as_str(){
- "header" => {
- let has_header = value.parse()
- .map_err(|_|
DataFusionError::Configuration(format!("Unable to parse {value} as bool as
required for {option}!")))?;
- builder.with_header(has_header)
- },
- "date_format" => builder.with_date_format(value.to_owned()),
- "datetime_format" =>
builder.with_datetime_format(value.to_owned()),
- "timestamp_format" =>
builder.with_timestamp_format(value.to_owned()),
- "time_format" => builder.with_time_format(value.to_owned()),
- "rfc3339" => builder, // No-op
- "null_value" => builder.with_null(value.to_owned()),
- "compression" => {
- compression =
CompressionTypeVariant::from_str(value.replace('\'', "").as_str())?;
- builder
- },
- "delimiter" => {
- // Ignore string literal single quotes passed from sql
parsing
- let value = value.replace('\'', "");
- let chars: Vec<char> = value.chars().collect();
- if chars.len()>1{
- return Err(DataFusionError::Configuration(format!(
- "CSV Delimiter Option must be a single char, got:
{}", value
- )))
- }
- builder.with_delimiter(chars[0].try_into().map_err(|_| {
- DataFusionError::Internal(
- "Unable to convert CSV delimiter into u8".into(),
- )
- })?)
- },
- "quote" | "escape" => {
- // https://github.com/apache/arrow-rs/issues/5146
- // These two attributes are only available when reading
csv files.
- // To avoid error
- builder
- },
- _ => return Err(DataFusionError::Configuration(format!("Found
unsupported option {option} with value {value} for CSV format!")))
- }
+ fn try_from(value: &CsvOptions) -> Result<Self> {
+ let mut builder = WriterBuilder::default()
+ .with_header(value.has_header)
+ .with_delimiter(value.delimiter);
+
+ if let Some(v) = &value.date_format {
Review Comment:
This is a very nice example of how the new programmatic API improves things
##########
datafusion-cli/src/exec.rs:
##########
@@ -396,12 +455,12 @@ mod tests {
// Missing region, use object_store defaults
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET
- OPTIONS('access_key_id' '{access_key_id}', 'secret_access_key'
'{secret_access_key}') LOCATION '{location}'");
+ OPTIONS('aws.access_key_id' '{access_key_id}',
'aws.secret_access_key' '{secret_access_key}') LOCATION '{location}'");
Review Comment:
I think prefixing the configuration with a cloud-provider-specific prefix
makes sense
##########
datafusion/sqllogictest/test_files/set_variable.slt:
##########
@@ -65,7 +65,7 @@ SHOW datafusion.execution.batch_size
datafusion.execution.batch_size 1
# set variable unknown variable
-statement error DataFusion error: External error: could not find config
namespace for key "aabbcc"
+statement error DataFusion error: Invalid or Unsupported Configuration: could
not find config namespace for key "aabbcc"
Review Comment:
👍
##########
datafusion/common/src/config.rs:
##########
@@ -1064,3 +1110,556 @@ macro_rules! extensions_options {
}
}
}
+
+#[derive(Debug, Clone, Default)]
+pub struct TableOptions {
+ pub csv: CsvOptions,
+ pub parquet: TableParquetOptions,
+ pub json: JsonOptions,
+ pub current_format: Option<FileType>,
+ /// Optional extensions registered using [`Extensions::insert`]
+ pub extensions: Extensions,
+}
+
+impl ConfigField for TableOptions {
+ fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description:
&'static str) {
+ self.csv.visit(v, "csv", "");
+ self.parquet.visit(v, "parquet", "");
+ self.json.visit(v, "json", "");
+ }
+
+ fn set(&mut self, key: &str, value: &str) -> Result<()> {
+ // Extensions are handled in the public `ConfigOptions::set`
+ let (key, rem) = key.split_once('.').unwrap_or((key, ""));
+ match key {
+ "csv" => self.csv.set(rem, value),
+ "parquet" => self.parquet.set(rem, value),
+ "json" => self.json.set(rem, value),
+ _ => _config_err!("Config value \"{key}\" not found on
TableOptions"),
+ }
+ }
+}
+
+impl TableOptions {
+ /// Creates a new [`ConfigOptions`] with default values
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn set_file_format(&mut self, format: FileType) {
+ self.current_format = Some(format);
+ }
+
+ pub fn default_from_session_config(config: &ConfigOptions) -> Self {
+ let mut initial = TableOptions::default();
+ initial.parquet.global = config.execution.parquet.clone();
+ initial
+ }
+
+ /// Set extensions to provided value
+ pub fn with_extensions(mut self, extensions: Extensions) -> Self {
+ self.extensions = extensions;
+ self
+ }
+
+ /// Set a configuration option
+ pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
+ let (prefix, _) = key.split_once('.').ok_or_else(|| {
+ DataFusionError::Configuration(format!(
+ "could not find config namespace for key \"{key}\""
+ ))
+ })?;
+
+ if prefix == "csv" || prefix == "json" || prefix == "parquet" {
+ if let Some(format) = &self.current_format {
+ match format {
+ FileType::CSV if prefix != "csv" => {
+ return Err(DataFusionError::Configuration(format!(
+ "Key \"{key}\" is not applicable for CSV format"
+ )))
+ }
+ #[cfg(feature = "parquet")]
+ FileType::PARQUET if prefix != "parquet" => {
+ return Err(DataFusionError::Configuration(format!(
+ "Key \"{key}\" is not applicable for PARQUET
format"
+ )))
+ }
+ FileType::JSON if prefix != "json" => {
+ return Err(DataFusionError::Configuration(format!(
+ "Key \"{key}\" is not applicable for JSON format"
+ )))
+ }
+ _ => {}
+ }
+ }
+ return ConfigField::set(self, key, value);
+ }
+
+ let e = self.extensions.0.get_mut(prefix);
+ let e = e.ok_or_else(|| {
+ DataFusionError::Configuration(format!(
+ "Could not find config namespace \"{prefix}\""
+ ))
+ })?;
+ e.0.set(key, value)
+ }
+
+ pub fn from_string_hash_map(settings: &HashMap<String, String>) ->
Result<Self> {
+ let mut ret = Self::default();
+ for (k, v) in settings {
+ ret.set(k, v)?;
+ }
+
+ Ok(ret)
+ }
+
+ pub fn alter_with_string_hash_map(
+ &mut self,
+ settings: &HashMap<String, String>,
+ ) -> Result<()> {
+ for (k, v) in settings {
+ self.set(k, v)?;
+ }
+ Ok(())
+ }
+
+ /// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
+ pub fn entries(&self) -> Vec<ConfigEntry> {
+ struct Visitor(Vec<ConfigEntry>);
+
+ impl Visit for Visitor {
+ fn some<V: Display>(
+ &mut self,
+ key: &str,
+ value: V,
+ description: &'static str,
+ ) {
+ self.0.push(ConfigEntry {
+ key: key.to_string(),
+ value: Some(value.to_string()),
+ description,
+ })
+ }
+
+ fn none(&mut self, key: &str, description: &'static str) {
+ self.0.push(ConfigEntry {
+ key: key.to_string(),
+ value: None,
+ description,
+ })
+ }
+ }
+
+ let mut v = Visitor(vec![]);
+ self.visit(&mut v, "csv", "");
+ self.visit(&mut v, "json", "");
+ self.visit(&mut v, "parquet", "");
+
+ v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
+ v.0
+ }
+}
+
+#[derive(Clone, Default, Debug, PartialEq)]
+pub struct TableParquetOptions {
+ /// Global Parquet options that propagates to all columns.
+ pub global: ParquetOptions,
+ /// Column specific options. Default usage is parquet.XX::column.
+ pub column_specific_options: HashMap<String, ColumnOptions>,
+}
+
+impl ConfigField for TableParquetOptions {
+ fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description:
&'static str) {
+ self.global.visit(v, key_prefix, description);
+ self.column_specific_options
+ .visit(v, key_prefix, description)
+ }
+
+ fn set(&mut self, key: &str, value: &str) -> Result<()> {
+ // Determine the key if it's a global or column-specific setting
+ if key.contains("::") {
+ self.column_specific_options.set(key, value)
+ } else {
+ self.global.set(key, value)
+ }
+ }
+}
+
+macro_rules! config_namespace_with_hashmap {
+ (
+ $(#[doc = $struct_d:tt])*
+ $vis:vis struct $struct_name:ident {
+ $(
+ $(#[doc = $d:tt])*
+ $field_vis:vis $field_name:ident : $field_type:ty, default =
$default:expr
+ )*$(,)*
+ }
+ ) => {
+
+ $(#[doc = $struct_d])*
+ #[derive(Debug, Clone, PartialEq)]
+ $vis struct $struct_name{
+ $(
+ $(#[doc = $d])*
+ $field_vis $field_name : $field_type,
+ )*
+ }
+
+ impl ConfigField for $struct_name {
+ fn set(&mut self, key: &str, value: &str) -> Result<()> {
+ let (key, rem) = key.split_once('.').unwrap_or((key, ""));
+ match key {
+ $(
+ stringify!($field_name) => self.$field_name.set(rem,
value),
+ )*
+ _ => _config_err!(
+ "Config value \"{}\" not found on {}", key,
stringify!($struct_name)
+ )
+ }
+ }
+
+ fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str,
_description: &'static str) {
+ $(
+ let key = format!(concat!("{}.", stringify!($field_name)),
key_prefix);
+ let desc = concat!($($d),*).trim();
+ self.$field_name.visit(v, key.as_str(), desc);
+ )*
+ }
+ }
+
+ impl Default for $struct_name {
+ fn default() -> Self {
+ Self {
+ $($field_name: $default),*
+ }
+ }
+ }
+
+ impl ConfigField for HashMap<String,$struct_name> {
+ fn set(&mut self, key: &str, value: &str) -> Result<()> {
+ let parts: Vec<&str> = key.splitn(2, "::").collect();
+ match parts.as_slice() {
+ [inner_key, hashmap_key] => {
+ // Get or create the ColumnOptions for the specified
column
+ let inner_value = self
+ .entry((*hashmap_key).to_owned())
+ .or_insert_with($struct_name::default);
+
+ inner_value.set(inner_key, value)
+ }
+ _ => Err(DataFusionError::Configuration(format!(
+ "Unrecognized key '{}'.",
+ key
+ ))),
+ }
+ }
+
+ fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str,
_description: &'static str) {
+ for (column_name, col_options) in self {
+ $(
+ let key = format!("{}.{field}::{}", key_prefix,
column_name, field = stringify!($field_name));
+ let desc = concat!($($d),*).trim();
+ col_options.$field_name.visit(v, key.as_str(), desc);
+ )*
+ }
+ }
+ }
+ }
+}
+
+config_namespace_with_hashmap! {
+ pub struct ColumnOptions {
+ /// Sets if bloom filter is enabled for the column path.
Review Comment:
This is really nice to see in a structured format here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]