This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 99bf509bc5 Bugfix: Remove df-cli specific SQL statement options before
executing with DataFusion (#8426)
99bf509bc5 is described below
commit 99bf509bc5ef7e49c32ab19e261ab662276c8968
Author: Devin D'Angelo <[email protected]>
AuthorDate: Wed Dec 6 17:29:45 2023 -0500
Bugfix: Remove df-cli specific SQL statement options before executing with
DataFusion (#8426)
* remove df-cli specific options from create external table options
* add test and comments
* cargo fmt
* merge main
* cargo toml format
---
datafusion-cli/Cargo.lock | 19 +++++++++--------
datafusion-cli/Cargo.toml | 1 +
datafusion-cli/src/exec.rs | 31 +++++++++++++++++++++------
datafusion-cli/src/object_storage.rs | 41 +++++++++++++++++++-----------------
4 files changed, 58 insertions(+), 34 deletions(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 474d85ac46..f88c907b05 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1155,6 +1155,7 @@ dependencies = [
"clap",
"ctor",
"datafusion",
+ "datafusion-common",
"dirs",
"env_logger",
"mimalloc",
@@ -2157,9 +2158,9 @@ dependencies = [
[[package]]
name = "mio"
-version = "0.8.9"
+version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0"
+checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09"
dependencies = [
"libc",
"wasi",
@@ -2307,7 +2308,7 @@ dependencies = [
"quick-xml",
"rand",
"reqwest",
- "ring 0.17.6",
+ "ring 0.17.7",
"rustls-pemfile",
"serde",
"serde_json",
@@ -2770,9 +2771,9 @@ dependencies = [
[[package]]
name = "ring"
-version = "0.17.6"
+version = "0.17.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "684d5e6e18f669ccebf64a92236bb7db9a34f07be010e3627368182027180866"
+checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74"
dependencies = [
"cc",
"getrandom",
@@ -2861,7 +2862,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9"
dependencies = [
"log",
- "ring 0.17.6",
+ "ring 0.17.7",
"rustls-webpki",
"sct",
]
@@ -2893,7 +2894,7 @@ version = "0.101.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765"
dependencies = [
- "ring 0.17.6",
+ "ring 0.17.7",
"untrusted 0.9.0",
]
@@ -2962,7 +2963,7 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414"
dependencies = [
- "ring 0.17.6",
+ "ring 0.17.7",
"untrusted 0.9.0",
]
@@ -3759,7 +3760,7 @@ version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53"
dependencies = [
- "ring 0.17.6",
+ "ring 0.17.7",
"untrusted 0.9.0",
]
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index dd7a077988..fd2dfd76c2 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -48,5 +48,6 @@ url = "2.2"
[dev-dependencies]
assert_cmd = "2.0"
ctor = "0.2.0"
+datafusion-common = { path = "../datafusion/common" }
predicates = "3.0"
rstest = "0.17"
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index 63862caab8..8af534cd13 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -211,7 +211,7 @@ async fn exec_and_print(
})?;
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
- let plan = ctx.state().statement_to_plan(statement).await?;
+ let mut plan = ctx.state().statement_to_plan(statement).await?;
// For plans like `Explain` ignore `MaxRows` option and always display
all rows
let should_ignore_maxrows = matches!(
@@ -221,10 +221,12 @@ async fn exec_and_print(
| LogicalPlan::Analyze(_)
);
- if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) =
&plan {
+ // Note that cmd is a mutable reference so that create_external_table
function can remove all
+ // datafusion-cli specific options before passing through to
datafusion. Otherwise, datafusion
+ // will raise Configuration errors.
+ if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut
plan {
create_external_table(ctx, cmd).await?;
}
-
let df = ctx.execute_logical_plan(plan).await?;
let results = df.collect().await?;
@@ -244,7 +246,7 @@ async fn exec_and_print(
async fn create_external_table(
ctx: &SessionContext,
- cmd: &CreateExternalTable,
+ cmd: &mut CreateExternalTable,
) -> Result<()> {
let table_path = ListingTableUrl::parse(&cmd.location)?;
let scheme = table_path.scheme();
@@ -285,15 +287,32 @@ async fn create_external_table(
#[cfg(test)]
mod tests {
+ use std::str::FromStr;
+
use super::*;
use datafusion::common::plan_err;
+ use datafusion_common::{file_options::StatementOptions,
FileTypeWriterOptions};
async fn create_external_table_test(location: &str, sql: &str) ->
Result<()> {
let ctx = SessionContext::new();
- let plan = ctx.state().create_logical_plan(sql).await?;
+ let mut plan = ctx.state().create_logical_plan(sql).await?;
- if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) =
&plan {
+ if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut
plan {
create_external_table(&ctx, cmd).await?;
+ let options: Vec<_> = cmd
+ .options
+ .iter()
+ .map(|(k, v)| (k.clone(), v.clone()))
+ .collect();
+ let statement_options = StatementOptions::new(options);
+ let file_type =
+ datafusion_common::FileType::from_str(cmd.file_type.as_str())?;
+
+ let _file_type_writer_options = FileTypeWriterOptions::build(
+ &file_type,
+ ctx.state().config_options(),
+ &statement_options,
+ )?;
} else {
return plan_err!("LogicalPlan is not a CreateExternalTable");
}
diff --git a/datafusion-cli/src/object_storage.rs
b/datafusion-cli/src/object_storage.rs
index c39d1915eb..9d79c7e0ec 100644
--- a/datafusion-cli/src/object_storage.rs
+++ b/datafusion-cli/src/object_storage.rs
@@ -30,20 +30,23 @@ use url::Url;
pub async fn get_s3_object_store_builder(
url: &Url,
- cmd: &CreateExternalTable,
+ cmd: &mut CreateExternalTable,
) -> Result<AmazonS3Builder> {
let bucket_name = get_bucket_name(url)?;
let mut builder =
AmazonS3Builder::from_env().with_bucket_name(bucket_name);
if let (Some(access_key_id), Some(secret_access_key)) = (
- cmd.options.get("access_key_id"),
- cmd.options.get("secret_access_key"),
+ // These options are datafusion-cli specific and must be removed
before passing through to datafusion.
+ // Otherwise, a Configuration error will be raised.
+ cmd.options.remove("access_key_id"),
+ cmd.options.remove("secret_access_key"),
) {
+ println!("removing secret access key!");
builder = builder
.with_access_key_id(access_key_id)
.with_secret_access_key(secret_access_key);
- if let Some(session_token) = cmd.options.get("session_token") {
+ if let Some(session_token) = cmd.options.remove("session_token") {
builder = builder.with_token(session_token);
}
} else {
@@ -66,7 +69,7 @@ pub async fn get_s3_object_store_builder(
builder = builder.with_credentials(credentials);
}
- if let Some(region) = cmd.options.get("region") {
+ if let Some(region) = cmd.options.remove("region") {
builder = builder.with_region(region);
}
@@ -99,7 +102,7 @@ impl CredentialProvider for S3CredentialProvider {
pub fn get_oss_object_store_builder(
url: &Url,
- cmd: &CreateExternalTable,
+ cmd: &mut CreateExternalTable,
) -> Result<AmazonS3Builder> {
let bucket_name = get_bucket_name(url)?;
let mut builder = AmazonS3Builder::from_env()
@@ -109,15 +112,15 @@ pub fn get_oss_object_store_builder(
.with_region("do_not_care");
if let (Some(access_key_id), Some(secret_access_key)) = (
- cmd.options.get("access_key_id"),
- cmd.options.get("secret_access_key"),
+ cmd.options.remove("access_key_id"),
+ cmd.options.remove("secret_access_key"),
) {
builder = builder
.with_access_key_id(access_key_id)
.with_secret_access_key(secret_access_key);
}
- if let Some(endpoint) = cmd.options.get("endpoint") {
+ if let Some(endpoint) = cmd.options.remove("endpoint") {
builder = builder.with_endpoint(endpoint);
}
@@ -126,21 +129,21 @@ pub fn get_oss_object_store_builder(
pub fn get_gcs_object_store_builder(
url: &Url,
- cmd: &CreateExternalTable,
+ cmd: &mut CreateExternalTable,
) -> Result<GoogleCloudStorageBuilder> {
let bucket_name = get_bucket_name(url)?;
let mut builder =
GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);
- if let Some(service_account_path) =
cmd.options.get("service_account_path") {
+ if let Some(service_account_path) =
cmd.options.remove("service_account_path") {
builder = builder.with_service_account_path(service_account_path);
}
- if let Some(service_account_key) = cmd.options.get("service_account_key") {
+ if let Some(service_account_key) =
cmd.options.remove("service_account_key") {
builder = builder.with_service_account_key(service_account_key);
}
if let Some(application_credentials_path) =
- cmd.options.get("application_credentials_path")
+ cmd.options.remove("application_credentials_path")
{
builder =
builder.with_application_credentials(application_credentials_path);
}
@@ -180,9 +183,9 @@ mod tests {
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET
OPTIONS('access_key_id' '{access_key_id}', 'secret_access_key'
'{secret_access_key}', 'region' '{region}', 'session_token' {session_token})
LOCATION '{location}'");
let ctx = SessionContext::new();
- let plan = ctx.state().create_logical_plan(&sql).await?;
+ let mut plan = ctx.state().create_logical_plan(&sql).await?;
- if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) =
&plan {
+ if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut
plan {
let builder = get_s3_object_store_builder(table_url.as_ref(),
cmd).await?;
// get the actual configuration information, then assert_eq!
let config = [
@@ -212,9 +215,9 @@ mod tests {
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET
OPTIONS('access_key_id' '{access_key_id}', 'secret_access_key'
'{secret_access_key}', 'endpoint' '{endpoint}') LOCATION '{location}'");
let ctx = SessionContext::new();
- let plan = ctx.state().create_logical_plan(&sql).await?;
+ let mut plan = ctx.state().create_logical_plan(&sql).await?;
- if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) =
&plan {
+ if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut
plan {
let builder = get_oss_object_store_builder(table_url.as_ref(),
cmd)?;
// get the actual configuration information, then assert_eq!
let config = [
@@ -244,9 +247,9 @@ mod tests {
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET
OPTIONS('service_account_path' '{service_account_path}', 'service_account_key'
'{service_account_key}', 'application_credentials_path'
'{application_credentials_path}') LOCATION '{location}'");
let ctx = SessionContext::new();
- let plan = ctx.state().create_logical_plan(&sql).await?;
+ let mut plan = ctx.state().create_logical_plan(&sql).await?;
- if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) =
&plan {
+ if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut
plan {
let builder = get_gcs_object_store_builder(table_url.as_ref(),
cmd)?;
// get the actual configuration information, then assert_eq!
let config = [