alamb commented on code in PR #5732:
URL: https://github.com/apache/arrow-datafusion/pull/5732#discussion_r1158905894
##########
datafusion-cli/src/exec.rs:
##########
@@ -193,105 +197,144 @@ fn create_external_table(ctx: &SessionContext, cmd: &CreateExternalTable) -> Res
let url: &Url = table_path.as_ref();
// registering the cloud object store dynamically using cmd.options
- match scheme {
+ let store = match scheme {
"s3" => {
- let bucket_name = get_bucket_name(url)?;
- let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);
-
- if let (Some(access_key_id), Some(secret_access_key)) = (
- cmd.options.get("access_key_id"),
- cmd.options.get("secret_access_key"),
- ) {
- builder = builder
- .with_access_key_id(access_key_id)
- .with_secret_access_key(secret_access_key);
- }
-
- if let Some(session_token) = cmd.options.get("session_token") {
- builder = builder.with_token(session_token);
- }
-
- if let Some(region) = cmd.options.get("region") {
- builder = builder.with_region(region);
- }
-
- let store = Arc::new(builder.build()?);
-
- ctx.runtime_env().register_object_store(url, store);
+ let builder = get_s3_object_store_builder(url, cmd)?;
+ Arc::new(builder.build()?) as Arc<dyn ObjectStore>
}
"oss" => {
- let bucket_name = get_bucket_name(url)?;
- let mut builder = AmazonS3Builder::from_env()
- .with_virtual_hosted_style_request(true)
- .with_bucket_name(bucket_name)
- // oss don't care about the "region" field
- .with_region("do_not_care");
-
- if let (Some(access_key_id), Some(secret_access_key)) = (
- cmd.options.get("access_key_id"),
- cmd.options.get("secret_access_key"),
- ) {
- builder = builder
- .with_access_key_id(access_key_id)
- .with_secret_access_key(secret_access_key);
- }
-
- if let Some(endpoint) = cmd.options.get("endpoint") {
- builder = builder.with_endpoint(endpoint);
- }
-
- let store = Arc::new(builder.build()?);
-
- ctx.runtime_env().register_object_store(url, store);
+ let builder = get_oss_object_store_builder(url, cmd)?;
+ Arc::new(builder.build()?) as Arc<dyn ObjectStore>
}
"gs" | "gcs" => {
- let bucket_name = get_bucket_name(url)?;
- let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);
-
- if let Some(service_account_path) = cmd.options.get("service_account_path") {
- builder = builder.with_service_account_path(service_account_path);
- }
-
- if let Some(service_account_key) = cmd.options.get("service_account_key") {
- builder = builder.with_service_account_key(service_account_key);
- }
-
- if let Some(application_credentials_path) =
- cmd.options.get("application_credentials_path")
- {
- builder = builder.with_application_credentials(application_credentials_path);
- }
-
- let store = Arc::new(builder.build()?);
-
- ctx.runtime_env().register_object_store(url, store);
+ let builder = get_gcs_object_store_builder(url, cmd)?;
+ Arc::new(builder.build()?) as Arc<dyn ObjectStore>
}
_ => {
// for other types, try to get from the object_store_registry
- let store = ctx
- .runtime_env()
+ ctx.runtime_env()
.object_store_registry
.get_store(url)
.map_err(|_| {
DataFusionError::Execution(format!(
"Unsupported object store scheme: {}",
scheme
))
- })?;
- ctx.runtime_env().register_object_store(url, store);
+ })?
}
};
+ ctx.runtime_env().register_object_store(url, store);
+
Ok(())
}
-fn get_bucket_name(url: &Url) -> Result<&str> {
- url.host_str().ok_or_else(|| {
- DataFusionError::Execution(format!(
- "Not able to parse bucket name from url: {}",
- url.as_str()
- ))
- })
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ async fn create_external_table_test(location: &str, sql: &str) -> Result<()> {
+ let ctx = SessionContext::new();
+ let plan = ctx.state().create_logical_plan(&sql).await?;
+
+ match &plan {
+ LogicalPlan::CreateExternalTable(cmd) => {
+ create_external_table(&ctx, cmd)?;
+ }
+ _ => assert!(false),
+ };
+
+ ctx.runtime_env()
+ .object_store(ListingTableUrl::parse(location)?)?;
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn create_object_store_table_s3() -> Result<()> {
+ let access_key_id = "fake_access_key_id";
+ let secret_access_key = "fake_secret_access_key";
+ let region = "fake_us-east-2";
+ let session_token = "fake_session_token";
+ let location = "s3://bucket/path/file.parquet";
+
+ // Missing region
+ let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET
+ OPTIONS('access_key_id' '{access_key_id}', 'secret_access_key' '{secret_access_key}') LOCATION '{location}'");
+ let err = create_external_table_test(location, &sql)
+ .await
+ .unwrap_err();
+ assert!(err.to_string().contains("Missing region"));
+
+ // Should be OK
+ let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET
+ OPTIONS('access_key_id' '{access_key_id}', 'secret_access_key' '{secret_access_key}', 'region' '{region}', 'session_token' '{session_token}') LOCATION '{location}'");
+ create_external_table_test(location, &sql).await?;
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn create_object_store_table_oss() -> Result<()> {
+ let access_key_id = "fake_access_key_id";
+ let secret_access_key = "fake_secret_access_key";
+ let endpoint = "fake_endpoint";
+ let location = "oss://bucket/path/file.parquet";
+
+ // Should be OK
+ let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET
+ OPTIONS('access_key_id' '{access_key_id}', 'secret_access_key' '{secret_access_key}', 'endpoint' '{endpoint}') LOCATION '{location}'");
+ create_external_table_test(location, &sql).await?;
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn create_object_store_table_gcs() -> Result<()> {
+ let service_account_path = "fake_service_account_path";
+ let service_account_key =
+ "{\"private_key\":
\"fake_private_key.pem\",\"client_email\":\"fake_client_email\"}";
+ let application_credentials_path = "fake_application_credentials_path";
+ let location = "gcs://bucket/path/file.parquet";
+
+ // for service_account_path
+ let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET
+ OPTIONS('service_account_path' '{service_account_path}') LOCATION '{location}'");
+ let err = create_external_table_test(location, &sql)
+ .await
+ .unwrap_err();
+ assert!(err.to_string().contains("No such file or directory"));
+
+ // for service_account_key
+ let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('service_account_key' '{service_account_key}') LOCATION '{location}'");
+ let err = create_external_table_test(location, &sql)
+ .await
+ .unwrap_err();
+ assert!(err.to_string().contains("No RSA key found in pem file"));
##########
Review Comment:
💯 for negative tests
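
The extracted helpers referenced in the new arms (`get_s3_object_store_builder`, `get_oss_object_store_builder`, `get_gcs_object_store_builder`) are not part of this hunk. For context, here is a minimal sketch of what the S3 variant plausibly looks like, reconstructed from the inline logic removed above; the name, signature, and import paths are assumptions, and the PR's actual helper may differ:

```rust
// Sketch only: reconstructed from the removed inline S3 branch, not the PR's
// exact code. Import paths may vary between DataFusion versions.
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::CreateExternalTable;
use object_store::aws::AmazonS3Builder;
use url::Url;

fn get_s3_object_store_builder(
    url: &Url,
    cmd: &CreateExternalTable,
) -> Result<AmazonS3Builder> {
    // Same bucket extraction that the removed get_bucket_name() helper did.
    let bucket_name = url.host_str().ok_or_else(|| {
        DataFusionError::Execution(format!(
            "Not able to parse bucket name from url: {}",
            url.as_str()
        ))
    })?;
    let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);

    // Credentials, token, and region come from the OPTIONS(...) map on
    // CREATE EXTERNAL TABLE, exactly as in the removed inline code.
    if let (Some(access_key_id), Some(secret_access_key)) = (
        cmd.options.get("access_key_id"),
        cmd.options.get("secret_access_key"),
    ) {
        builder = builder
            .with_access_key_id(access_key_id)
            .with_secret_access_key(secret_access_key);
    }
    if let Some(session_token) = cmd.options.get("session_token") {
        builder = builder.with_token(session_token);
    }
    if let Some(region) = cmd.options.get("region") {
        builder = builder.with_region(region);
    }

    Ok(builder)
}
```

Whatever the helpers' exact shape, the structural win in the diff is that each match arm now only yields an `Arc<dyn ObjectStore>` and registration happens once, after the match.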
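
In the same spirit, one more hypothetical negative case (not in the PR) could exercise the fallback `_` arm directly: an unregistered scheme should surface the "Unsupported object store scheme" error constructed above. A sketch, assuming the `create_external_table_test` helper from this diff:

```rust
#[tokio::test]
async fn create_object_store_table_unknown_scheme() -> Result<()> {
    // Hypothetical extra negative test: a scheme with no dedicated builder and
    // no entry in the default object_store_registry should be rejected with the
    // "Unsupported object store scheme" error from create_external_table.
    let location = "foobar://bucket/path/file.parquet";
    let sql =
        format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");
    let err = create_external_table_test(location, &sql)
        .await
        .unwrap_err();
    assert!(err.to_string().contains("Unsupported object store scheme"));
    Ok(())
}
```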
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]