This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new bd1c76c7c8 implement direct query for s3 and gcs (#9199)
bd1c76c7c8 is described below
commit bd1c76c7c8e91463daf56c576094e12e4705c8cd
Author: Niko <[email protected]>
AuthorDate: Tue Feb 13 14:32:21 2024 +0000
implement direct query for s3 and gcs (#9199)
Signed-off-by: Nikolay Ulmasov <[email protected]>
---
datafusion-cli/src/catalog.rs | 124 +++++++++++++++++++++++++----------
datafusion-cli/src/exec.rs | 60 ++++-------------
datafusion-cli/src/object_storage.rs | 42 ++++++++++++
docs/source/user-guide/cli.md | 14 +++-
4 files changed, 156 insertions(+), 84 deletions(-)
diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs
index 65c1feb885..f664d40df5 100644
--- a/datafusion-cli/src/catalog.rs
+++ b/datafusion-cli/src/catalog.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::object_storage::get_object_store;
use async_trait::async_trait;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
@@ -24,10 +25,9 @@ use datafusion::datasource::listing::{
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
-use object_store::http::HttpBuilder;
-use object_store::ObjectStore;
use parking_lot::RwLock;
use std::any::Any;
+use std::collections::HashMap;
use std::sync::{Arc, Weak};
use url::Url;
@@ -155,27 +155,24 @@ impl SchemaProvider for DynamicFileSchemaProvider {
// that name, try to treat it as a listing table
let state = self.state.upgrade()?.read().clone();
let table_url = ListingTableUrl::parse(name).ok()?;
-
- // Assure the `http` store for this url is registered if this
- // is an `http(s)` listing
- // TODO: support for other types, e.g. `s3`, may need to be added
- match table_url.scheme() {
- "http" | "https" => {
- let url: &Url = table_url.as_ref();
- match state.runtime_env().object_store_registry.get_store(url)
{
- Ok(_) => {}
- Err(_) => {
- let store = Arc::new(
- HttpBuilder::new()
- .with_url(url.origin().ascii_serialization())
- .build()
- .ok()?,
- ) as Arc<dyn ObjectStore>;
- state.runtime_env().register_object_store(url, store);
- }
- }
+ let url: &Url = table_url.as_ref();
+
+ // If the store is already registered for this URL then `get_store`
+ // will return `Ok` which means we don't need to register it again.
However,
+ // if `get_store` returns an `Err` then it means the corresponding
store is
+ // not registered yet and we need to register it
+ match state.runtime_env().object_store_registry.get_store(url) {
+ Ok(_) => { /*Nothing to do here, store for this URL is already
registered*/ }
+ Err(_) => {
+ // Register the store for this URL. Here we don't have access
+ // to any command options so the only choice is to use an
empty collection
+ let mut options = HashMap::new();
+ let store =
+ get_object_store(&state, &mut options, table_url.scheme(),
url)
+ .await
+ .unwrap();
+ state.runtime_env().register_object_store(url, store);
}
- _ => {}
}
let config = ListingTableConfig::new(table_url)
@@ -198,15 +195,10 @@ impl SchemaProvider for DynamicFileSchemaProvider {
#[cfg(test)]
mod tests {
use super::*;
+ use datafusion::catalog::schema::SchemaProvider;
use datafusion::prelude::SessionContext;
- #[tokio::test]
- async fn query_http_location_test() -> Result<()> {
- // Perhaps this could be changed to use an existing file but
- // that will require a permanently availalble web resource
- let domain = "example.com";
- let location = format!("http://{domain}/file.parquet");
-
+ fn setup_context() -> (SessionContext, Arc<dyn SchemaProvider>) {
let mut ctx = SessionContext::new();
ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new(
ctx.state().catalog_list(),
@@ -222,12 +214,23 @@ mod tests {
let schema = catalog
.schema(catalog.schema_names().first().unwrap())
.unwrap();
- let none = schema.table(&location).await;
+ (ctx, schema)
+ }
- // That's a non-existing location so expecting None here
- assert!(none.is_none());
+ #[tokio::test]
+ async fn query_http_location_test() -> Result<()> {
+ // This is a unit test so not expecting a connection or a file to be
+ // available
+ let domain = "example.com";
+ let location = format!("http://{domain}/file.parquet");
- // It should still create an object store for the location
+ let (ctx, schema) = setup_context();
+
+ // That's a non registered table so expecting None here
+ let table = schema.table(&location).await;
+ assert!(table.is_none());
+
+ // It should still create an object store for the location in the
SessionState
let store = ctx
.runtime_env()
.object_store(ListingTableUrl::parse(location)?)?;
@@ -240,4 +243,59 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn query_s3_location_test() -> Result<()> {
+ let bucket = "examples3bucket";
+ let location = format!("s3://{bucket}/file.parquet");
+
+ let (ctx, schema) = setup_context();
+
+ let table = schema.table(&location).await;
+ assert!(table.is_none());
+
+ let store = ctx
+ .runtime_env()
+ .object_store(ListingTableUrl::parse(location)?)?;
+ assert_eq!(format!("{store}"), format!("AmazonS3({bucket})"));
+
+ // The store must be configured for this domain
+ let expected_bucket = format!("bucket: \"{bucket}\"");
+ assert!(format!("{store:?}").contains(&expected_bucket));
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn query_gs_location_test() -> Result<()> {
+ let bucket = "examplegsbucket";
+ let location = format!("gs://{bucket}/file.parquet");
+
+ let (ctx, schema) = setup_context();
+
+ let table = schema.table(&location).await;
+ assert!(table.is_none());
+
+ let store = ctx
+ .runtime_env()
+ .object_store(ListingTableUrl::parse(location)?)?;
+ assert_eq!(format!("{store}"),
format!("GoogleCloudStorage({bucket})"));
+
+ // The store must be configured for this domain
+ let expected_bucket = format!("bucket_name_encoded: \"{bucket}\"");
+ assert!(format!("{store:?}").contains(&expected_bucket));
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ #[should_panic]
+ async fn query_invalid_location_test() {
+ let location = "ts://file.parquet";
+ let (_ctx, schema) = setup_context();
+
+ // This will panic, we cannot prevent that because `schema.table`
+ // returns an Option
+ schema.table(location).await;
+ }
}
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index f2392c49a4..6ca8dfe927 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -18,23 +18,20 @@
//! Execution functions
use std::collections::HashMap;
+use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;
use std::time::Instant;
-use std::{fs::File, sync::Arc};
use crate::print_format::PrintFormat;
use crate::{
command::{Command, OutputFormat},
helper::{unescape_input, CliHelper},
- object_storage::{
- get_gcs_object_store_builder, get_oss_object_store_builder,
- get_s3_object_store_builder,
- },
+ object_storage::get_object_store,
print_options::{MaxRows, PrintOptions},
};
-use datafusion::common::{exec_datafusion_err, plan_datafusion_err};
+use datafusion::common::plan_datafusion_err;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::physical_plan::is_plan_streaming;
use datafusion::error::{DataFusionError, Result};
@@ -45,8 +42,6 @@ use datafusion::sql::{parser::DFParser,
sqlparser::dialect::dialect_from_str};
use datafusion::logical_expr::dml::CopyTo;
use datafusion::sql::parser::Statement;
-use object_store::http::HttpBuilder;
-use object_store::ObjectStore;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use tokio::signal;
@@ -280,8 +275,13 @@ async fn register_object_store(
copy_to: &mut CopyTo,
) -> Result<(), DataFusionError> {
let url = ListingTableUrl::parse(copy_to.output_url.as_str())?;
- let store =
- get_object_store(ctx, &mut HashMap::new(), url.scheme(),
url.as_ref()).await?;
+ let store = get_object_store(
+ &ctx.state(),
+ &mut HashMap::new(),
+ url.scheme(),
+ url.as_ref(),
+ )
+ .await?;
ctx.runtime_env().register_object_store(url.as_ref(), store);
Ok(())
}
@@ -295,50 +295,12 @@ async fn create_external_table(
let url: &Url = table_path.as_ref();
// registering the cloud object store dynamically using cmd.options
- let store = get_object_store(ctx, &mut cmd.options, scheme, url).await?;
-
+ let store = get_object_store(&ctx.state(), &mut cmd.options, scheme,
url).await?;
ctx.runtime_env().register_object_store(url, store);
Ok(())
}
-async fn get_object_store(
- ctx: &SessionContext,
- options: &mut HashMap<String, String>,
- scheme: &str,
- url: &Url,
-) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
- let store = match scheme {
- "s3" => {
- let builder = get_s3_object_store_builder(url, options).await?;
- Arc::new(builder.build()?) as Arc<dyn ObjectStore>
- }
- "oss" => {
- let builder = get_oss_object_store_builder(url, options)?;
- Arc::new(builder.build()?) as Arc<dyn ObjectStore>
- }
- "gs" | "gcs" => {
- let builder = get_gcs_object_store_builder(url, options)?;
- Arc::new(builder.build()?) as Arc<dyn ObjectStore>
- }
- "http" | "https" => Arc::new(
- HttpBuilder::new()
- .with_url(url.origin().ascii_serialization())
- .build()?,
- ) as Arc<dyn ObjectStore>,
- _ => {
- // for other types, try to get from the object_store_registry
- ctx.runtime_env()
- .object_store_registry
- .get_store(url)
- .map_err(|_| {
- exec_datafusion_err!("Unsupported object store scheme:
{}", scheme)
- })?
- }
- };
- Ok(store)
-}
-
#[cfg(test)]
mod tests {
use std::str::FromStr;
diff --git a/datafusion-cli/src/object_storage.rs
b/datafusion-cli/src/object_storage.rs
index 1d090a665c..897f379655 100644
--- a/datafusion-cli/src/object_storage.rs
+++ b/datafusion-cli/src/object_storage.rs
@@ -17,8 +17,12 @@
use async_trait::async_trait;
use aws_credential_types::provider::ProvideCredentials;
+use datafusion::common::exec_datafusion_err;
use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::context::SessionState;
use object_store::aws::AwsCredential;
+use object_store::http::HttpBuilder;
+use object_store::ObjectStore;
use object_store::{
aws::AmazonS3Builder, gcp::GoogleCloudStorageBuilder, CredentialProvider,
};
@@ -156,6 +160,44 @@ fn get_bucket_name(url: &Url) -> Result<&str> {
})
}
+pub(crate) async fn get_object_store(
+ state: &SessionState,
+ options: &mut HashMap<String, String>,
+ scheme: &str,
+ url: &Url,
+) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
+ let store = match scheme {
+ "s3" => {
+ let builder = get_s3_object_store_builder(url, options).await?;
+ Arc::new(builder.build()?) as Arc<dyn ObjectStore>
+ }
+ "oss" => {
+ let builder = get_oss_object_store_builder(url, options)?;
+ Arc::new(builder.build()?) as Arc<dyn ObjectStore>
+ }
+ "gs" | "gcs" => {
+ let builder = get_gcs_object_store_builder(url, options)?;
+ Arc::new(builder.build()?) as Arc<dyn ObjectStore>
+ }
+ "http" | "https" => Arc::new(
+ HttpBuilder::new()
+ .with_url(url.origin().ascii_serialization())
+ .build()?,
+ ) as Arc<dyn ObjectStore>,
+ _ => {
+ // for other types, try to get from the object_store_registry
+ state
+ .runtime_env()
+ .object_store_registry
+ .get_store(url)
+ .map_err(|_| {
+ exec_datafusion_err!("Unsupported object store scheme:
{}", scheme)
+ })?
+ }
+ };
+ Ok(store)
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/docs/source/user-guide/cli.md b/docs/source/user-guide/cli.md
index a8a9d6f212..a94e2427ea 100644
--- a/docs/source/user-guide/cli.md
+++ b/docs/source/user-guide/cli.md
@@ -194,8 +194,9 @@ DataFusion CLI v16.0.0
2 rows in set. Query took 0.007 seconds.
```
-You can also query directly from the remote location via HTTP(S) without
-registering the location as a table
+You can also query directly from any remote location supported by DataFusion
without
+registering the location as a table.
+For example, to read from a remote parquet file via HTTP(S) you can use the
following:
```sql
select count(*) from
'https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet'
@@ -207,6 +208,15 @@ select count(*) from
'https://datasets.clickhouse.com/hits_compatible/athena_par
1 row in set. Query took 0.595 seconds.
```
+To read from AWS S3 or GCS, use `s3` or `gs` as a protocol prefix. For
example, this will read a file
+in an S3 bucket named `my-data-bucket`. Note that this is not a real file
location and therefore the query
+will fail; you need to use your own file location in S3. Also, you need to set
the relevant access credentials
+as environment variables (e.g. for AWS S3 you need to set at least
`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`)
+
+```sql
+select count(*) from 's3://my-data-bucket/athena_partitioned/hits.parquet'
+```
+
## Creating External Tables
It is also possible to create a table backed by files by explicitly