This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 11b2f5fecc Object tagging (#4754)  (#4999)
11b2f5fecc is described below

commit 11b2f5fecc257d97005f2393ee17777ed5d38e7c
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Oct 30 11:30:35 2023 +0000

    Object tagging (#4754)  (#4999)
    
    * Object tagging (#4754)
    
    * Allow disabling tagging
    
    * Rename to disable_tagging
---
 object_store/src/aws/builder.rs   | 22 ++++++++++
 object_store/src/aws/client.rs    | 23 +++++++++++
 object_store/src/aws/mod.rs       | 16 +++++++-
 object_store/src/azure/builder.rs | 22 ++++++++++
 object_store/src/azure/client.rs  | 27 ++++++++++++-
 object_store/src/azure/mod.rs     |  7 ++++
 object_store/src/lib.rs           | 85 ++++++++++++++++++++++++++++++++++++++-
 object_store/src/tags.rs          | 60 +++++++++++++++++++++++++++
 8 files changed, 258 insertions(+), 4 deletions(-)

diff --git a/object_store/src/aws/builder.rs b/object_store/src/aws/builder.rs
index 79ea75b5ab..cf9490d96e 100644
--- a/object_store/src/aws/builder.rs
+++ b/object_store/src/aws/builder.rs
@@ -155,6 +155,8 @@ pub struct AmazonS3Builder {
     copy_if_not_exists: Option<ConfigValue<S3CopyIfNotExists>>,
     /// Put precondition
     conditional_put: Option<ConfigValue<S3ConditionalPut>>,
+    /// Ignore tags
+    disable_tagging: ConfigValue<bool>,
 }
 
 /// Configuration keys for [`AmazonS3Builder`]
@@ -299,6 +301,15 @@ pub enum AmazonS3ConfigKey {
     /// Skip signing request
     SkipSignature,
 
+    /// Disable tagging objects
+    ///
+    /// This can be desirable if not supported by the backing store
+    ///
+    /// Supported keys:
+    /// - `aws_disable_tagging`
+    /// - `disable_tagging`
+    DisableTagging,
+
     /// Client options
     Client(ClientConfigKey),
 }
@@ -322,6 +333,7 @@ impl AsRef<str> for AmazonS3ConfigKey {
             Self::SkipSignature => "aws_skip_signature",
             Self::CopyIfNotExists => "aws_copy_if_not_exists",
             Self::ConditionalPut => "aws_conditional_put",
+            Self::DisableTagging => "aws_disable_tagging",
             Self::Client(opt) => opt.as_ref(),
         }
     }
@@ -350,6 +362,7 @@ impl FromStr for AmazonS3ConfigKey {
             "aws_skip_signature" | "skip_signature" => Ok(Self::SkipSignature),
             "aws_copy_if_not_exists" | "copy_if_not_exists" => Ok(Self::CopyIfNotExists),
             "aws_conditional_put" | "conditional_put" => Ok(Self::ConditionalPut),
+            "aws_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging),
             // Backwards compatibility
             "aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
             _ => match s.parse() {
@@ -453,6 +466,7 @@ impl AmazonS3Builder {
                 self.client_options = self.client_options.with_config(key, value)
             }
             AmazonS3ConfigKey::SkipSignature => self.skip_signature.parse(value),
+            AmazonS3ConfigKey::DisableTagging => self.disable_tagging.parse(value),
             AmazonS3ConfigKey::CopyIfNotExists => {
                 self.copy_if_not_exists = Some(ConfigValue::Deferred(value.into()))
             }
@@ -525,6 +539,7 @@ impl AmazonS3Builder {
             AmazonS3ConfigKey::ConditionalPut => {
                 self.conditional_put.as_ref().map(ToString::to_string)
             }
+            AmazonS3ConfigKey::DisableTagging => Some(self.disable_tagging.to_string()),
         }
     }
 
@@ -735,6 +750,12 @@ impl AmazonS3Builder {
         self
     }
 
+    /// If set to `true` will ignore any tags provided to put_opts
+    pub fn with_disable_tagging(mut self, ignore: bool) -> Self {
+        self.disable_tagging = ignore.into();
+        self
+    }
+
     /// Create a [`AmazonS3`] instance from the provided values,
     /// consuming `self`.
     pub fn build(mut self) -> Result<AmazonS3> {
@@ -851,6 +872,7 @@ impl AmazonS3Builder {
             client_options: self.client_options,
             sign_payload: !self.unsigned_payload.get()?,
             skip_signature: self.skip_signature.get()?,
+            disable_tagging: self.disable_tagging.get()?,
             checksum,
             copy_if_not_exists,
             conditional_put: put_precondition,
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 20c2a96b57..3e47abd4bc 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -204,6 +204,7 @@ pub struct S3Config {
     pub client_options: ClientOptions,
     pub sign_payload: bool,
     pub skip_signature: bool,
+    pub disable_tagging: bool,
     pub checksum: Option<Checksum>,
     pub copy_if_not_exists: Option<S3CopyIfNotExists>,
     pub conditional_put: Option<S3ConditionalPut>,
@@ -588,6 +589,28 @@ impl S3Client {
             version,
         })
     }
+
+    #[cfg(test)]
+    pub async fn get_object_tagging(&self, path: &Path) -> Result<Response> {
+        let credential = self.config.get_credential().await?;
+        let url = format!("{}?tagging", self.config.path_url(path));
+        let response = self
+            .client
+            .request(Method::GET, url)
+            .with_aws_sigv4(
+                credential.as_deref(),
+                &self.config.region,
+                "s3",
+                self.config.sign_payload,
+                None,
+            )
+            .send_retry(&self.config.retry_config)
+            .await
+            .context(GetRequestSnafu {
+                path: path.as_ref(),
+            })?;
+        Ok(response)
+    }
 }
 
 #[async_trait]
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 99e6376950..cbb3cffdf4 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -35,7 +35,7 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use futures::stream::BoxStream;
 use futures::{StreamExt, TryStreamExt};
-use reqwest::header::{IF_MATCH, IF_NONE_MATCH};
+use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
 use reqwest::Method;
 use std::{sync::Arc, time::Duration};
 use tokio::io::AsyncWrite;
@@ -52,6 +52,8 @@ use crate::{
     PutOptions, PutResult, Result,
 };
 
+static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
+
 mod builder;
 mod checksum;
 mod client;
@@ -160,7 +162,12 @@ impl Signer for AmazonS3 {
 #[async_trait]
 impl ObjectStore for AmazonS3 {
     async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result<PutResult> {
-        let request = self.client.put_request(location, bytes);
+        let mut request = self.client.put_request(location, bytes);
+        let tags = opts.tags.encoded();
+        if !tags.is_empty() && !self.client.config().disable_tagging {
+            request = request.header(&TAGS_HEADER, tags);
+        }
+
         match (opts.mode, &self.client.config().conditional_put) {
             (PutMode::Overwrite, _) => request.send().await,
             (PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented),
@@ -342,6 +349,11 @@ mod tests {
         stream_get(&integration).await;
         multipart(&integration, &integration).await;
 
+        tagging(&integration, !config.disable_tagging, |p| {
+            let client = Arc::clone(&integration.client);
+            async move { client.get_object_tagging(&p).await }
+        })
+        .await;
         if test_not_exists {
             copy_if_not_exists(&integration).await;
         }
diff --git a/object_store/src/azure/builder.rs b/object_store/src/azure/builder.rs
index 02e0762b6d..6bd2b265b5 100644
--- a/object_store/src/azure/builder.rs
+++ b/object_store/src/azure/builder.rs
@@ -173,6 +173,8 @@ pub struct MicrosoftAzureBuilder {
     ///
     /// i.e. https://{account_name}.dfs.fabric.microsoft.com
     use_fabric_endpoint: ConfigValue<bool>,
+    /// When set to true, skips tagging objects
+    disable_tagging: ConfigValue<bool>,
 }
 
 /// Configuration keys for [`MicrosoftAzureBuilder`]
@@ -321,6 +323,15 @@ pub enum AzureConfigKey {
     /// - `container_name`
     ContainerName,
 
+    /// Disables tagging objects
+    ///
+    /// This can be desirable if not supported by the backing store
+    ///
+    /// Supported keys:
+    /// - `azure_disable_tagging`
+    /// - `disable_tagging`
+    DisableTagging,
+
     /// Client options
     Client(ClientConfigKey),
 }
@@ -344,6 +355,7 @@ impl AsRef<str> for AzureConfigKey {
             Self::FederatedTokenFile => "azure_federated_token_file",
             Self::UseAzureCli => "azure_use_azure_cli",
             Self::ContainerName => "azure_container_name",
+            Self::DisableTagging => "azure_disable_tagging",
             Self::Client(key) => key.as_ref(),
         }
     }
@@ -387,6 +399,7 @@ impl FromStr for AzureConfigKey {
             "azure_use_fabric_endpoint" | "use_fabric_endpoint" => Ok(Self::UseFabricEndpoint),
             "azure_use_azure_cli" | "use_azure_cli" => Ok(Self::UseAzureCli),
             "azure_container_name" | "container_name" => Ok(Self::ContainerName),
+            "azure_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging),
             // Backwards compatibility
             "azure_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
             _ => match s.parse() {
@@ -503,6 +516,7 @@ impl MicrosoftAzureBuilder {
                 self.client_options = self.client_options.with_config(key, value)
             }
             AzureConfigKey::ContainerName => self.container_name = Some(value.into()),
+            AzureConfigKey::DisableTagging => self.disable_tagging.parse(value),
         };
         self
     }
@@ -556,6 +570,7 @@ impl MicrosoftAzureBuilder {
             AzureConfigKey::UseAzureCli => Some(self.use_azure_cli.to_string()),
             AzureConfigKey::Client(key) => self.client_options.get_config_value(key),
             AzureConfigKey::ContainerName => self.container_name.clone(),
+            AzureConfigKey::DisableTagging => Some(self.disable_tagging.to_string()),
         }
     }
 
@@ -781,6 +796,12 @@ impl MicrosoftAzureBuilder {
         self
     }
 
+    /// If set to `true` will ignore any tags provided to put_opts
+    pub fn with_disable_tagging(mut self, ignore: bool) -> Self {
+        self.disable_tagging = ignore.into();
+        self
+    }
+
     /// Configure a connection to container with given name on Microsoft Azure Blob store.
     pub fn build(mut self) -> Result<MicrosoftAzure> {
         if let Some(url) = self.url.take() {
@@ -885,6 +906,7 @@ impl MicrosoftAzureBuilder {
             account,
             is_emulator,
             container,
+            disable_tagging: self.disable_tagging.get()?,
             retry_config: self.retry_config,
             client_options: self.client_options,
             service: storage_url,
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index c7bd791498..3c71e69da0 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -50,6 +50,8 @@ use url::Url;
 
 const VERSION_HEADER: &str = "x-ms-version-id";
 
+static TAGS_HEADER: HeaderName = HeaderName::from_static("x-ms-tags");
+
 /// A specialized `Error` for object store-related errors
 #[derive(Debug, Snafu)]
 #[allow(missing_docs)]
@@ -124,11 +126,12 @@ pub(crate) struct AzureConfig {
     pub retry_config: RetryConfig,
     pub service: Url,
     pub is_emulator: bool,
+    pub disable_tagging: bool,
     pub client_options: ClientOptions,
 }
 
 impl AzureConfig {
-    fn path_url(&self, path: &Path) -> Url {
+    pub(crate) fn path_url(&self, path: &Path) -> Url {
         let mut url = self.service.clone();
         {
             let mut path_mut = url.path_segments_mut().unwrap();
@@ -229,6 +232,11 @@ impl AzureClient {
             }
         };
 
+        let builder = match (opts.tags.encoded(), self.config.disable_tagging) {
+            ("", _) | (_, true) => builder,
+            (tags, false) => builder.header(&TAGS_HEADER, tags),
+        };
+
         let response = builder.header(&BLOB_TYPE, "BlockBlob").send().await?;
         Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?)
     }
@@ -315,6 +323,23 @@ impl AzureClient {
 
         Ok(())
     }
+
+    #[cfg(test)]
+    pub async fn get_blob_tagging(&self, path: &Path) -> Result<Response> {
+        let credential = self.get_credential().await?;
+        let url = self.config.path_url(path);
+        let response = self
+            .client
+            .request(Method::GET, url)
+            .query(&[("comp", "tags")])
+            .with_azure_authorization(&credential, &self.config.account)
+            .send_retry(&self.config.retry_config)
+            .await
+            .context(GetRequestSnafu {
+                path: path.as_ref(),
+            })?;
+        Ok(response)
+    }
 }
 
 #[async_trait]
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 762a51dd9d..1d51cbdc02 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -202,6 +202,13 @@ mod tests {
         stream_get(&integration).await;
         put_opts(&integration, true).await;
         multipart(&integration, &integration).await;
+
+        let validate = !integration.client.config().disable_tagging;
+        tagging(&integration, validate, |p| {
+            let client = Arc::clone(&integration.client);
+            async move { client.get_blob_tagging(&p).await }
+        })
+        .await
     }
 
     #[test]
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 66964304e8..51203ca4a4 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -263,6 +263,10 @@ pub use client::{
 #[cfg(feature = "cloud")]
 mod config;
 
+mod tags;
+
+pub use tags::TagSet;
+
 pub mod multipart;
 mod parse;
 mod util;
@@ -893,11 +897,27 @@ impl From<PutResult> for UpdateVersion {
 pub struct PutOptions {
     /// Configure the [`PutMode`] for this operation
     pub mode: PutMode,
+    /// Provide a [`TagSet`] for this object
+    ///
+    /// Implementations that don't support object tagging should ignore this
+    pub tags: TagSet,
 }
 
 impl From<PutMode> for PutOptions {
     fn from(mode: PutMode) -> Self {
-        Self { mode }
+        Self {
+            mode,
+            ..Default::default()
+        }
+    }
+}
+
+impl From<TagSet> for PutOptions {
+    fn from(tags: TagSet) -> Self {
+        Self {
+            tags,
+            ..Default::default()
+        }
     }
 }
 
@@ -1015,6 +1035,7 @@ mod tests {
     use chrono::TimeZone;
     use futures::stream::FuturesUnordered;
     use rand::{thread_rng, Rng};
+    use std::future::Future;
     use tokio::io::AsyncWriteExt;
 
     pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
@@ -1882,6 +1903,68 @@ mod tests {
         assert_eq!(meta.size, chunk_size * 2);
     }
 
+    #[cfg(any(feature = "aws", feature = "azure"))]
+    pub(crate) async fn tagging<F, Fut>(storage: &dyn ObjectStore, validate: bool, get_tags: F)
+    where
+        F: Fn(Path) -> Fut + Send + Sync,
+        Fut: Future<Output = Result<reqwest::Response>> + Send,
+    {
+        use bytes::Buf;
+        use serde::Deserialize;
+
+        #[derive(Deserialize)]
+        struct Tagging {
+            #[serde(rename = "TagSet")]
+            list: TagList,
+        }
+
+        #[derive(Debug, Deserialize)]
+        struct TagList {
+            #[serde(rename = "Tag")]
+            tags: Vec<Tag>,
+        }
+
+        #[derive(Debug, Deserialize, Eq, PartialEq)]
+        #[serde(rename_all = "PascalCase")]
+        struct Tag {
+            key: String,
+            value: String,
+        }
+
+        let tags = vec![
+            Tag {
+                key: "foo.com=bar/s".to_string(),
+                value: "bananas/foo.com-_".to_string(),
+            },
+            Tag {
+                key: "namespace/key.foo".to_string(),
+                value: "value with a space".to_string(),
+            },
+        ];
+        let mut tag_set = TagSet::default();
+        for t in &tags {
+            tag_set.push(&t.key, &t.value)
+        }
+
+        let path = Path::from("tag_test");
+        storage
+            .put_opts(&path, "test".into(), tag_set.into())
+            .await
+            .unwrap();
+
+        // Write should always succeed, but certain configurations may simply ignore tags
+        if !validate {
+            return;
+        }
+
+        let resp = get_tags(path.clone()).await.unwrap();
+        let body = resp.bytes().await.unwrap();
+
+        let mut resp: Tagging = quick_xml::de::from_reader(body.reader()).unwrap();
+        resp.list.tags.sort_by(|a, b| a.key.cmp(&b.key));
+        assert_eq!(resp.list.tags, tags);
+    }
+
     async fn delete_fixtures(storage: &DynObjectStore) {
         let paths = storage.list(None).map_ok(|meta| meta.location).boxed();
         storage
diff --git a/object_store/src/tags.rs b/object_store/src/tags.rs
new file mode 100644
index 0000000000..fa6e5913f4
--- /dev/null
+++ b/object_store/src/tags.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use url::form_urlencoded::Serializer;
+
+/// A collection of key value pairs used to annotate objects
+///
+/// <https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html>
+/// <https://learn.microsoft.com/en-us/rest/api/storageservices/set-blob-tags>
+#[derive(Debug, Clone, Default, Eq, PartialEq)]
+pub struct TagSet(String);
+
+impl TagSet {
+    /// Append a key value pair to this [`TagSet`]
+    ///
+    /// Stores have different restrictions on what characters are permitted,
+    /// for portability it is recommended applications use no more than 10 tags,
+    /// and stick to alphanumeric characters, and `+ - = . _ : /`
+    ///
+    /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectTagging.html>
+    /// <https://learn.microsoft.com/en-us/rest/api/storageservices/set-blob-tags?tabs=azure-ad#request-body>
+    pub fn push(&mut self, key: &str, value: &str) {
+        Serializer::new(&mut self.0).append_pair(key, value);
+    }
+
+    /// Return this [`TagSet`] as a URL-encoded string
+    pub fn encoded(&self) -> &str {
+        &self.0
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_tag_set() {
+        let mut set = TagSet::default();
+        set.push("test/foo", "value sdlks");
+        set.push("foo", " sdf _ /+./sd");
+        assert_eq!(
+            set.encoded(),
+            "test%2Ffoo=value+sdlks&foo=+sdf+_+%2F%2B.%2Fsd"
+        );
+    }
+}

Reply via email to