This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new 8d3d831 Support `object_store` with wasm: Default
wasm32-unknown-unknown HttpConnector (#329)
8d3d831 is described below
commit 8d3d8319dbaae7c07d7bd52339a0d0cca033e9ac
Author: Nicholas Roberts <[email protected]>
AuthorDate: Tue Apr 22 00:42:37 2025 +1000
Support `object_store` with wasm: Default wasm32-unknown-unknown
HttpConnector (#329)
* Implement default wasm32-unknown-unknown (JS platform dependent)
HttpConnector.
* Disable all test blocks that strictly require the fs feature on wasm32,
as well as those that use the MockServer. Add very basic wasm tests and address
the dropped/unused receiver in the wasm bridge code
* CI wasm32-unknown-unknown tests
* Install node for wasm tests in CI
* cargo fmt
* Hoist use/imports in wasm HttpService
---
.cargo/config.toml | 2 ++
.github/workflows/ci.yml | 7 ++++++
Cargo.toml | 18 ++++++++++++++
src/client/body.rs | 8 ++++++
src/client/connection.rs | 64 ++++++++++++++++++++++++++++++++++++++++++++----
src/client/mod.rs | 19 +++++++++++++-
src/client/retry.rs | 4 +++
src/parse.rs | 2 +-
src/prefix.rs | 1 +
src/throttle.rs | 1 +
tests/http.rs | 23 +++++++++++++++++
11 files changed, 142 insertions(+), 7 deletions(-)
diff --git a/.cargo/config.toml b/.cargo/config.toml
new file mode 100644
index 0000000..cb9ecf9
--- /dev/null
+++ b/.cargo/config.toml
@@ -0,0 +1,2 @@
+[target.wasm32-unknown-unknown]
+rustflags = ['--cfg', 'getrandom_backend="wasm_js"']
\ No newline at end of file
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 308a304..f227da0 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -192,6 +192,13 @@ jobs:
run: rustup target add wasm32-wasip1
- name: Build wasm32-wasip1
run: cargo build --all-features --target wasm32-wasip1
+ - name: Install wasm-pack
+ run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
+ - uses: actions/setup-node@v4
+ with:
+ node-version: 20
+ - name: Run wasm32-unknown-unknown tests (via Node)
+ run: wasm-pack test --node --features http --no-default-features
windows:
name: cargo test LocalFileSystem (win64)
diff --git a/Cargo.toml b/Cargo.toml
index e3aa72f..4e8bc74 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -64,6 +64,10 @@ tokio = { version = "1.29.0", features = ["sync", "macros",
"rt", "time", "io-ut
[target.'cfg(target_family="unix")'.dev-dependencies]
nix = { version = "0.29.0", features = ["fs"] }
+[target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies]
+web-time = { version = "1.1.0" }
+wasm-bindgen-futures = "0.4.18"
+
[features]
default = ["fs"]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest",
"reqwest/stream", "chrono/serde", "base64", "rand", "ring", "http-body-util",
"form_urlencoded", "serde_urlencoded"]
@@ -84,6 +88,20 @@ regex = "1.11.1"
# The "gzip" feature for reqwest is enabled for an integration test.
reqwest = { version = "0.12", features = ["gzip"] }
+[target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dev-dependencies]
+wasm-bindgen-test = "*"
+
+[dev-dependencies.getrandom_v03]
+package = "getrandom"
+version = "0.3"
+features = ["wasm_js"]
+
+[dev-dependencies.getrandom_v02]
+package = "getrandom"
+version = "0.2"
+features = ["js"]
+
[[test]]
name = "get_range_file"
path = "tests/get_range_file.rs"
+required-features = ["fs"]
diff --git a/src/client/body.rs b/src/client/body.rs
index 8f62afa..ed87972 100644
--- a/src/client/body.rs
+++ b/src/client/body.rs
@@ -49,6 +49,14 @@ impl HttpRequestBody {
}
}
+ #[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
+ pub(crate) fn into_reqwest(self) -> reqwest::Body {
+ match self.0 {
+ Inner::Bytes(b) => b.into(),
+ Inner::PutPayload(_, payload) => Bytes::from(payload).into(),
+ }
+ }
+
/// Returns true if this body is empty
pub fn is_empty(&self) -> bool {
match &self.0 {
diff --git a/src/client/connection.rs b/src/client/connection.rs
index 7e2daf4..ac4db5c 100644
--- a/src/client/connection.rs
+++ b/src/client/connection.rs
@@ -224,6 +224,60 @@ impl HttpService for reqwest::Client {
}
}
+#[async_trait]
+#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
+impl HttpService for reqwest::Client {
+ async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
+ use futures::{
+ channel::{mpsc, oneshot},
+ SinkExt, StreamExt, TryStreamExt,
+ };
+ use http_body_util::{Empty, StreamBody};
+ use wasm_bindgen_futures::spawn_local;
+
+ let (parts, body) = req.into_parts();
+ let url = parts.uri.to_string().parse().unwrap();
+ let mut req = reqwest::Request::new(parts.method, url);
+ *req.headers_mut() = parts.headers;
+ *req.body_mut() = Some(body.into_reqwest());
+
+ let (mut tx, rx) = mpsc::channel(1);
+ let (tx_parts, rx_parts) = oneshot::channel();
+ let res_fut = self.execute(req);
+
+ spawn_local(async move {
+ match res_fut.await.map_err(HttpError::reqwest) {
+ Err(err) => {
+ let _ = tx_parts.send(Err(err));
+ drop(tx);
+ }
+ Ok(res) => {
+ let (mut parts, _) = http::Response::new(Empty::<()>::new()).into_parts();
+ parts.headers = res.headers().clone();
+ parts.status = res.status();
+ let _ = tx_parts.send(Ok(parts));
+ let mut stream = res.bytes_stream().map_err(HttpError::reqwest);
+ while let Some(chunk) = stream.next().await {
+ if let Err(_e) = tx.send(chunk).await {
+ // Disconnected due to a transitive drop of the receiver
+ break;
+ }
+ }
+ }
+ }
+ });
+
+ let parts = rx_parts.await.unwrap()?;
+ let safe_stream = rx.map(|chunk| {
+ let frame = hyper::body::Frame::data(chunk?);
+ Ok(frame)
+ });
+ let body = HttpResponseBody::new(StreamBody::new(safe_stream));
+
+ Ok(HttpResponse::from_parts(parts, body))
+ }
+}
+
/// A factory for [`HttpClient`]
pub trait HttpConnector: std::fmt::Debug + Send + Sync + 'static {
/// Create a new [`HttpClient`] with the provided [`ClientOptions`]
@@ -233,10 +287,10 @@ pub trait HttpConnector: std::fmt::Debug + Send + Sync + 'static {
/// [`HttpConnector`] using [`reqwest::Client`]
#[derive(Debug, Default)]
#[allow(missing_copy_implementations)]
-#[cfg(not(target_arch = "wasm32"))]
+#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
pub struct ReqwestConnector {}
-#[cfg(not(target_arch = "wasm32"))]
+#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
impl HttpConnector for ReqwestConnector {
fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient> {
let client = options.client()?;
@@ -244,21 +298,21 @@ impl HttpConnector for ReqwestConnector {
}
}
-#[cfg(target_arch = "wasm32")]
+#[cfg(all(target_arch = "wasm32", target_os = "wasi"))]
pub(crate) fn http_connector(
custom: Option<Arc<dyn HttpConnector>>,
) -> crate::Result<Arc<dyn HttpConnector>> {
match custom {
Some(x) => Ok(x),
None => Err(crate::Error::NotSupported {
- source: "WASM32 architectures must provide an HTTPConnector"
+ source: "WASI architectures must provide an HTTPConnector"
.to_string()
.into(),
}),
}
}
-#[cfg(not(target_arch = "wasm32"))]
+#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
pub(crate) fn http_connector(
custom: Option<Arc<dyn HttpConnector>>,
) -> crate::Result<Arc<dyn HttpConnector>> {
diff --git a/src/client/mod.rs b/src/client/mod.rs
index bd0347b..0c5dcc1 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -22,6 +22,7 @@ pub(crate) mod backoff;
#[cfg(not(target_arch = "wasm32"))]
mod dns;
+#[cfg(not(target_arch = "wasm32"))]
#[cfg(test)]
pub(crate) mod mock_server;
@@ -50,7 +51,7 @@ pub(crate) mod builder;
mod connection;
pub(crate) use connection::http_connector;
-#[cfg(not(target_arch = "wasm32"))]
+#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
pub use connection::ReqwestConnector;
pub use connection::{HttpClient, HttpConnector, HttpError, HttpErrorKind, HttpService};
@@ -718,6 +719,22 @@ impl ClientOptions {
.build()
.map_err(map_client_error)
}
+
+ #[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
+ pub(crate) fn client(&self) -> Result<reqwest::Client> {
+ let mut builder = reqwest::ClientBuilder::new();
+
+ match &self.user_agent {
+ Some(user_agent) => builder = builder.user_agent(user_agent.get()?),
+ None => builder = builder.user_agent(DEFAULT_USER_AGENT),
+ }
+
+ if let Some(headers) = &self.default_headers {
+ builder = builder.default_headers(headers.clone())
+ }
+
+ builder.build().map_err(map_client_error)
+ }
}
pub(crate) trait GetOptionsExt {
diff --git a/src/client/retry.rs b/src/client/retry.rs
index 96244aa..a3a1449 100644
--- a/src/client/retry.rs
+++ b/src/client/retry.rs
@@ -26,8 +26,11 @@ use futures::future::BoxFuture;
use http::{Method, Uri};
use reqwest::header::LOCATION;
use reqwest::StatusCode;
+#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
use std::time::{Duration, Instant};
use tracing::info;
+#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
+use web_time::{Duration, Instant};
/// Retry request error
#[derive(Debug, thiserror::Error)]
@@ -469,6 +472,7 @@ impl RetryExt for HttpRequestBuilder {
}
}
+#[cfg(not(target_arch = "wasm32"))]
#[cfg(test)]
mod tests {
use crate::client::mock_server::MockServer;
diff --git a/src/parse.rs b/src/parse.rs
index 00ea6cf..e37f85b 100644
--- a/src/parse.rs
+++ b/src/parse.rs
@@ -348,7 +348,7 @@ mod tests {
}
#[tokio::test]
- #[cfg(feature = "http")]
+ #[cfg(all(feature = "http", not(target_arch = "wasm32")))]
async fn test_url_http() {
use crate::client::mock_server::MockServer;
use http::{header::USER_AGENT, Response};
diff --git a/src/prefix.rs b/src/prefix.rs
index ac9803e..c2802c1 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -221,6 +221,7 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
}
}
+#[cfg(not(target_arch = "wasm32"))]
#[cfg(test)]
mod tests {
use super::*;
diff --git a/src/throttle.rs b/src/throttle.rs
index 6586ba9..efe2949 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -597,6 +597,7 @@ mod tests {
}
#[allow(dead_code)]
+ #[cfg(target_os = "linux")]
async fn measure_get(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Duration {
let path = place_test_object(store, n_bytes).await;
diff --git a/tests/http.rs b/tests/http.rs
index a9b3145..cb0b7d6 100644
--- a/tests/http.rs
+++ b/tests/http.rs
@@ -20,6 +20,9 @@
#[cfg(feature = "http")]
use object_store::{http::HttpBuilder, path::Path, GetOptions, GetRange, ObjectStore};
+#[cfg(all(feature = "http", target_arch = "wasm32", target_os = "unknown"))]
+use wasm_bindgen_test::*;
+
/// Tests that even when reqwest has the `gzip` feature enabled, the HTTP store
/// does not error on a missing `Content-Length` header.
#[tokio::test]
@@ -41,3 +44,23 @@ async fn test_http_store_gzip() {
.await
.unwrap();
}
+
+#[cfg(all(feature = "http", target_arch = "wasm32", target_os = "unknown"))]
+#[wasm_bindgen_test]
+async fn basic_wasm_get() {
+ let http_store = HttpBuilder::new()
+ .with_url("https://raw.githubusercontent.com/apache/arrow-rs/refs/heads/main")
+ .build()
+ .unwrap();
+
+ let _ = http_store
+ .get_opts(
+ &Path::parse("LICENSE.txt").unwrap(),
+ GetOptions {
+ range: Some(GetRange::Bounded(0..100)),
+ ..Default::default()
+ },
+ )
+ .await
+ .unwrap();
+}