This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 431be3facb Perform HEAD request for HttpStore::head (#4837)
431be3facb is described below

commit 431be3facb0645528397aa800166089e4a21a834
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Sat Sep 23 18:18:45 2023 +0100

    Perform HEAD request for HttpStore::head (#4837)
    
    * Perform HEAD request for HttpStore::head
    
    * Logical merge conflicts
    
    * Review feedback
---
 object_store/src/client/get.rs    | 20 ++++++---
 object_store/src/client/header.rs | 11 +----
 object_store/src/client/mod.rs    |  1 -
 object_store/src/http/client.rs   | 90 +++++++++++++++++++++++++--------------
 object_store/src/http/mod.rs      | 47 +++-----------------
 5 files changed, 78 insertions(+), 91 deletions(-)

diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs
index 8b84a079c7..333f6fe584 100644
--- a/object_store/src/client/get.rs
+++ b/object_store/src/client/get.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::client::header::header_meta;
+use crate::client::header::{header_meta, HeaderConfig};
 use crate::path::Path;
 use crate::{Error, GetOptions, GetResult, ObjectMeta};
 use crate::{GetResultPayload, Result};
@@ -28,6 +28,12 @@ use reqwest::Response;
 pub trait GetClient: Send + Sync + 'static {
     const STORE: &'static str;
 
+    /// Configure the [`HeaderConfig`] for this client
+    const HEADER_CONFIG: HeaderConfig = HeaderConfig {
+        etag_required: true,
+        last_modified_required: true,
+    };
+
     async fn get_request(
         &self,
         path: &Path,
@@ -49,10 +55,12 @@ impl<T: GetClient> GetClientExt for T {
     async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
         let range = options.range.clone();
         let response = self.get_request(location, options, false).await?;
-        let meta = header_meta(location, response.headers(), Default::default())
-            .map_err(|e| Error::Generic {
-                store: T::STORE,
-                source: Box::new(e),
+        let meta =
+            header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
+                Error::Generic {
+                    store: T::STORE,
+                    source: Box::new(e),
+                }
             })?;
 
         let stream = response
@@ -73,7 +81,7 @@ impl<T: GetClient> GetClientExt for T {
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
         let options = GetOptions::default();
         let response = self.get_request(location, options, true).await?;
-        header_meta(location, response.headers(), Default::default()).map_err(|e| {
+        header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
             Error::Generic {
                 store: T::STORE,
                 source: Box::new(e),
diff --git a/object_store/src/client/header.rs b/object_store/src/client/header.rs
index b55494cdb8..6499eff5ae 100644
--- a/object_store/src/client/header.rs
+++ b/object_store/src/client/header.rs
@@ -24,7 +24,7 @@ use hyper::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
 use hyper::HeaderMap;
 use snafu::{OptionExt, ResultExt, Snafu};
 
-#[derive(Debug)]
+#[derive(Debug, Copy, Clone)]
 /// Configuration for header extraction
 pub struct HeaderConfig {
     /// Whether to require an ETag header when extracting [`ObjectMeta`] from headers.
@@ -37,15 +37,6 @@ pub struct HeaderConfig {
     pub last_modified_required: bool,
 }
 
-impl Default for HeaderConfig {
-    fn default() -> Self {
-        Self {
-            etag_required: true,
-            last_modified_required: true,
-        }
-    }
-}
-
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("ETag Header missing from response"))]
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index 77b14a7587..ee9d62a44f 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -27,7 +27,6 @@ pub mod retry;
 #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
 pub mod pagination;
 
-#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
 pub mod get;
 
 #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs
index 67a4129174..0bd2e5639c 100644
--- a/object_store/src/http/client.rs
+++ b/object_store/src/http/client.rs
@@ -15,11 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::client::get::GetClient;
+use crate::client::header::HeaderConfig;
 use crate::client::retry::{self, RetryConfig, RetryExt};
 use crate::client::GetOptionsExt;
 use crate::path::{Path, DELIMITER};
 use crate::util::deserialize_rfc1123;
 use crate::{ClientOptions, GetOptions, ObjectMeta, Result};
+use async_trait::async_trait;
 use bytes::{Buf, Bytes};
 use chrono::{DateTime, Utc};
 use percent_encoding::percent_decode_str;
@@ -238,39 +241,6 @@ impl Client {
         Ok(())
     }
 
-    pub async fn get(&self, location: &Path, options: GetOptions) -> Result<Response> {
-        let url = self.path_url(location);
-        let builder = self.client.get(url);
-        let has_range = options.range.is_some();
-
-        let res = builder
-            .with_get_options(options)
-            .send_retry(&self.retry_config)
-            .await
-            .map_err(|source| match source.status() {
-                // Some stores return METHOD_NOT_ALLOWED for get on directories
-                Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => {
-                    crate::Error::NotFound {
-                        source: Box::new(source),
-                        path: location.to_string(),
-                    }
-                }
-                _ => Error::Request { source }.into(),
-            })?;
-
-        // We expect a 206 Partial Content response if a range was requested
-        // a 200 OK response would indicate the server did not fulfill the request
-        if has_range && res.status() != StatusCode::PARTIAL_CONTENT {
-            return Err(crate::Error::NotSupported {
-                source: Box::new(Error::RangeNotSupported {
-                    href: location.to_string(),
-                }),
-            });
-        }
-
-        Ok(res)
-    }
-
     pub async fn copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
         let mut retry = false;
         loop {
@@ -307,6 +277,60 @@ impl Client {
     }
 }
 
+#[async_trait]
+impl GetClient for Client {
+    const STORE: &'static str = "HTTP";
+
+    /// Override the [`HeaderConfig`] to be less strict to support a
+    /// broader range of HTTP servers (#4831)
+    const HEADER_CONFIG: HeaderConfig = HeaderConfig {
+        etag_required: false,
+        last_modified_required: false,
+    };
+
+    async fn get_request(
+        &self,
+        location: &Path,
+        options: GetOptions,
+        head: bool,
+    ) -> Result<Response> {
+        let url = self.path_url(location);
+        let method = match head {
+            true => Method::HEAD,
+            false => Method::GET,
+        };
+        let has_range = options.range.is_some();
+        let builder = self.client.request(method, url);
+
+        let res = builder
+            .with_get_options(options)
+            .send_retry(&self.retry_config)
+            .await
+            .map_err(|source| match source.status() {
+                // Some stores return METHOD_NOT_ALLOWED for get on directories
+                Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => {
+                    crate::Error::NotFound {
+                        source: Box::new(source),
+                        path: location.to_string(),
+                    }
+                }
+                _ => Error::Request { source }.into(),
+            })?;
+
+        // We expect a 206 Partial Content response if a range was requested
+        // a 200 OK response would indicate the server did not fulfill the request
+        if has_range && res.status() != StatusCode::PARTIAL_CONTENT {
+            return Err(crate::Error::NotSupported {
+                source: Box::new(Error::RangeNotSupported {
+                    href: location.to_string(),
+                }),
+            });
+        }
+
+        Ok(res)
+    }
+}
+
 /// The response returned by a PROPFIND request, i.e. list
 #[derive(Deserialize, Default)]
 pub struct MultiStatus {
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index 6143819756..afbc0ce437 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -34,18 +34,18 @@
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::stream::BoxStream;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
 use itertools::Itertools;
 use snafu::{OptionExt, ResultExt, Snafu};
 use tokio::io::AsyncWrite;
 use url::Url;
 
-use crate::client::header::{header_meta, HeaderConfig};
+use crate::client::get::GetClientExt;
 use crate::http::client::Client;
 use crate::path::Path;
 use crate::{
-    ClientConfigKey, ClientOptions, GetOptions, GetResult, GetResultPayload, ListResult,
-    MultipartId, ObjectMeta, ObjectStore, Result, RetryConfig,
+    ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartId,
+    ObjectMeta, ObjectStore, Result, RetryConfig,
 };
 
 mod client;
@@ -115,46 +115,11 @@ impl ObjectStore for HttpStore {
     }
 
     async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
-        let range = options.range.clone();
-        let response = self.client.get(location, options).await?;
-        let cfg = HeaderConfig {
-            last_modified_required: false,
-            etag_required: false,
-        };
-        let meta =
-            header_meta(location, response.headers(), cfg).context(MetadataSnafu)?;
-
-        let stream = response
-            .bytes_stream()
-            .map_err(|source| Error::Reqwest { source }.into())
-            .boxed();
-
-        Ok(GetResult {
-            payload: GetResultPayload::Stream(stream),
-            range: range.unwrap_or(0..meta.size),
-            meta,
-        })
+        self.client.get_opts(location, options).await
     }
 
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
-        let status = self.client.list(Some(location), "0").await?;
-        match status.response.len() {
-            1 => {
-                let response = status.response.into_iter().next().unwrap();
-                response.check_ok()?;
-                match response.is_dir() {
-                    true => Err(crate::Error::NotFound {
-                        path: location.to_string(),
-                        source: "Is directory".to_string().into(),
-                    }),
-                    false => response.object_meta(self.client.base_url()),
-                }
-            }
-            x => Err(crate::Error::NotFound {
-                path: location.to_string(),
-                source: format!("Expected 1 result, got {x}").into(),
-            }),
-        }
+        self.client.head(location).await
     }
 
     async fn delete(&self, location: &Path) -> Result<()> {

Reply via email to