This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new bd1c76c7c8 implement direct query for s3 and gcs (#9199)
bd1c76c7c8 is described below

commit bd1c76c7c8e91463daf56c576094e12e4705c8cd
Author: Niko <[email protected]>
AuthorDate: Tue Feb 13 14:32:21 2024 +0000

    implement direct query for s3 and gcs (#9199)
    
    Signed-off-by: Nikolay Ulmasov <[email protected]>
---
 datafusion-cli/src/catalog.rs        | 124 +++++++++++++++++++++++++----------
 datafusion-cli/src/exec.rs           |  60 ++++-------------
 datafusion-cli/src/object_storage.rs |  42 ++++++++++++
 docs/source/user-guide/cli.md        |  14 +++-
 4 files changed, 156 insertions(+), 84 deletions(-)
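
In user-facing terms, the change lets the CLI query S3 and GCS locations directly, as it
already could for HTTP(S). A hedged sketch of the new behavior (hypothetical bucket and
object; the relevant credentials, e.g. `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` for
S3, must be set in the environment):

```sql
select count(*) from 's3://my-data-bucket/file.parquet';
```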

diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs
index 65c1feb885..f664d40df5 100644
--- a/datafusion-cli/src/catalog.rs
+++ b/datafusion-cli/src/catalog.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::object_storage::get_object_store;
 use async_trait::async_trait;
 use datafusion::catalog::schema::SchemaProvider;
 use datafusion::catalog::{CatalogProvider, CatalogProviderList};
@@ -24,10 +25,9 @@ use datafusion::datasource::listing::{
 use datafusion::datasource::TableProvider;
 use datafusion::error::Result;
 use datafusion::execution::context::SessionState;
-use object_store::http::HttpBuilder;
-use object_store::ObjectStore;
 use parking_lot::RwLock;
 use std::any::Any;
+use std::collections::HashMap;
 use std::sync::{Arc, Weak};
 use url::Url;
 
@@ -155,27 +155,24 @@ impl SchemaProvider for DynamicFileSchemaProvider {
         // that name, try to treat it as a listing table
         let state = self.state.upgrade()?.read().clone();
         let table_url = ListingTableUrl::parse(name).ok()?;
-
-        // Assure the `http` store for this url is registered if this
-        // is an `http(s)` listing
-        // TODO: support for other types, e.g. `s3`, may need to be added
-        match table_url.scheme() {
-            "http" | "https" => {
-                let url: &Url = table_url.as_ref();
-            match state.runtime_env().object_store_registry.get_store(url) {
-                    Ok(_) => {}
-                    Err(_) => {
-                        let store = Arc::new(
-                            HttpBuilder::new()
-                                .with_url(url.origin().ascii_serialization())
-                                .build()
-                                .ok()?,
-                        ) as Arc<dyn ObjectStore>;
-                        state.runtime_env().register_object_store(url, store);
-                    }
-                }
+        let url: &Url = table_url.as_ref();
+
+        // If the store is already registered for this URL then `get_store`
+        // will return `Ok` which means we don't need to register it again. However,
+        // if `get_store` returns an `Err` then it means the corresponding store is
+        // not registered yet and we need to register it
+        match state.runtime_env().object_store_registry.get_store(url) {
+            Ok(_) => { /*Nothing to do here, store for this URL is already registered*/ }
+            Err(_) => {
+                // Register the store for this URL. Here we don't have access
+                // to any command options so the only choice is to use an empty collection
+                let mut options = HashMap::new();
+                let store =
+                    get_object_store(&state, &mut options, table_url.scheme(), url)
+                        .await
+                        .unwrap();
+                state.runtime_env().register_object_store(url, store);
             }
-            _ => {}
         }
 
         let config = ListingTableConfig::new(table_url)
@@ -198,15 +195,10 @@ impl SchemaProvider for DynamicFileSchemaProvider {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use datafusion::catalog::schema::SchemaProvider;
     use datafusion::prelude::SessionContext;
 
-    #[tokio::test]
-    async fn query_http_location_test() -> Result<()> {
-        // Perhaps this could be changed to use an existing file but
-        // that will require a permanently availalble web resource
-        let domain = "example.com";
-        let location = format!("http://{domain}/file.parquet");
-
+    fn setup_context() -> (SessionContext, Arc<dyn SchemaProvider>) {
         let mut ctx = SessionContext::new();
         ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new(
             ctx.state().catalog_list(),
@@ -222,12 +214,23 @@ mod tests {
         let schema = catalog
             .schema(catalog.schema_names().first().unwrap())
             .unwrap();
-        let none = schema.table(&location).await;
+        (ctx, schema)
+    }
 
-        // That's a non-existing location so expecting None here
-        assert!(none.is_none());
+    #[tokio::test]
+    async fn query_http_location_test() -> Result<()> {
+        // This is a unit test so not expecting a connection or a file to be
+        // available
+        let domain = "example.com";
+        let location = format!("http://{domain}/file.parquet");
 
-        // It should still create an object store for the location
+        let (ctx, schema) = setup_context();
+
+        // That's a non-registered table so expecting None here
+        let table = schema.table(&location).await;
+        assert!(table.is_none());
+
+        // It should still create an object store for the location in the SessionState
         let store = ctx
             .runtime_env()
             .object_store(ListingTableUrl::parse(location)?)?;
@@ -240,4 +243,59 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn query_s3_location_test() -> Result<()> {
+        let bucket = "examples3bucket";
+        let location = format!("s3://{bucket}/file.parquet");
+
+        let (ctx, schema) = setup_context();
+
+        let table = schema.table(&location).await;
+        assert!(table.is_none());
+
+        let store = ctx
+            .runtime_env()
+            .object_store(ListingTableUrl::parse(location)?)?;
+        assert_eq!(format!("{store}"), format!("AmazonS3({bucket})"));
+
+        // The store must be configured for this domain
+        let expected_bucket = format!("bucket: \"{bucket}\"");
+        assert!(format!("{store:?}").contains(&expected_bucket));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn query_gs_location_test() -> Result<()> {
+        let bucket = "examplegsbucket";
+        let location = format!("gs://{bucket}/file.parquet");
+
+        let (ctx, schema) = setup_context();
+
+        let table = schema.table(&location).await;
+        assert!(table.is_none());
+
+        let store = ctx
+            .runtime_env()
+            .object_store(ListingTableUrl::parse(location)?)?;
+        assert_eq!(format!("{store}"), 
format!("GoogleCloudStorage({bucket})"));
+
+        // The store must be configured for this domain
+        let expected_bucket = format!("bucket_name_encoded: \"{bucket}\"");
+        assert!(format!("{store:?}").contains(&expected_bucket));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    #[should_panic]
+    async fn query_invalid_location_test() {
+        let location = "ts://file.parquet";
+        let (_ctx, schema) = setup_context();
+
+        // This will panic; we cannot prevent that because `schema.table`
+        // returns an Option
+        schema.table(location).await;
+    }
 }
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index f2392c49a4..6ca8dfe927 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -18,23 +18,20 @@
 //! Execution functions
 
 use std::collections::HashMap;
+use std::fs::File;
 use std::io::prelude::*;
 use std::io::BufReader;
 use std::time::Instant;
-use std::{fs::File, sync::Arc};
 
 use crate::print_format::PrintFormat;
 use crate::{
     command::{Command, OutputFormat},
     helper::{unescape_input, CliHelper},
-    object_storage::{
-        get_gcs_object_store_builder, get_oss_object_store_builder,
-        get_s3_object_store_builder,
-    },
+    object_storage::get_object_store,
     print_options::{MaxRows, PrintOptions},
 };
 
-use datafusion::common::{exec_datafusion_err, plan_datafusion_err};
+use datafusion::common::plan_datafusion_err;
 use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::datasource::physical_plan::is_plan_streaming;
 use datafusion::error::{DataFusionError, Result};
@@ -45,8 +42,6 @@ use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};
 
 use datafusion::logical_expr::dml::CopyTo;
 use datafusion::sql::parser::Statement;
-use object_store::http::HttpBuilder;
-use object_store::ObjectStore;
 use rustyline::error::ReadlineError;
 use rustyline::Editor;
 use tokio::signal;
@@ -280,8 +275,13 @@ async fn register_object_store(
     copy_to: &mut CopyTo,
 ) -> Result<(), DataFusionError> {
     let url = ListingTableUrl::parse(copy_to.output_url.as_str())?;
-    let store =
-        get_object_store(ctx, &mut HashMap::new(), url.scheme(), url.as_ref()).await?;
+    let store = get_object_store(
+        &ctx.state(),
+        &mut HashMap::new(),
+        url.scheme(),
+        url.as_ref(),
+    )
+    .await?;
     ctx.runtime_env().register_object_store(url.as_ref(), store);
     Ok(())
 }
@@ -295,50 +295,12 @@ async fn create_external_table(
     let url: &Url = table_path.as_ref();
 
     // registering the cloud object store dynamically using cmd.options
-    let store = get_object_store(ctx, &mut cmd.options, scheme, url).await?;
-
+    let store = get_object_store(&ctx.state(), &mut cmd.options, scheme, url).await?;
     ctx.runtime_env().register_object_store(url, store);
 
     Ok(())
 }
 
-async fn get_object_store(
-    ctx: &SessionContext,
-    options: &mut HashMap<String, String>,
-    scheme: &str,
-    url: &Url,
-) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
-    let store = match scheme {
-        "s3" => {
-            let builder = get_s3_object_store_builder(url, options).await?;
-            Arc::new(builder.build()?) as Arc<dyn ObjectStore>
-        }
-        "oss" => {
-            let builder = get_oss_object_store_builder(url, options)?;
-            Arc::new(builder.build()?) as Arc<dyn ObjectStore>
-        }
-        "gs" | "gcs" => {
-            let builder = get_gcs_object_store_builder(url, options)?;
-            Arc::new(builder.build()?) as Arc<dyn ObjectStore>
-        }
-        "http" | "https" => Arc::new(
-            HttpBuilder::new()
-                .with_url(url.origin().ascii_serialization())
-                .build()?,
-        ) as Arc<dyn ObjectStore>,
-        _ => {
-            // for other types, try to get from the object_store_registry
-            ctx.runtime_env()
-                .object_store_registry
-                .get_store(url)
-                .map_err(|_| {
-                    exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
-                })?
-        }
-    };
-    Ok(store)
-}
-
 #[cfg(test)]
 mod tests {
     use std::str::FromStr;
diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs
index 1d090a665c..897f379655 100644
--- a/datafusion-cli/src/object_storage.rs
+++ b/datafusion-cli/src/object_storage.rs
@@ -17,8 +17,12 @@
 
 use async_trait::async_trait;
 use aws_credential_types::provider::ProvideCredentials;
+use datafusion::common::exec_datafusion_err;
 use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::context::SessionState;
 use object_store::aws::AwsCredential;
+use object_store::http::HttpBuilder;
+use object_store::ObjectStore;
 use object_store::{
     aws::AmazonS3Builder, gcp::GoogleCloudStorageBuilder, CredentialProvider,
 };
@@ -156,6 +160,44 @@ fn get_bucket_name(url: &Url) -> Result<&str> {
     })
 }
 
+pub(crate) async fn get_object_store(
+    state: &SessionState,
+    options: &mut HashMap<String, String>,
+    scheme: &str,
+    url: &Url,
+) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
+    let store = match scheme {
+        "s3" => {
+            let builder = get_s3_object_store_builder(url, options).await?;
+            Arc::new(builder.build()?) as Arc<dyn ObjectStore>
+        }
+        "oss" => {
+            let builder = get_oss_object_store_builder(url, options)?;
+            Arc::new(builder.build()?) as Arc<dyn ObjectStore>
+        }
+        "gs" | "gcs" => {
+            let builder = get_gcs_object_store_builder(url, options)?;
+            Arc::new(builder.build()?) as Arc<dyn ObjectStore>
+        }
+        "http" | "https" => Arc::new(
+            HttpBuilder::new()
+                .with_url(url.origin().ascii_serialization())
+                .build()?,
+        ) as Arc<dyn ObjectStore>,
+        _ => {
+            // for other types, try to get from the object_store_registry
+            state
+                .runtime_env()
+                .object_store_registry
+                .get_store(url)
+                .map_err(|_| {
+                    exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
+                })?
+        }
+    };
+    Ok(store)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
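
The same helper also backs `CREATE EXTERNAL TABLE`, where credentials can be passed explicitly
via `OPTIONS` instead of being read from the environment. A hedged sketch (placeholder
credentials; the option keys are the ones consumed by `get_s3_object_store_builder`):

```sql
CREATE EXTERNAL TABLE hits
STORED AS PARQUET
OPTIONS('access_key_id' '<aws access key>', 'secret_access_key' '<aws secret>')
LOCATION 's3://my-data-bucket/hits.parquet';
```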
diff --git a/docs/source/user-guide/cli.md b/docs/source/user-guide/cli.md
index a8a9d6f212..a94e2427ea 100644
--- a/docs/source/user-guide/cli.md
+++ b/docs/source/user-guide/cli.md
@@ -194,8 +194,9 @@ DataFusion CLI v16.0.0
 2 rows in set. Query took 0.007 seconds.
 ```
 
-You can also query directly from the remote location via HTTP(S) without
-registering the location as a table
+You can also query directly from any remote location supported by DataFusion without
+registering the location as a table.
+For example, to read from a remote parquet file via HTTP(S) you can use the following:
 
 ```sql
 select count(*) from 'https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet'
@@ -207,6 +208,15 @@ select count(*) from 'https://datasets.clickhouse.com/hits_compatible/athena_par
 1 row in set. Query took 0.595 seconds.
 ```
 
+To read from AWS S3 or GCS, use `s3` or `gs` as the protocol prefix. For example, this will read a file
+in an S3 bucket named `my-data-bucket`. Note that this is not a real file location, so the query
+will fail; use your own file location in S3. You also need to set the relevant access credentials as
+environment variables (e.g. for AWS S3 you need at least `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`).
+
+```sql
+select count(*) from 's3://my-data-bucket/athena_partitioned/hits.parquet'
+```
+
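+Similarly, to read from GCS use the `gs://` prefix. A hedged sketch (hypothetical bucket; set
+GCS credentials such as `GOOGLE_SERVICE_ACCOUNT` in the environment first):
+
+```sql
+select count(*) from 'gs://my-data-bucket/hits.parquet'
+```
+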
 ## Creating External Tables
 
 It is also possible to create a table backed by files by explicitly
