This is an automated email from the ASF dual-hosted git repository.

timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c8292f  feat: expose http object store (#885)
6c8292f is described below

commit 6c8292f8c6c67d79868a87bc16e0b06ac14d7f0b
Author: Daniel Mesejo <[email protected]>
AuthorDate: Sun Oct 6 23:41:45 2024 +0200

    feat: expose http object store (#885)
    
    * feat: expose HTTP ObjectStore
    
    The objective is to allow the user to register a CSV file directly from
    an HTTP URL, delaying downloading of the file until it is required
    
    * chore: return PyResult
---
 Cargo.toml                            |  2 +-
 python/datafusion/context.py          |  4 +++-
 python/datafusion/object_store.py     | 12 ++----------
 python/tests/test_sql.py              | 12 ++++++++++--
 python/tests/test_store.py            | 10 ++--------
 python/tests/test_wrapper_coverage.py |  2 +-
 src/context.rs                        |  1 +
 src/store.rs                          | 31 +++++++++++++++++++++++++++++++
 8 files changed, 51 insertions(+), 23 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 4f26023..d2d3e79 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,7 +47,7 @@ uuid = { version = "1.9", features = ["v4"] }
 mimalloc = { version = "0.1", optional = true, default-features = false, 
features = ["local_dynamic_tls"] }
 async-trait = "0.1"
 futures = "0.3"
-object_store = { version = "0.11.0", features = ["aws", "gcp", "azure"] }
+object_store = { version = "0.11.0", features = ["aws", "gcp", "azure", 
"http"] }
 parking_lot = "0.12"
 regex-syntax = "0.8"
 syn = "2.0.79"
diff --git a/python/datafusion/context.py b/python/datafusion/context.py
index b08e62d..957d7e3 100644
--- a/python/datafusion/context.py
+++ b/python/datafusion/context.py
@@ -450,7 +450,9 @@ class SessionContext:
 
         self.ctx = SessionContextInternal(config, runtime)
 
-    def register_object_store(self, schema: str, store: Any, host: str | None) 
-> None:
+    def register_object_store(
+        self, schema: str, store: Any, host: str | None = None
+    ) -> None:
         """Add a new object store into the session.
 
         Args:
diff --git a/python/datafusion/object_store.py 
b/python/datafusion/object_store.py
index c927e76..7cc1750 100644
--- a/python/datafusion/object_store.py
+++ b/python/datafusion/object_store.py
@@ -22,14 +22,6 @@ AmazonS3 = object_store.AmazonS3
 GoogleCloud = object_store.GoogleCloud
 LocalFileSystem = object_store.LocalFileSystem
 MicrosoftAzure = object_store.MicrosoftAzure
+Http = object_store.Http
 
-__all__ = [
-    "AmazonS3",
-    "GoogleCloud",
-    "LocalFileSystem",
-    "MicrosoftAzure",
-]
-
-
-def __getattr__(name):
-    return getattr(object_store, name)
+__all__ = ["AmazonS3", "GoogleCloud", "LocalFileSystem", "MicrosoftAzure", 
"Http"]
diff --git a/python/tests/test_sql.py b/python/tests/test_sql.py
index e39a9f5..39e5ffe 100644
--- a/python/tests/test_sql.py
+++ b/python/tests/test_sql.py
@@ -22,7 +22,7 @@ import pyarrow as pa
 from pyarrow.csv import write_csv
 import pyarrow.dataset as ds
 import pytest
-from datafusion.object_store import LocalFileSystem
+from datafusion.object_store import Http
 
 from datafusion import udf, col
 
@@ -139,6 +139,15 @@ def test_register_csv_list(ctx, tmp_path):
     assert int_sum == 2 * sum(int_values)
 
 
+def test_register_http_csv(ctx):
+    url = "https://raw.githubusercontent.com/ibis-project/testing-data/refs/heads/master/csv/diamonds.csv"
+    ctx.register_object_store("", Http(url))
+    ctx.register_csv("remote", url)
+    assert ctx.table_exist("remote")
+    res, *_ = ctx.sql("SELECT COUNT(*) AS total FROM remote").to_pylist()
+    assert res["total"] > 0
+
+
 def test_register_parquet(ctx, tmp_path):
     path = helpers.write_parquet(tmp_path / "a.parquet", helpers.data())
     ctx.register_parquet("t", path)
@@ -494,7 +503,6 @@ def test_register_listing_table(
 
     dir_root = f"file://{dir_root}/" if path_to_str else dir_root
 
-    ctx.register_object_store("file://local", LocalFileSystem(), None)
     ctx.register_listing_table(
         "my_table",
         dir_root,
diff --git a/python/tests/test_store.py b/python/tests/test_store.py
index 3ffd9ee..f85b283 100644
--- a/python/tests/test_store.py
+++ b/python/tests/test_store.py
@@ -16,21 +16,15 @@
 # under the License.
 
 import os
+
 import pytest
 
 from datafusion import SessionContext
-from datafusion.object_store import LocalFileSystem
-
-
[email protected]
-def local():
-    return LocalFileSystem()
 
 
 @pytest.fixture
-def ctx(local):
+def ctx():
     ctx = SessionContext()
-    ctx.register_object_store("file://local", local, None)
     return ctx
 
 
diff --git a/python/tests/test_wrapper_coverage.py 
b/python/tests/test_wrapper_coverage.py
index c53a89c..86f2d57 100644
--- a/python/tests/test_wrapper_coverage.py
+++ b/python/tests/test_wrapper_coverage.py
@@ -55,7 +55,7 @@ def missing_exports(internal_obj, wrapped_obj) -> None:
 
 
 def test_datafusion_missing_exports() -> None:
-    """Check for any missing pythone exports.
+    """Check for any missing python exports.
 
     This test verifies that every exposed class, attribute, and function in
     the internal (pyo3) module is also exposed in our python wrappers.
diff --git a/src/context.rs b/src/context.rs
index 5317a3e..f445874 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -312,6 +312,7 @@ impl PySessionContext {
             StorageContexts::GoogleCloudStorage(gcs) => (gcs.inner, 
gcs.bucket_name),
             StorageContexts::MicrosoftAzure(azure) => (azure.inner, 
azure.container_name),
             StorageContexts::LocalFileSystem(local) => (local.inner, 
"".to_string()),
+            StorageContexts::HTTP(http) => (http.store, http.url),
         };
 
         // let users override the host to match the api signature from upstream
diff --git a/src/store.rs b/src/store.rs
index 846d96a..1e5fab4 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -22,7 +22,10 @@ use pyo3::prelude::*;
 use object_store::aws::{AmazonS3, AmazonS3Builder};
 use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
 use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
+use object_store::http::{HttpBuilder, HttpStore};
 use object_store::local::LocalFileSystem;
+use pyo3::exceptions::PyValueError;
+use url::Url;
 
 #[derive(FromPyObject)]
 pub enum StorageContexts {
@@ -30,6 +33,7 @@ pub enum StorageContexts {
     GoogleCloudStorage(PyGoogleCloudContext),
     MicrosoftAzure(PyMicrosoftAzureContext),
     LocalFileSystem(PyLocalFileSystemContext),
+    HTTP(PyHttpContext),
 }
 
 #[pyclass(name = "LocalFileSystem", module = "datafusion.store", subclass)]
@@ -219,10 +223,37 @@ impl PyAmazonS3Context {
     }
 }
 
+#[pyclass(name = "Http", module = "datafusion.store", subclass)]
+#[derive(Debug, Clone)]
+pub struct PyHttpContext {
+    pub url: String,
+    pub store: Arc<HttpStore>,
+}
+
+#[pymethods]
+impl PyHttpContext {
+    #[new]
+    fn new(url: String) -> PyResult<Self> {
+        let store = match Url::parse(url.as_str()) {
+            Ok(url) => HttpBuilder::new()
+                .with_url(url.origin().ascii_serialization())
+                .build(),
+            Err(_) => HttpBuilder::new().build(),
+        }
+        .map_err(|e| PyValueError::new_err(format!("Error: {:?}", 
e.to_string())))?;
+
+        Ok(Self {
+            url,
+            store: Arc::new(store),
+        })
+    }
+}
+
 pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<PyAmazonS3Context>()?;
     m.add_class::<PyMicrosoftAzureContext>()?;
     m.add_class::<PyGoogleCloudContext>()?;
     m.add_class::<PyLocalFileSystemContext>()?;
+    m.add_class::<PyHttpContext>()?;
     Ok(())
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to