This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 431be3facb Perform HEAD request for HttpStore::head (#4837)
431be3facb is described below
commit 431be3facb0645528397aa800166089e4a21a834
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Sat Sep 23 18:18:45 2023 +0100
Perform HEAD request for HttpStore::head (#4837)
* Perform HEAD request for HttpStore::head
* Logical merge conflicts
* Review feedback
---
object_store/src/client/get.rs | 20 ++++++---
object_store/src/client/header.rs | 11 +----
object_store/src/client/mod.rs | 1 -
object_store/src/http/client.rs | 90 +++++++++++++++++++++++++--------------
object_store/src/http/mod.rs | 47 +++-----------------
5 files changed, 78 insertions(+), 91 deletions(-)
diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs
index 8b84a079c7..333f6fe584 100644
--- a/object_store/src/client/get.rs
+++ b/object_store/src/client/get.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::client::header::header_meta;
+use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
use crate::{Error, GetOptions, GetResult, ObjectMeta};
use crate::{GetResultPayload, Result};
@@ -28,6 +28,12 @@ use reqwest::Response;
pub trait GetClient: Send + Sync + 'static {
const STORE: &'static str;
+ /// Configure the [`HeaderConfig`] for this client
+ const HEADER_CONFIG: HeaderConfig = HeaderConfig {
+ etag_required: true,
+ last_modified_required: true,
+ };
+
async fn get_request(
&self,
path: &Path,
@@ -49,10 +55,12 @@ impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
let response = self.get_request(location, options, false).await?;
- let meta = header_meta(location, response.headers(), Default::default())
- .map_err(|e| Error::Generic {
- store: T::STORE,
- source: Box::new(e),
+ let meta =
+ header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
+ Error::Generic {
+ store: T::STORE,
+ source: Box::new(e),
+ }
})?;
let stream = response
@@ -73,7 +81,7 @@ impl<T: GetClient> GetClientExt for T {
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let options = GetOptions::default();
let response = self.get_request(location, options, true).await?;
- header_meta(location, response.headers(), Default::default()).map_err(|e| {
+ header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
Error::Generic {
store: T::STORE,
source: Box::new(e),
diff --git a/object_store/src/client/header.rs b/object_store/src/client/header.rs
index b55494cdb8..6499eff5ae 100644
--- a/object_store/src/client/header.rs
+++ b/object_store/src/client/header.rs
@@ -24,7 +24,7 @@ use hyper::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
use hyper::HeaderMap;
use snafu::{OptionExt, ResultExt, Snafu};
-#[derive(Debug)]
+#[derive(Debug, Copy, Clone)]
/// Configuration for header extraction
pub struct HeaderConfig {
/// Whether to require an ETag header when extracting [`ObjectMeta`] from headers.
@@ -37,15 +37,6 @@ pub struct HeaderConfig {
pub last_modified_required: bool,
}
-impl Default for HeaderConfig {
- fn default() -> Self {
- Self {
- etag_required: true,
- last_modified_required: true,
- }
- }
-}
-
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("ETag Header missing from response"))]
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index 77b14a7587..ee9d62a44f 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -27,7 +27,6 @@ pub mod retry;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub mod pagination;
-#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub mod get;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs
index 67a4129174..0bd2e5639c 100644
--- a/object_store/src/http/client.rs
+++ b/object_store/src/http/client.rs
@@ -15,11 +15,14 @@
// specific language governing permissions and limitations
// under the License.
+use crate::client::get::GetClient;
+use crate::client::header::HeaderConfig;
use crate::client::retry::{self, RetryConfig, RetryExt};
use crate::client::GetOptionsExt;
use crate::path::{Path, DELIMITER};
use crate::util::deserialize_rfc1123;
use crate::{ClientOptions, GetOptions, ObjectMeta, Result};
+use async_trait::async_trait;
use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc};
use percent_encoding::percent_decode_str;
@@ -238,39 +241,6 @@ impl Client {
Ok(())
}
- pub async fn get(&self, location: &Path, options: GetOptions) -> Result<Response> {
- let url = self.path_url(location);
- let builder = self.client.get(url);
- let has_range = options.range.is_some();
-
- let res = builder
- .with_get_options(options)
- .send_retry(&self.retry_config)
- .await
- .map_err(|source| match source.status() {
- // Some stores return METHOD_NOT_ALLOWED for get on directories
- Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => {
- crate::Error::NotFound {
- source: Box::new(source),
- path: location.to_string(),
- }
- }
- _ => Error::Request { source }.into(),
- })?;
-
- // We expect a 206 Partial Content response if a range was requested
- // a 200 OK response would indicate the server did not fulfill the request
- if has_range && res.status() != StatusCode::PARTIAL_CONTENT {
- return Err(crate::Error::NotSupported {
- source: Box::new(Error::RangeNotSupported {
- href: location.to_string(),
- }),
- });
- }
-
- Ok(res)
- }
-
pub async fn copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
let mut retry = false;
loop {
@@ -307,6 +277,60 @@ impl Client {
}
}
+#[async_trait]
+impl GetClient for Client {
+ const STORE: &'static str = "HTTP";
+
+ /// Override the [`HeaderConfig`] to be less strict to support a
+ /// broader range of HTTP servers (#4831)
+ const HEADER_CONFIG: HeaderConfig = HeaderConfig {
+ etag_required: false,
+ last_modified_required: false,
+ };
+
+ async fn get_request(
+ &self,
+ location: &Path,
+ options: GetOptions,
+ head: bool,
+ ) -> Result<Response> {
+ let url = self.path_url(location);
+ let method = match head {
+ true => Method::HEAD,
+ false => Method::GET,
+ };
+ let has_range = options.range.is_some();
+ let builder = self.client.request(method, url);
+
+ let res = builder
+ .with_get_options(options)
+ .send_retry(&self.retry_config)
+ .await
+ .map_err(|source| match source.status() {
+ // Some stores return METHOD_NOT_ALLOWED for get on directories
+ Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => {
+ crate::Error::NotFound {
+ source: Box::new(source),
+ path: location.to_string(),
+ }
+ }
+ _ => Error::Request { source }.into(),
+ })?;
+
+ // We expect a 206 Partial Content response if a range was requested
+ // a 200 OK response would indicate the server did not fulfill the request
+ if has_range && res.status() != StatusCode::PARTIAL_CONTENT {
+ return Err(crate::Error::NotSupported {
+ source: Box::new(Error::RangeNotSupported {
+ href: location.to_string(),
+ }),
+ });
+ }
+
+ Ok(res)
+ }
+}
+
/// The response returned by a PROPFIND request, i.e. list
#[derive(Deserialize, Default)]
pub struct MultiStatus {
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index 6143819756..afbc0ce437 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -34,18 +34,18 @@
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
use itertools::Itertools;
use snafu::{OptionExt, ResultExt, Snafu};
use tokio::io::AsyncWrite;
use url::Url;
-use crate::client::header::{header_meta, HeaderConfig};
+use crate::client::get::GetClientExt;
use crate::http::client::Client;
use crate::path::Path;
use crate::{
- ClientConfigKey, ClientOptions, GetOptions, GetResult, GetResultPayload, ListResult,
- MultipartId, ObjectMeta, ObjectStore, Result, RetryConfig,
+ ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartId,
+ ObjectMeta, ObjectStore, Result, RetryConfig,
};
mod client;
@@ -115,46 +115,11 @@ impl ObjectStore for HttpStore {
}
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
- let range = options.range.clone();
- let response = self.client.get(location, options).await?;
- let cfg = HeaderConfig {
- last_modified_required: false,
- etag_required: false,
- };
- let meta =
- header_meta(location, response.headers(), cfg).context(MetadataSnafu)?;
-
- let stream = response
- .bytes_stream()
- .map_err(|source| Error::Reqwest { source }.into())
- .boxed();
-
- Ok(GetResult {
- payload: GetResultPayload::Stream(stream),
- range: range.unwrap_or(0..meta.size),
- meta,
- })
+ self.client.get_opts(location, options).await
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
- let status = self.client.list(Some(location), "0").await?;
- match status.response.len() {
- 1 => {
- let response = status.response.into_iter().next().unwrap();
- response.check_ok()?;
- match response.is_dir() {
- true => Err(crate::Error::NotFound {
- path: location.to_string(),
- source: "Is directory".to_string().into(),
- }),
- false => response.object_meta(self.client.base_url()),
- }
- }
- x => Err(crate::Error::NotFound {
- path: location.to_string(),
- source: format!("Expected 1 result, got {x}").into(),
- }),
- }
+ self.client.head(location).await
}
async fn delete(&self, location: &Path) -> Result<()> {