This is an automated email from the ASF dual-hosted git repository.
timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new 6c8292f feat: expose http object store (#885)
6c8292f is described below
commit 6c8292f8c6c67d79868a87bc16e0b06ac14d7f0b
Author: Daniel Mesejo <[email protected]>
AuthorDate: Sun Oct 6 23:41:45 2024 +0200
feat: expose http object store (#885)
* feat: expose HTTP ObjectStore
The objective is to allow the user to register a CSV file directly from
an HTTP URL, delaying the download of the file until it is required
* chore: return PyResult
---
Cargo.toml | 2 +-
python/datafusion/context.py | 4 +++-
python/datafusion/object_store.py | 12 ++----------
python/tests/test_sql.py | 12 ++++++++++--
python/tests/test_store.py | 10 ++--------
python/tests/test_wrapper_coverage.py | 2 +-
src/context.rs | 1 +
src/store.rs | 31 +++++++++++++++++++++++++++++++
8 files changed, 51 insertions(+), 23 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 4f26023..d2d3e79 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,7 +47,7 @@ uuid = { version = "1.9", features = ["v4"] }
mimalloc = { version = "0.1", optional = true, default-features = false,
features = ["local_dynamic_tls"] }
async-trait = "0.1"
futures = "0.3"
-object_store = { version = "0.11.0", features = ["aws", "gcp", "azure"] }
+object_store = { version = "0.11.0", features = ["aws", "gcp", "azure",
"http"] }
parking_lot = "0.12"
regex-syntax = "0.8"
syn = "2.0.79"
diff --git a/python/datafusion/context.py b/python/datafusion/context.py
index b08e62d..957d7e3 100644
--- a/python/datafusion/context.py
+++ b/python/datafusion/context.py
@@ -450,7 +450,9 @@ class SessionContext:
self.ctx = SessionContextInternal(config, runtime)
- def register_object_store(self, schema: str, store: Any, host: str | None)
-> None:
+ def register_object_store(
+ self, schema: str, store: Any, host: str | None = None
+ ) -> None:
"""Add a new object store into the session.
Args:
diff --git a/python/datafusion/object_store.py
b/python/datafusion/object_store.py
index c927e76..7cc1750 100644
--- a/python/datafusion/object_store.py
+++ b/python/datafusion/object_store.py
@@ -22,14 +22,6 @@ AmazonS3 = object_store.AmazonS3
GoogleCloud = object_store.GoogleCloud
LocalFileSystem = object_store.LocalFileSystem
MicrosoftAzure = object_store.MicrosoftAzure
+Http = object_store.Http
-__all__ = [
- "AmazonS3",
- "GoogleCloud",
- "LocalFileSystem",
- "MicrosoftAzure",
-]
-
-
-def __getattr__(name):
- return getattr(object_store, name)
+__all__ = ["AmazonS3", "GoogleCloud", "LocalFileSystem", "MicrosoftAzure",
"Http"]
diff --git a/python/tests/test_sql.py b/python/tests/test_sql.py
index e39a9f5..39e5ffe 100644
--- a/python/tests/test_sql.py
+++ b/python/tests/test_sql.py
@@ -22,7 +22,7 @@ import pyarrow as pa
from pyarrow.csv import write_csv
import pyarrow.dataset as ds
import pytest
-from datafusion.object_store import LocalFileSystem
+from datafusion.object_store import Http
from datafusion import udf, col
@@ -139,6 +139,15 @@ def test_register_csv_list(ctx, tmp_path):
assert int_sum == 2 * sum(int_values)
+def test_register_http_csv(ctx):
+ url =
"https://raw.githubusercontent.com/ibis-project/testing-data/refs/heads/master/csv/diamonds.csv"
+ ctx.register_object_store("", Http(url))
+ ctx.register_csv("remote", url)
+ assert ctx.table_exist("remote")
+ res, *_ = ctx.sql("SELECT COUNT(*) AS total FROM remote").to_pylist()
+ assert res["total"] > 0
+
+
def test_register_parquet(ctx, tmp_path):
path = helpers.write_parquet(tmp_path / "a.parquet", helpers.data())
ctx.register_parquet("t", path)
@@ -494,7 +503,6 @@ def test_register_listing_table(
dir_root = f"file://{dir_root}/" if path_to_str else dir_root
- ctx.register_object_store("file://local", LocalFileSystem(), None)
ctx.register_listing_table(
"my_table",
dir_root,
diff --git a/python/tests/test_store.py b/python/tests/test_store.py
index 3ffd9ee..f85b283 100644
--- a/python/tests/test_store.py
+++ b/python/tests/test_store.py
@@ -16,21 +16,15 @@
# under the License.
import os
+
import pytest
from datafusion import SessionContext
-from datafusion.object_store import LocalFileSystem
-
-
[email protected]
-def local():
- return LocalFileSystem()
@pytest.fixture
-def ctx(local):
+def ctx():
ctx = SessionContext()
- ctx.register_object_store("file://local", local, None)
return ctx
diff --git a/python/tests/test_wrapper_coverage.py
b/python/tests/test_wrapper_coverage.py
index c53a89c..86f2d57 100644
--- a/python/tests/test_wrapper_coverage.py
+++ b/python/tests/test_wrapper_coverage.py
@@ -55,7 +55,7 @@ def missing_exports(internal_obj, wrapped_obj) -> None:
def test_datafusion_missing_exports() -> None:
- """Check for any missing pythone exports.
+ """Check for any missing python exports.
This test verifies that every exposed class, attribute, and function in
the internal (pyo3) module is also exposed in our python wrappers.
diff --git a/src/context.rs b/src/context.rs
index 5317a3e..f445874 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -312,6 +312,7 @@ impl PySessionContext {
StorageContexts::GoogleCloudStorage(gcs) => (gcs.inner,
gcs.bucket_name),
StorageContexts::MicrosoftAzure(azure) => (azure.inner,
azure.container_name),
StorageContexts::LocalFileSystem(local) => (local.inner,
"".to_string()),
+ StorageContexts::HTTP(http) => (http.store, http.url),
};
// let users override the host to match the api signature from upstream
diff --git a/src/store.rs b/src/store.rs
index 846d96a..1e5fab4 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -22,7 +22,10 @@ use pyo3::prelude::*;
use object_store::aws::{AmazonS3, AmazonS3Builder};
use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
+use object_store::http::{HttpBuilder, HttpStore};
use object_store::local::LocalFileSystem;
+use pyo3::exceptions::PyValueError;
+use url::Url;
#[derive(FromPyObject)]
pub enum StorageContexts {
@@ -30,6 +33,7 @@ pub enum StorageContexts {
GoogleCloudStorage(PyGoogleCloudContext),
MicrosoftAzure(PyMicrosoftAzureContext),
LocalFileSystem(PyLocalFileSystemContext),
+ HTTP(PyHttpContext),
}
#[pyclass(name = "LocalFileSystem", module = "datafusion.store", subclass)]
@@ -219,10 +223,37 @@ impl PyAmazonS3Context {
}
}
+#[pyclass(name = "Http", module = "datafusion.store", subclass)]
+#[derive(Debug, Clone)]
+pub struct PyHttpContext {
+ pub url: String,
+ pub store: Arc<HttpStore>,
+}
+
+#[pymethods]
+impl PyHttpContext {
+ #[new]
+ fn new(url: String) -> PyResult<Self> {
+ let store = match Url::parse(url.as_str()) {
+ Ok(url) => HttpBuilder::new()
+ .with_url(url.origin().ascii_serialization())
+ .build(),
+ Err(_) => HttpBuilder::new().build(),
+ }
+ .map_err(|e| PyValueError::new_err(format!("Error: {:?}",
e.to_string())))?;
+
+ Ok(Self {
+ url,
+ store: Arc::new(store),
+ })
+ }
+}
+
pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyAmazonS3Context>()?;
m.add_class::<PyMicrosoftAzureContext>()?;
m.add_class::<PyGoogleCloudContext>()?;
m.add_class::<PyLocalFileSystemContext>()?;
+ m.add_class::<PyHttpContext>()?;
Ok(())
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]