This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 99bf509bc5 Bugfix: Remove df-cli specific SQL statement options before executing with DataFusion (#8426)
99bf509bc5 is described below

commit 99bf509bc5ef7e49c32ab19e261ab662276c8968
Author: Devin D'Angelo <[email protected]>
AuthorDate: Wed Dec 6 17:29:45 2023 -0500

    Bugfix: Remove df-cli specific SQL statement options before executing with DataFusion (#8426)
    
    * remove df-cli specific options from create external table options
    
    * add test and comments
    
    * cargo fmt
    
    * merge main
    
    * cargo toml format
---
 datafusion-cli/Cargo.lock            | 19 +++++++++--------
 datafusion-cli/Cargo.toml            |  1 +
 datafusion-cli/src/exec.rs           | 31 +++++++++++++++++++++------
 datafusion-cli/src/object_storage.rs | 41 +++++++++++++++++++-----------------
 4 files changed, 58 insertions(+), 34 deletions(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 474d85ac46..f88c907b05 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1155,6 +1155,7 @@ dependencies = [
  "clap",
  "ctor",
  "datafusion",
+ "datafusion-common",
  "dirs",
  "env_logger",
  "mimalloc",
@@ -2157,9 +2158,9 @@ dependencies = [
 
 [[package]]
 name = "mio"
-version = "0.8.9"
+version = "0.8.10"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0"
+checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09"
 dependencies = [
  "libc",
  "wasi",
@@ -2307,7 +2308,7 @@ dependencies = [
  "quick-xml",
  "rand",
  "reqwest",
- "ring 0.17.6",
+ "ring 0.17.7",
  "rustls-pemfile",
  "serde",
  "serde_json",
@@ -2770,9 +2771,9 @@ dependencies = [
 
 [[package]]
 name = "ring"
-version = "0.17.6"
+version = "0.17.7"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "684d5e6e18f669ccebf64a92236bb7db9a34f07be010e3627368182027180866"
+checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74"
 dependencies = [
  "cc",
  "getrandom",
@@ -2861,7 +2862,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9"
 dependencies = [
  "log",
- "ring 0.17.6",
+ "ring 0.17.7",
  "rustls-webpki",
  "sct",
 ]
@@ -2893,7 +2894,7 @@ version = "0.101.7"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765"
 dependencies = [
- "ring 0.17.6",
+ "ring 0.17.7",
  "untrusted 0.9.0",
 ]
 
@@ -2962,7 +2963,7 @@ version = "0.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414"
 dependencies = [
- "ring 0.17.6",
+ "ring 0.17.7",
  "untrusted 0.9.0",
 ]
 
@@ -3759,7 +3760,7 @@ version = "0.22.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53"
 dependencies = [
- "ring 0.17.6",
+ "ring 0.17.7",
  "untrusted 0.9.0",
 ]
 
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index dd7a077988..fd2dfd76c2 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -48,5 +48,6 @@ url = "2.2"
 [dev-dependencies]
 assert_cmd = "2.0"
 ctor = "0.2.0"
+datafusion-common = { path = "../datafusion/common" }
 predicates = "3.0"
 rstest = "0.17"
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index 63862caab8..8af534cd13 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -211,7 +211,7 @@ async fn exec_and_print(
     })?;
     let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
     for statement in statements {
-        let plan = ctx.state().statement_to_plan(statement).await?;
+        let mut plan = ctx.state().statement_to_plan(statement).await?;
 
         // For plans like `Explain` ignore `MaxRows` option and always display 
all rows
         let should_ignore_maxrows = matches!(
@@ -221,10 +221,12 @@ async fn exec_and_print(
                 | LogicalPlan::Analyze(_)
         );
 
-        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = 
&plan {
+        // Note that cmd is a mutable reference so that create_external_table 
function can remove all
+        // datafusion-cli specific options before passing through to 
datafusion. Otherwise, datafusion
+        // will raise Configuration errors.
+        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut 
plan {
             create_external_table(ctx, cmd).await?;
         }
-
         let df = ctx.execute_logical_plan(plan).await?;
         let results = df.collect().await?;
 
@@ -244,7 +246,7 @@ async fn exec_and_print(
 
 async fn create_external_table(
     ctx: &SessionContext,
-    cmd: &CreateExternalTable,
+    cmd: &mut CreateExternalTable,
 ) -> Result<()> {
     let table_path = ListingTableUrl::parse(&cmd.location)?;
     let scheme = table_path.scheme();
@@ -285,15 +287,32 @@ async fn create_external_table(
 
 #[cfg(test)]
 mod tests {
+    use std::str::FromStr;
+
     use super::*;
     use datafusion::common::plan_err;
+    use datafusion_common::{file_options::StatementOptions, 
FileTypeWriterOptions};
 
     async fn create_external_table_test(location: &str, sql: &str) -> 
Result<()> {
         let ctx = SessionContext::new();
-        let plan = ctx.state().create_logical_plan(sql).await?;
+        let mut plan = ctx.state().create_logical_plan(sql).await?;
 
-        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = 
&plan {
+        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut 
plan {
             create_external_table(&ctx, cmd).await?;
+            let options: Vec<_> = cmd
+                .options
+                .iter()
+                .map(|(k, v)| (k.clone(), v.clone()))
+                .collect();
+            let statement_options = StatementOptions::new(options);
+            let file_type =
+                datafusion_common::FileType::from_str(cmd.file_type.as_str())?;
+
+            let _file_type_writer_options = FileTypeWriterOptions::build(
+                &file_type,
+                ctx.state().config_options(),
+                &statement_options,
+            )?;
         } else {
             return plan_err!("LogicalPlan is not a CreateExternalTable");
         }
diff --git a/datafusion-cli/src/object_storage.rs 
b/datafusion-cli/src/object_storage.rs
index c39d1915eb..9d79c7e0ec 100644
--- a/datafusion-cli/src/object_storage.rs
+++ b/datafusion-cli/src/object_storage.rs
@@ -30,20 +30,23 @@ use url::Url;
 
 pub async fn get_s3_object_store_builder(
     url: &Url,
-    cmd: &CreateExternalTable,
+    cmd: &mut CreateExternalTable,
 ) -> Result<AmazonS3Builder> {
     let bucket_name = get_bucket_name(url)?;
     let mut builder = 
AmazonS3Builder::from_env().with_bucket_name(bucket_name);
 
     if let (Some(access_key_id), Some(secret_access_key)) = (
-        cmd.options.get("access_key_id"),
-        cmd.options.get("secret_access_key"),
+        // These options are datafusion-cli specific and must be removed 
before passing through to datafusion.
+        // Otherwise, a Configuration error will be raised.
+        cmd.options.remove("access_key_id"),
+        cmd.options.remove("secret_access_key"),
     ) {
+        println!("removing secret access key!");
         builder = builder
             .with_access_key_id(access_key_id)
             .with_secret_access_key(secret_access_key);
 
-        if let Some(session_token) = cmd.options.get("session_token") {
+        if let Some(session_token) = cmd.options.remove("session_token") {
             builder = builder.with_token(session_token);
         }
     } else {
@@ -66,7 +69,7 @@ pub async fn get_s3_object_store_builder(
         builder = builder.with_credentials(credentials);
     }
 
-    if let Some(region) = cmd.options.get("region") {
+    if let Some(region) = cmd.options.remove("region") {
         builder = builder.with_region(region);
     }
 
@@ -99,7 +102,7 @@ impl CredentialProvider for S3CredentialProvider {
 
 pub fn get_oss_object_store_builder(
     url: &Url,
-    cmd: &CreateExternalTable,
+    cmd: &mut CreateExternalTable,
 ) -> Result<AmazonS3Builder> {
     let bucket_name = get_bucket_name(url)?;
     let mut builder = AmazonS3Builder::from_env()
@@ -109,15 +112,15 @@ pub fn get_oss_object_store_builder(
         .with_region("do_not_care");
 
     if let (Some(access_key_id), Some(secret_access_key)) = (
-        cmd.options.get("access_key_id"),
-        cmd.options.get("secret_access_key"),
+        cmd.options.remove("access_key_id"),
+        cmd.options.remove("secret_access_key"),
     ) {
         builder = builder
             .with_access_key_id(access_key_id)
             .with_secret_access_key(secret_access_key);
     }
 
-    if let Some(endpoint) = cmd.options.get("endpoint") {
+    if let Some(endpoint) = cmd.options.remove("endpoint") {
         builder = builder.with_endpoint(endpoint);
     }
 
@@ -126,21 +129,21 @@ pub fn get_oss_object_store_builder(
 
 pub fn get_gcs_object_store_builder(
     url: &Url,
-    cmd: &CreateExternalTable,
+    cmd: &mut CreateExternalTable,
 ) -> Result<GoogleCloudStorageBuilder> {
     let bucket_name = get_bucket_name(url)?;
     let mut builder = 
GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);
 
-    if let Some(service_account_path) = 
cmd.options.get("service_account_path") {
+    if let Some(service_account_path) = 
cmd.options.remove("service_account_path") {
         builder = builder.with_service_account_path(service_account_path);
     }
 
-    if let Some(service_account_key) = cmd.options.get("service_account_key") {
+    if let Some(service_account_key) = 
cmd.options.remove("service_account_key") {
         builder = builder.with_service_account_key(service_account_key);
     }
 
     if let Some(application_credentials_path) =
-        cmd.options.get("application_credentials_path")
+        cmd.options.remove("application_credentials_path")
     {
         builder = 
builder.with_application_credentials(application_credentials_path);
     }
@@ -180,9 +183,9 @@ mod tests {
         let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET 
OPTIONS('access_key_id' '{access_key_id}', 'secret_access_key' 
'{secret_access_key}', 'region' '{region}', 'session_token' {session_token}) 
LOCATION '{location}'");
 
         let ctx = SessionContext::new();
-        let plan = ctx.state().create_logical_plan(&sql).await?;
+        let mut plan = ctx.state().create_logical_plan(&sql).await?;
 
-        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = 
&plan {
+        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut 
plan {
             let builder = get_s3_object_store_builder(table_url.as_ref(), 
cmd).await?;
             // get the actual configuration information, then assert_eq!
             let config = [
@@ -212,9 +215,9 @@ mod tests {
         let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET 
OPTIONS('access_key_id' '{access_key_id}', 'secret_access_key' 
'{secret_access_key}', 'endpoint' '{endpoint}') LOCATION '{location}'");
 
         let ctx = SessionContext::new();
-        let plan = ctx.state().create_logical_plan(&sql).await?;
+        let mut plan = ctx.state().create_logical_plan(&sql).await?;
 
-        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = 
&plan {
+        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut 
plan {
             let builder = get_oss_object_store_builder(table_url.as_ref(), 
cmd)?;
             // get the actual configuration information, then assert_eq!
             let config = [
@@ -244,9 +247,9 @@ mod tests {
         let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET 
OPTIONS('service_account_path' '{service_account_path}', 'service_account_key' 
'{service_account_key}', 'application_credentials_path' 
'{application_credentials_path}') LOCATION '{location}'");
 
         let ctx = SessionContext::new();
-        let plan = ctx.state().create_logical_plan(&sql).await?;
+        let mut plan = ctx.state().create_logical_plan(&sql).await?;
 
-        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = 
&plan {
+        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut 
plan {
             let builder = get_gcs_object_store_builder(table_url.as_ref(), 
cmd)?;
             // get the actual configuration information, then assert_eq!
             let config = [

Reply via email to