roeap commented on code in PR #3380:
URL: https://github.com/apache/arrow-rs/pull/3380#discussion_r1054727275


##########
object_store/src/http/mod.rs:
##########
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store implementation for generic HTTP servers
+//!
+//! This follows [rfc2518] commonly known called [WebDAV]
+//!
+//! Basic get support will work out of the box with most HTTP servers,
+//! even those that don't explicitly support [rfc2518]
+//!
+//! Other operations such as list, delete, copy, etc... will likely
+//! require server-side configuration. A list of HTTP servers with support
+//! can be found [here](https://wiki.archlinux.org/title/WebDAV#Server)
+//!
+//! Multipart uploads are not currently supported
+//!
+//! [rfc2518]: https://datatracker.ietf.org/doc/html/rfc2518
+//! [WebDAV]: https://en.wikipedia.org/wiki/WebDAV
+
+use std::ops::Range;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use snafu::{OptionExt, ResultExt, Snafu};
+use tokio::io::AsyncWrite;
+use url::Url;
+
+use crate::http::client::Client;
+use crate::path::Path;
+use crate::{
+    ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, 
ObjectStore, Result,
+    RetryConfig,
+};
+
+mod client;
+
+#[derive(Debug, Snafu)]
+enum Error {
+    #[snafu(display("Must specify a URL"))]
+    MissingUrl,
+
+    #[snafu(display("Invalid URL: {}", source))]
+    InvalidUrl { source: reqwest::Error },
+
+    #[snafu(display("Object is a directory"))]
+    IsDirectory,
+
+    #[snafu(display("PROPFIND response contained no valid objects"))]
+    NoObjects,
+
+    #[snafu(display("PROPFIND response contained more than one object"))]
+    MultipleObjects,
+
+    #[snafu(display("Request error: {}", source))]
+    Reqwest { source: reqwest::Error },
+}
+
+impl From<Error> for crate::Error {
+    fn from(err: Error) -> Self {
+        Self::Generic {
+            store: "HTTP",
+            source: Box::new(err),
+        }
+    }
+}
+
+/// An [`ObjectStore`] implementation for generic HTTP servers
+///
+/// See [`crate::http`] for more information
+#[derive(Debug)]
+pub struct HttpStore {
+    client: Client,
+}
+
+impl std::fmt::Display for HttpStore {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "HttpStore")
+    }
+}
+
+#[async_trait]
+impl ObjectStore for HttpStore {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        self.client.put(location, bytes).await
+    }
+
+    async fn put_multipart(
+        &self,
+        _location: &Path,
+    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        Err(super::Error::NotImplemented)
+    }
+
+    async fn abort_multipart(
+        &self,
+        _location: &Path,
+        _multipart_id: &MultipartId,
+    ) -> Result<()> {
+        Err(super::Error::NotImplemented)
+    }
+
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        let response = self.client.get(location, None).await?;
+        let stream = response
+            .bytes_stream()
+            .map_err(|source| Error::Reqwest { source }.into())
+            .boxed();
+
+        Ok(GetResult::Stream(stream))
+    }
+
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> 
Result<Bytes> {
+        let bytes = self
+            .client
+            .get(location, Some(range))
+            .await?
+            .bytes()
+            .await
+            .context(ReqwestSnafu)?;
+        Ok(bytes)
+    }
+
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        let status = self.client.list(Some(location), "0").await?;
+        match status.response.len() {
+            1 => {
+                let response = status.response.into_iter().next().unwrap();
+                response.check_ok()?;
+                match response.is_dir() {
+                    true => Err(Error::IsDirectory.into()),
+                    false => response.object_meta(self.client.base_url()),
+                }
+            }
+            0 => Err(Error::NoObjects.into()),
+            _ => Err(Error::MultipleObjects.into()),
+        }
+    }
+
+    async fn delete(&self, location: &Path) -> Result<()> {
+        self.client.delete(location).await
+    }
+
+    async fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        let status = self.client.list(prefix, "infinity").await?;
+        Ok(futures::stream::iter(
+            status
+                .response
+                .into_iter()
+                .filter(|r| !r.is_dir())
+                .map(|response| {
+                    response.check_ok()?;
+                    response.object_meta(self.client.base_url())
+                }),
+        )
+        .boxed())
+    }
+
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> 
Result<ListResult> {
+        let status = self.client.list(prefix, "1").await?;
+        let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or(0);
+
+        let mut objects: Vec<ObjectMeta> = 
Vec::with_capacity(status.response.len());
+        let mut common_prefixes = Vec::with_capacity(status.response.len());
+        for response in status.response {
+            response.check_ok()?;
+            match response.is_dir() {
+                false => 
objects.push(response.object_meta(self.client.base_url())?),
+                true => {
+                    let path = response.path(self.client.base_url())?;
+                    // Exclude the current object
+                    if path.as_ref().len() > prefix_len {
+                        common_prefixes.push(path);
+                    }
+                }
+            }
+        }
+
+        Ok(ListResult {
+            common_prefixes,
+            objects,
+        })
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        self.client.copy(from, to, true).await
+    }
+
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        self.client.copy(from, to, false).await
+    }
+}
+
+/// Configure a connection to a generic HTTP server
+#[derive(Debug, Default)]
+pub struct HttpBuilder {
+    url: Option<Result<Url>>,
+    client_options: ClientOptions,
+    retry_config: RetryConfig,
+}
+
+impl HttpBuilder {

Review Comment:
   Should we add a `from_env` method here as well, to pick up the `HTTP_URL` 
variable? That said, while I don't see many conflicting configs in the env, 
`HTTP_URL` is about as generic as it gets 😆 — would something like 
`HTTP_STORE_URL` be more descriptive?



##########
object_store/src/http/client.rs:
##########
@@ -0,0 +1,372 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::retry::{self, RetryConfig, RetryExt};
+use crate::path::{Path, DELIMITER};
+use crate::util::{deserialize_rfc1123, format_http_range};
+use crate::{ClientOptions, ObjectMeta, Result};
+use bytes::{Buf, Bytes};
+use chrono::{DateTime, Utc};
+use percent_encoding::percent_decode_str;
+use reqwest::header::{CONTENT_TYPE, RANGE};
+use reqwest::{Method, Response, StatusCode};
+use serde::Deserialize;
+use snafu::{OptionExt, ResultExt, Snafu};
+use std::ops::Range;
+use url::Url;
+
+#[derive(Debug, Snafu)]
+enum Error {
+    #[snafu(display("Request error: {}", source))]
+    Request { source: retry::Error },
+
+    #[snafu(display("Request error: {}", source))]
+    Reqwest { source: reqwest::Error },
+
+    #[snafu(display("Error decoding PROPFIND response: {}", source))]
+    InvalidPropFind { source: quick_xml::de::DeError },
+
+    #[snafu(display("Missing content size for {}", href))]
+    MissingSize { href: String },
+
+    #[snafu(display("Error getting properties of \"{}\" got \"{}\"", href, 
status))]
+    PropStatus { href: String, status: String },
+
+    #[snafu(display("Failed to parse href \"{}\": {}", href, source))]
+    InvalidHref {
+        href: String,
+        source: url::ParseError,
+    },
+
+    #[snafu(display("Path \"{}\" contained non-unicode characters: {}", path, 
source))]
+    NonUnicode {
+        path: String,
+        source: std::str::Utf8Error,
+    },
+
+    #[snafu(display("Encountered invalid path \"{}\": {}", path, source))]
+    InvalidPath {
+        path: String,
+        source: crate::path::Error,
+    },
+}
+
+impl From<Error> for crate::Error {
+    fn from(err: Error) -> Self {
+        Self::Generic {
+            store: "HTTP",
+            source: Box::new(err),
+        }
+    }
+}
+
+/// Internal client for HttpStore
+#[derive(Debug)]
+pub struct Client {
+    url: Url,
+    client: reqwest::Client,
+    retry_config: RetryConfig,
+    client_options: ClientOptions,
+}
+
+impl Client {
+    pub fn new(
+        url: Url,
+        client_options: ClientOptions,
+        retry_config: RetryConfig,
+    ) -> Result<Self> {
+        let client = client_options.client()?;
+        Ok(Self {
+            url,
+            retry_config,
+            client_options,
+            client,
+        })
+    }
+
+    pub fn base_url(&self) -> &Url {
+        &self.url
+    }
+
+    fn path_url(&self, location: &Path) -> Url {
+        let mut url = self.url.clone();
+        url.path_segments_mut().unwrap().extend(location.parts());
+        url
+    }
+
+    /// Create a directory with `path` using MKCOL
+    async fn make_directory(&self, path: &str) -> Result<(), Error> {
+        let method = Method::from_bytes(b"MKCOL").unwrap();
+        let mut url = self.url.clone();
+        url.path_segments_mut()
+            .unwrap()
+            .extend(path.split(DELIMITER));
+
+        self.client
+            .request(method, url)
+            .send_retry(&self.retry_config)
+            .await
+            .context(RequestSnafu)?;
+
+        Ok(())
+    }
+
+    /// Recursively create parent directories
+    async fn create_parent_directories(&self, location: &Path) -> Result<()> {
+        let mut stack = vec![];
+
+        // Walk backwards until a request succeeds
+        let mut last_prefix = location.as_ref();
+        while let Some((prefix, _)) = last_prefix.rsplit_once(DELIMITER) {
+            last_prefix = prefix;
+
+            match self.make_directory(prefix).await {
+                Ok(_) => break,
+                Err(Error::Request { source })
+                    if matches!(source.status(), Some(StatusCode::CONFLICT)) =>
+                {
+                    // Need to create parent
+                    stack.push(prefix)
+                }
+                Err(e) => return Err(e.into()),
+            }
+        }
+
+        // Retry the failed requests, which should now succeed
+        for prefix in stack.into_iter().rev() {
+            self.make_directory(prefix).await?;
+        }
+
+        Ok(())
+    }
+
+    pub async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        let mut retry = false;
+        loop {
+            let url = self.path_url(location);
+            let mut builder = self.client.put(url).body(bytes.clone());
+            if let Some(value) = 
self.client_options.get_content_type(location) {
+                builder = builder.header(CONTENT_TYPE, value);
+            }
+
+            match builder.send_retry(&self.retry_config).await {
+                Ok(_) => return Ok(()),
+                Err(source) => match source.status() {
+                    // Some implementations return 404 instead of 409
+                    Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if 
!retry => {
+                        retry = true;
+                        self.create_parent_directories(location).await?
+                    }
+                    _ => return Err(Error::Request { source }.into()),
+                },
+            }
+        }
+    }
+
+    pub async fn list(
+        &self,
+        location: Option<&Path>,
+        depth: &str,
+    ) -> Result<MultiStatus> {
+        let url = location
+            .map(|path| self.path_url(path))
+            .unwrap_or_else(|| self.url.clone());
+
+        let method = Method::from_bytes(b"PROPFIND").unwrap();
+        let result = self
+            .client
+            .request(method, url)
+            .header("Depth", depth)
+            .send_retry(&self.retry_config)
+            .await;
+
+        let response = match result {
+            Ok(result) => result.bytes().await.context(ReqwestSnafu)?,
+            Err(e) if matches!(e.status(), Some(StatusCode::NOT_FOUND)) => {
+                return match depth {
+                    "0" => {
+                        let path = location.map(|x| x.as_ref()).unwrap_or("");
+                        Err(crate::Error::NotFound {
+                            path: path.to_string(),
+                            source: Box::new(e),
+                        })
+                    }
+                    _ => {
+                        // If prefix not found, return empty result set
+                        Ok(Default::default())
+                    }
+                };
+            }
+            Err(source) => return Err(Error::Request { source }.into()),
+        };
+
+        let status = quick_xml::de::from_reader(response.reader())
+            .context(InvalidPropFindSnafu)?;
+        Ok(status)
+    }
+
+    pub async fn delete(&self, path: &Path) -> Result<()> {
+        let url = self.path_url(path);
+        self.client
+            .delete(url)
+            .send_retry(&self.retry_config)
+            .await
+            .context(RequestSnafu)?;
+        Ok(())
+    }
+
+    pub async fn get(
+        &self,
+        location: &Path,
+        range: Option<Range<usize>>,
+    ) -> Result<Response> {
+        let url = self.path_url(location);
+        let mut builder = self.client.get(url);
+
+        if let Some(range) = range {
+            builder = builder.header(RANGE, format_http_range(range));
+        }
+
+        builder
+            .send_retry(&self.retry_config)
+            .await
+            .map_err(|source| match source.status() {
+                Some(StatusCode::NOT_FOUND) => crate::Error::NotFound {
+                    source: Box::new(source),
+                    path: location.to_string(),
+                },
+                _ => Error::Request { source }.into(),
+            })
+    }
+
+    pub async fn copy(&self, from: &Path, to: &Path, overwrite: bool) -> 
Result<()> {
+        let from = self.path_url(from);
+        let to = self.path_url(to);
+        let method = Method::from_bytes(b"COPY").unwrap();
+
+        let mut builder = self
+            .client
+            .request(method, from)
+            .header("Destination", to.as_str());
+
+        if !overwrite {
+            builder = builder.header("Overwrite", "F");
+        }
+
+        match builder.send_retry(&self.retry_config).await {
+            Ok(_) => Ok(()),
+            Err(e)
+                if !overwrite
+                    && matches!(e.status(), 
Some(StatusCode::PRECONDITION_FAILED)) =>
+            {
+                Err(crate::Error::AlreadyExists {
+                    path: to.to_string(),
+                    source: Box::new(e),
+                })
+            }
+            Err(source) => Err(Error::Request { source }.into()),
+        }
+    }
+}
+
+/// The response returned by a PROPFIND request, i.e. list
+#[derive(Deserialize, Default)]
+pub struct MultiStatus {
+    pub response: Vec<MultiStatusResponse>,
+}
+
+#[derive(Deserialize)]
+pub struct MultiStatusResponse {
+    href: String,
+    #[serde(rename = "propstat")]
+    prop_stat: PropStat,
+}
+
+impl MultiStatusResponse {
+    /// Returns an error if this response is not OK
+    pub fn check_ok(&self) -> Result<()> {
+        match self.prop_stat.status.contains("200 OK") {
+            true => Ok(()),
+            false => Err(Error::PropStatus {
+                href: self.href.clone(),
+                status: self.prop_stat.status.clone(),
+            }
+            .into()),

Review Comment:
   Would this be simpler as an `if`/`else`? 



##########
object_store/src/http/mod.rs:
##########
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store implementation for generic HTTP servers
+//!
+//! This follows [rfc2518] commonly known called [WebDAV]
+//!
+//! Basic get support will work out of the box with most HTTP servers,
+//! even those that don't explicitly support [rfc2518]
+//!
+//! Other operations such as list, delete, copy, etc... will likely
+//! require server-side configuration. A list of HTTP servers with support
+//! can be found [here](https://wiki.archlinux.org/title/WebDAV#Server)
+//!
+//! Multipart uploads are not currently supported
+//!
+//! [rfc2518]: https://datatracker.ietf.org/doc/html/rfc2518
+//! [WebDAV]: https://en.wikipedia.org/wiki/WebDAV
+
+use std::ops::Range;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use snafu::{OptionExt, ResultExt, Snafu};
+use tokio::io::AsyncWrite;
+use url::Url;
+
+use crate::http::client::Client;
+use crate::path::Path;
+use crate::{
+    ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, 
ObjectStore, Result,
+    RetryConfig,
+};
+
+mod client;
+
+#[derive(Debug, Snafu)]
+enum Error {
+    #[snafu(display("Must specify a URL"))]
+    MissingUrl,
+
+    #[snafu(display("Invalid URL: {}", source))]
+    InvalidUrl { source: reqwest::Error },
+
+    #[snafu(display("Object is a directory"))]
+    IsDirectory,
+
+    #[snafu(display("PROPFIND response contained no valid objects"))]
+    NoObjects,
+
+    #[snafu(display("PROPFIND response contained more than one object"))]
+    MultipleObjects,
+
+    #[snafu(display("Request error: {}", source))]
+    Reqwest { source: reqwest::Error },
+}
+
+impl From<Error> for crate::Error {
+    fn from(err: Error) -> Self {
+        Self::Generic {
+            store: "HTTP",
+            source: Box::new(err),
+        }
+    }
+}
+
+/// An [`ObjectStore`] implementation for generic HTTP servers
+///
+/// See [`crate::http`] for more information
+#[derive(Debug)]
+pub struct HttpStore {
+    client: Client,
+}
+
+impl std::fmt::Display for HttpStore {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "HttpStore")
+    }
+}
+
+#[async_trait]
+impl ObjectStore for HttpStore {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        self.client.put(location, bytes).await
+    }
+
+    async fn put_multipart(
+        &self,
+        _location: &Path,
+    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        Err(super::Error::NotImplemented)
+    }
+
+    async fn abort_multipart(
+        &self,
+        _location: &Path,
+        _multipart_id: &MultipartId,
+    ) -> Result<()> {
+        Err(super::Error::NotImplemented)
+    }
+
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        let response = self.client.get(location, None).await?;
+        let stream = response
+            .bytes_stream()
+            .map_err(|source| Error::Reqwest { source }.into())
+            .boxed();
+
+        Ok(GetResult::Stream(stream))
+    }
+
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> 
Result<Bytes> {
+        let bytes = self
+            .client
+            .get(location, Some(range))
+            .await?
+            .bytes()
+            .await
+            .context(ReqwestSnafu)?;
+        Ok(bytes)
+    }
+
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        let status = self.client.list(Some(location), "0").await?;
+        match status.response.len() {
+            1 => {
+                let response = status.response.into_iter().next().unwrap();
+                response.check_ok()?;
+                match response.is_dir() {
+                    true => Err(Error::IsDirectory.into()),
+                    false => response.object_meta(self.client.base_url()),
+                }

Review Comment:
   Same as before, although here I find the `match` syntax quite concise 😆 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to