This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 417b928eac Support http s3 endpoints in datafusion-cli (#10080)
417b928eac is described below

commit 417b928eac3023f81e1b7d2dc5ee193500e8f5fe
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Apr 16 09:32:24 2024 -0400

    Support http s3 endpoints in datafusion-cli (#10080)
---
 datafusion-cli/src/object_storage.rs | 121 ++++++++++++++++++++++++++++++++---
 1 file changed, 113 insertions(+), 8 deletions(-)

diff --git a/datafusion-cli/src/object_storage.rs 
b/datafusion-cli/src/object_storage.rs
index 1178c0aad6..85e0009bd2 100644
--- a/datafusion-cli/src/object_storage.rs
+++ b/datafusion-cli/src/object_storage.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
 use datafusion::common::config::{
     ConfigEntry, ConfigExtension, ConfigField, ExtensionOptions, TableOptions, 
Visit,
 };
-use datafusion::common::{exec_datafusion_err, exec_err, internal_err};
+use datafusion::common::{config_err, exec_datafusion_err, exec_err};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionState;
 use datafusion::prelude::SessionContext;
@@ -39,17 +39,26 @@ pub async fn get_s3_object_store_builder(
     url: &Url,
     aws_options: &AwsOptions,
 ) -> Result<AmazonS3Builder> {
+    let AwsOptions {
+        access_key_id,
+        secret_access_key,
+        session_token,
+        region,
+        endpoint,
+        allow_http,
+    } = aws_options;
+
     let bucket_name = get_bucket_name(url)?;
     let mut builder = 
AmazonS3Builder::from_env().with_bucket_name(bucket_name);
 
     if let (Some(access_key_id), Some(secret_access_key)) =
-        (&aws_options.access_key_id, &aws_options.secret_access_key)
+        (access_key_id, secret_access_key)
     {
         builder = builder
             .with_access_key_id(access_key_id)
             .with_secret_access_key(secret_access_key);
 
-        if let Some(session_token) = &aws_options.session_token {
+        if let Some(session_token) = session_token {
             builder = builder.with_token(session_token);
         }
     } else {
@@ -72,10 +81,30 @@ pub async fn get_s3_object_store_builder(
         builder = builder.with_credentials(credentials);
     }
 
-    if let Some(region) = &aws_options.region {
+    if let Some(region) = region {
         builder = builder.with_region(region);
     }
 
+    if let Some(endpoint) = endpoint {
+        // Make a nicer error if the user hasn't allowed http and the endpoint
+        // is http as the default message is "URL scheme is not allowed"
+        if let Ok(endpoint_url) = Url::try_from(endpoint.as_str()) {
+            if !matches!(allow_http, Some(true)) && endpoint_url.scheme() == 
"http" {
+                return config_err!(
+                    "Invalid endpoint: {endpoint}. \
+                HTTP is not allowed for S3 endpoints. \
+                To allow HTTP, set 'aws.allow_http' to true"
+                );
+            }
+        }
+
+        builder = builder.with_endpoint(endpoint);
+    }
+
+    if let Some(allow_http) = allow_http {
+        builder = builder.with_allow_http(*allow_http);
+    }
+
     Ok(builder)
 }
 
@@ -188,6 +217,8 @@ pub struct AwsOptions {
     pub region: Option<String>,
     /// OSS or COS Endpoint
     pub endpoint: Option<String>,
+    /// Allow HTTP (otherwise will always use https)
+    pub allow_http: Option<bool>,
 }
 
 impl ExtensionOptions for AwsOptions {
@@ -219,11 +250,14 @@ impl ExtensionOptions for AwsOptions {
             "region" => {
                 self.region.set(rem, value)?;
             }
-            "oss" | "cos" => {
+            "oss" | "cos" | "endpoint" => {
                 self.endpoint.set(rem, value)?;
             }
+            "allow_http" => {
+                self.allow_http.set(rem, value)?;
+            }
             _ => {
-                return internal_err!("Config value \"{}\" not found on 
AwsOptions", rem);
+                return config_err!("Config value \"{}\" not found on 
AwsOptions", rem);
             }
         }
         Ok(())
@@ -262,6 +296,7 @@ impl ExtensionOptions for AwsOptions {
         self.session_token.visit(&mut v, "session_token", "");
         self.region.visit(&mut v, "region", "");
         self.endpoint.visit(&mut v, "endpoint", "");
+        self.allow_http.visit(&mut v, "allow_http", "");
         v.0
     }
 }
@@ -307,7 +342,7 @@ impl ExtensionOptions for GcpOptions {
                 self.application_credentials_path.set(rem, value)?;
             }
             _ => {
-                return internal_err!("Config value \"{}\" not found on 
GcpOptions", rem);
+                return config_err!("Config value \"{}\" not found on 
GcpOptions", rem);
             }
         }
         Ok(())
@@ -479,12 +514,21 @@ mod tests {
         let access_key_id = "fake_access_key_id";
         let secret_access_key = "fake_secret_access_key";
         let region = "fake_us-east-2";
+        let endpoint = "endpoint33";
         let session_token = "fake_session_token";
         let location = "s3://bucket/path/file.parquet";
 
         let table_url = ListingTableUrl::parse(location)?;
         let scheme = table_url.scheme();
-        let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET 
OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' 
'{secret_access_key}', 'aws.region' '{region}', 'aws.session_token' 
{session_token}) LOCATION '{location}'");
+        let sql = format!(
+            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
+            ('aws.access_key_id' '{access_key_id}', \
+            'aws.secret_access_key' '{secret_access_key}', \
+            'aws.region' '{region}', \
+            'aws.session_token' {session_token}, \
+            'aws.endpoint' '{endpoint}'\
+            ) LOCATION '{location}'"
+        );
 
         let ctx = SessionContext::new();
         let mut plan = ctx.state().create_logical_plan(&sql).await?;
@@ -501,6 +545,7 @@ mod tests {
                 (AmazonS3ConfigKey::AccessKeyId, access_key_id),
                 (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
                 (AmazonS3ConfigKey::Region, region),
+                (AmazonS3ConfigKey::Endpoint, endpoint),
                 (AmazonS3ConfigKey::Token, session_token),
             ];
             for (key, value) in config {
@@ -513,6 +558,66 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn s3_object_store_builder_allow_http_error() -> Result<()> {
+        let access_key_id = "fake_access_key_id";
+        let secret_access_key = "fake_secret_access_key";
+        let endpoint = "http://endpoint33";
+        let location = "s3://bucket/path/file.parquet";
+
+        let table_url = ListingTableUrl::parse(location)?;
+        let scheme = table_url.scheme();
+        let sql = format!(
+            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
+            ('aws.access_key_id' '{access_key_id}', \
+            'aws.secret_access_key' '{secret_access_key}', \
+            'aws.endpoint' '{endpoint}'\
+            ) LOCATION '{location}'"
+        );
+
+        let ctx = SessionContext::new();
+        let mut plan = ctx.state().create_logical_plan(&sql).await?;
+
+        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut 
plan {
+            register_options(&ctx, scheme);
+            let mut table_options = 
ctx.state().default_table_options().clone();
+            table_options.alter_with_string_hash_map(&cmd.options)?;
+            let aws_options = 
table_options.extensions.get::<AwsOptions>().unwrap();
+            let err = get_s3_object_store_builder(table_url.as_ref(), 
aws_options)
+                .await
+                .unwrap_err();
+
+            assert_eq!(err.to_string(), "Invalid or Unsupported Configuration: 
Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To 
allow HTTP, set 'aws.allow_http' to true");
+        } else {
+            return plan_err!("LogicalPlan is not a CreateExternalTable");
+        }
+
+        // Now add `allow_http` to the options and check if it works
+        let sql = format!(
+            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
+            ('aws.access_key_id' '{access_key_id}', \
+            'aws.secret_access_key' '{secret_access_key}', \
+            'aws.endpoint' '{endpoint}',\
+            'aws.allow_http' 'true'\
+            ) LOCATION '{location}'"
+        );
+
+        let mut plan = ctx.state().create_logical_plan(&sql).await?;
+
+        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut 
plan {
+            register_options(&ctx, scheme);
+            let mut table_options = 
ctx.state().default_table_options().clone();
+            table_options.alter_with_string_hash_map(&cmd.options)?;
+            let aws_options = 
table_options.extensions.get::<AwsOptions>().unwrap();
+            // ensure this isn't an error
+            get_s3_object_store_builder(table_url.as_ref(), 
aws_options).await?;
+        } else {
+            return plan_err!("LogicalPlan is not a CreateExternalTable");
+        }
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn oss_object_store_builder() -> Result<()> {
         let access_key_id = "fake_access_key_id";

Reply via email to the mailing list.