This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 417b928eac Support http s3 endpoints in datafusion-cli (#10080)
417b928eac is described below
commit 417b928eac3023f81e1b7d2dc5ee193500e8f5fe
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Apr 16 09:32:24 2024 -0400
Support http s3 endpoints in datafusion-cli (#10080)
---
datafusion-cli/src/object_storage.rs | 121 ++++++++++++++++++++++++++++++++---
1 file changed, 113 insertions(+), 8 deletions(-)
diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs
index 1178c0aad6..85e0009bd2 100644
--- a/datafusion-cli/src/object_storage.rs
+++ b/datafusion-cli/src/object_storage.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
use datafusion::common::config::{
ConfigEntry, ConfigExtension, ConfigField, ExtensionOptions, TableOptions,
Visit,
};
-use datafusion::common::{exec_datafusion_err, exec_err, internal_err};
+use datafusion::common::{config_err, exec_datafusion_err, exec_err};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionState;
use datafusion::prelude::SessionContext;
@@ -39,17 +39,26 @@ pub async fn get_s3_object_store_builder(
url: &Url,
aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
+ let AwsOptions {
+ access_key_id,
+ secret_access_key,
+ session_token,
+ region,
+ endpoint,
+ allow_http,
+ } = aws_options;
+
let bucket_name = get_bucket_name(url)?;
let mut builder =
AmazonS3Builder::from_env().with_bucket_name(bucket_name);
if let (Some(access_key_id), Some(secret_access_key)) =
- (&aws_options.access_key_id, &aws_options.secret_access_key)
+ (access_key_id, secret_access_key)
{
builder = builder
.with_access_key_id(access_key_id)
.with_secret_access_key(secret_access_key);
- if let Some(session_token) = &aws_options.session_token {
+ if let Some(session_token) = session_token {
builder = builder.with_token(session_token);
}
} else {
@@ -72,10 +81,30 @@ pub async fn get_s3_object_store_builder(
builder = builder.with_credentials(credentials);
}
- if let Some(region) = &aws_options.region {
+ if let Some(region) = region {
builder = builder.with_region(region);
}
+ if let Some(endpoint) = endpoint {
+ // Make a nicer error if the user hasn't allowed http and the endpoint
+ // is http as the default message is "URL scheme is not allowed"
+ if let Ok(endpoint_url) = Url::try_from(endpoint.as_str()) {
+ if !matches!(allow_http, Some(true)) && endpoint_url.scheme() == "http" {
+ return config_err!(
+ "Invalid endpoint: {endpoint}. \
+ HTTP is not allowed for S3 endpoints. \
+ To allow HTTP, set 'aws.allow_http' to true"
+ );
+ }
+ }
+
+ builder = builder.with_endpoint(endpoint);
+ }
+
+ if let Some(allow_http) = allow_http {
+ builder = builder.with_allow_http(*allow_http);
+ }
+
Ok(builder)
}
@@ -188,6 +217,8 @@ pub struct AwsOptions {
pub region: Option<String>,
/// OSS or COS Endpoint
pub endpoint: Option<String>,
+ /// Allow HTTP (otherwise will always use https)
+ pub allow_http: Option<bool>,
}
impl ExtensionOptions for AwsOptions {
@@ -219,11 +250,14 @@ impl ExtensionOptions for AwsOptions {
"region" => {
self.region.set(rem, value)?;
}
- "oss" | "cos" => {
+ "oss" | "cos" | "endpoint" => {
self.endpoint.set(rem, value)?;
}
+ "allow_http" => {
+ self.allow_http.set(rem, value)?;
+ }
_ => {
- return internal_err!("Config value \"{}\" not found on AwsOptions", rem);
+ return config_err!("Config value \"{}\" not found on AwsOptions", rem);
}
}
Ok(())
@@ -262,6 +296,7 @@ impl ExtensionOptions for AwsOptions {
self.session_token.visit(&mut v, "session_token", "");
self.region.visit(&mut v, "region", "");
self.endpoint.visit(&mut v, "endpoint", "");
+ self.allow_http.visit(&mut v, "allow_http", "");
v.0
}
}
@@ -307,7 +342,7 @@ impl ExtensionOptions for GcpOptions {
self.application_credentials_path.set(rem, value)?;
}
_ => {
- return internal_err!("Config value \"{}\" not found on GcpOptions", rem);
+ return config_err!("Config value \"{}\" not found on GcpOptions", rem);
}
}
Ok(())
@@ -479,12 +514,21 @@ mod tests {
let access_key_id = "fake_access_key_id";
let secret_access_key = "fake_secret_access_key";
let region = "fake_us-east-2";
+ let endpoint = "endpoint33";
let session_token = "fake_session_token";
let location = "s3://bucket/path/file.parquet";
let table_url = ListingTableUrl::parse(location)?;
let scheme = table_url.scheme();
- let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.region' '{region}', 'aws.session_token' {session_token}) LOCATION '{location}'");
+ let sql = format!(
+ "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
+ ('aws.access_key_id' '{access_key_id}', \
+ 'aws.secret_access_key' '{secret_access_key}', \
+ 'aws.region' '{region}', \
+ 'aws.session_token' {session_token}, \
+ 'aws.endpoint' '{endpoint}'\
+ ) LOCATION '{location}'"
+ );
let ctx = SessionContext::new();
let mut plan = ctx.state().create_logical_plan(&sql).await?;
@@ -501,6 +545,7 @@ mod tests {
(AmazonS3ConfigKey::AccessKeyId, access_key_id),
(AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
(AmazonS3ConfigKey::Region, region),
+ (AmazonS3ConfigKey::Endpoint, endpoint),
(AmazonS3ConfigKey::Token, session_token),
];
for (key, value) in config {
@@ -513,6 +558,66 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn s3_object_store_builder_allow_http_error() -> Result<()> {
+ let access_key_id = "fake_access_key_id";
+ let secret_access_key = "fake_secret_access_key";
+ let endpoint = "http://endpoint33";
+ let location = "s3://bucket/path/file.parquet";
+
+ let table_url = ListingTableUrl::parse(location)?;
+ let scheme = table_url.scheme();
+ let sql = format!(
+ "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
+ ('aws.access_key_id' '{access_key_id}', \
+ 'aws.secret_access_key' '{secret_access_key}', \
+ 'aws.endpoint' '{endpoint}'\
+ ) LOCATION '{location}'"
+ );
+
+ let ctx = SessionContext::new();
+ let mut plan = ctx.state().create_logical_plan(&sql).await?;
+
+ if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
+ register_options(&ctx, scheme);
+ let mut table_options = ctx.state().default_table_options().clone();
+ table_options.alter_with_string_hash_map(&cmd.options)?;
+ let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
+ let err = get_s3_object_store_builder(table_url.as_ref(), aws_options)
+ .await
+ .unwrap_err();
+
+ assert_eq!(err.to_string(), "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true");
+ } else {
+ return plan_err!("LogicalPlan is not a CreateExternalTable");
+ }
+
+ // Now add `allow_http` to the options and check if it works
+ let sql = format!(
+ "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
+ ('aws.access_key_id' '{access_key_id}', \
+ 'aws.secret_access_key' '{secret_access_key}', \
+ 'aws.endpoint' '{endpoint}',\
+ 'aws.allow_http' 'true'\
+ ) LOCATION '{location}'"
+ );
+
+ let mut plan = ctx.state().create_logical_plan(&sql).await?;
+
+ if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
+ register_options(&ctx, scheme);
+ let mut table_options = ctx.state().default_table_options().clone();
+ table_options.alter_with_string_hash_map(&cmd.options)?;
+ let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
+ // ensure this isn't an error
+ get_s3_object_store_builder(table_url.as_ref(), aws_options).await?;
+ } else {
+ return plan_err!("LogicalPlan is not a CreateExternalTable");
+ }
+
+ Ok(())
+ }
+
#[tokio::test]
async fn oss_object_store_builder() -> Result<()> {
let access_key_id = "fake_access_key_id";