This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 11b2f5fecc Object tagging (#4754) (#4999)
11b2f5fecc is described below
commit 11b2f5fecc257d97005f2393ee17777ed5d38e7c
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Oct 30 11:30:35 2023 +0000
Object tagging (#4754) (#4999)
* Object tagging (#4754)
* Allow disabling tagging
* Rename to disable_tagging
---
object_store/src/aws/builder.rs | 22 ++++++++++
object_store/src/aws/client.rs | 23 +++++++++++
object_store/src/aws/mod.rs | 16 +++++++-
object_store/src/azure/builder.rs | 22 ++++++++++
object_store/src/azure/client.rs | 27 ++++++++++++-
object_store/src/azure/mod.rs | 7 ++++
object_store/src/lib.rs | 85 ++++++++++++++++++++++++++++++++++++++-
object_store/src/tags.rs | 60 +++++++++++++++++++++++++++
8 files changed, 258 insertions(+), 4 deletions(-)
diff --git a/object_store/src/aws/builder.rs b/object_store/src/aws/builder.rs
index 79ea75b5ab..cf9490d96e 100644
--- a/object_store/src/aws/builder.rs
+++ b/object_store/src/aws/builder.rs
@@ -155,6 +155,8 @@ pub struct AmazonS3Builder {
copy_if_not_exists: Option<ConfigValue<S3CopyIfNotExists>>,
/// Put precondition
conditional_put: Option<ConfigValue<S3ConditionalPut>>,
+ /// Ignore tags
+ disable_tagging: ConfigValue<bool>,
}
/// Configuration keys for [`AmazonS3Builder`]
@@ -299,6 +301,15 @@ pub enum AmazonS3ConfigKey {
/// Skip signing request
SkipSignature,
+ /// Disable tagging objects
+ ///
+ /// This can be desirable if not supported by the backing store
+ ///
+ /// Supported keys:
+ /// - `aws_disable_tagging`
+ /// - `disable_tagging`
+ DisableTagging,
+
/// Client options
Client(ClientConfigKey),
}
@@ -322,6 +333,7 @@ impl AsRef<str> for AmazonS3ConfigKey {
Self::SkipSignature => "aws_skip_signature",
Self::CopyIfNotExists => "aws_copy_if_not_exists",
Self::ConditionalPut => "aws_conditional_put",
+ Self::DisableTagging => "aws_disable_tagging",
Self::Client(opt) => opt.as_ref(),
}
}
@@ -350,6 +362,7 @@ impl FromStr for AmazonS3ConfigKey {
"aws_skip_signature" | "skip_signature" => Ok(Self::SkipSignature),
"aws_copy_if_not_exists" | "copy_if_not_exists" =>
Ok(Self::CopyIfNotExists),
"aws_conditional_put" | "conditional_put" =>
Ok(Self::ConditionalPut),
+ "aws_disable_tagging" | "disable_tagging" =>
Ok(Self::DisableTagging),
// Backwards compatibility
"aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
_ => match s.parse() {
@@ -453,6 +466,7 @@ impl AmazonS3Builder {
self.client_options = self.client_options.with_config(key,
value)
}
AmazonS3ConfigKey::SkipSignature =>
self.skip_signature.parse(value),
+ AmazonS3ConfigKey::DisableTagging =>
self.disable_tagging.parse(value),
AmazonS3ConfigKey::CopyIfNotExists => {
self.copy_if_not_exists =
Some(ConfigValue::Deferred(value.into()))
}
@@ -525,6 +539,7 @@ impl AmazonS3Builder {
AmazonS3ConfigKey::ConditionalPut => {
self.conditional_put.as_ref().map(ToString::to_string)
}
+ AmazonS3ConfigKey::DisableTagging =>
Some(self.disable_tagging.to_string()),
}
}
@@ -735,6 +750,12 @@ impl AmazonS3Builder {
self
}
+ /// If set to `true`, any tags provided to `put_opts` are ignored rather than sent with the request
+ pub fn with_disable_tagging(mut self, ignore: bool) -> Self {
+ self.disable_tagging = ignore.into(); // held as a `ConfigValue<bool>`; resolved with `.get()` in `build`
+ self
+ }
+
/// Create a [`AmazonS3`] instance from the provided values,
/// consuming `self`.
pub fn build(mut self) -> Result<AmazonS3> {
@@ -851,6 +872,7 @@ impl AmazonS3Builder {
client_options: self.client_options,
sign_payload: !self.unsigned_payload.get()?,
skip_signature: self.skip_signature.get()?,
+ disable_tagging: self.disable_tagging.get()?,
checksum,
copy_if_not_exists,
conditional_put: put_precondition,
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 20c2a96b57..3e47abd4bc 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -204,6 +204,7 @@ pub struct S3Config {
pub client_options: ClientOptions,
pub sign_payload: bool,
pub skip_signature: bool,
+ pub disable_tagging: bool,
pub checksum: Option<Checksum>,
pub copy_if_not_exists: Option<S3CopyIfNotExists>,
pub conditional_put: Option<S3ConditionalPut>,
@@ -588,6 +589,28 @@ impl S3Client {
version,
})
}
+
+ #[cfg(test)] // test-only helper: compiled only for test builds
+ pub async fn get_object_tagging(&self, path: &Path) -> Result<Response> { // returns the raw response of a GET on `<object url>?tagging`
+ let credential = self.config.get_credential().await?;
+ let url = format!("{}?tagging", self.config.path_url(path)); // append the `tagging` query to the object's URL
+ let response = self
+ .client
+ .request(Method::GET, url)
+ .with_aws_sigv4(
+ credential.as_deref(),
+ &self.config.region,
+ "s3",
+ self.config.sign_payload,
+ None,
+ )
+ .send_retry(&self.config.retry_config) // retried according to the configured retry policy
+ .await
+ .context(GetRequestSnafu { // a failed request is reported as a get-request error carrying this path
+ path: path.as_ref(),
+ })?;
+ Ok(response) // body is returned unparsed; decoding is left to the caller
+ }
}
#[async_trait]
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 99e6376950..cbb3cffdf4 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -35,7 +35,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
-use reqwest::header::{IF_MATCH, IF_NONE_MATCH};
+use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
use reqwest::Method;
use std::{sync::Arc, time::Duration};
use tokio::io::AsyncWrite;
@@ -52,6 +52,8 @@ use crate::{
PutOptions, PutResult, Result,
};
+static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
+
mod builder;
mod checksum;
mod client;
@@ -160,7 +162,12 @@ impl Signer for AmazonS3 {
#[async_trait]
impl ObjectStore for AmazonS3 {
async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions)
-> Result<PutResult> {
- let request = self.client.put_request(location, bytes);
+ let mut request = self.client.put_request(location, bytes);
+ let tags = opts.tags.encoded();
+ if !tags.is_empty() && !self.client.config().disable_tagging {
+ request = request.header(&TAGS_HEADER, tags);
+ }
+
match (opts.mode, &self.client.config().conditional_put) {
(PutMode::Overwrite, _) => request.send().await,
(PutMode::Create | PutMode::Update(_), None) =>
Err(Error::NotImplemented),
@@ -342,6 +349,11 @@ mod tests {
stream_get(&integration).await;
multipart(&integration, &integration).await;
+ tagging(&integration, !config.disable_tagging, |p| {
+ let client = Arc::clone(&integration.client);
+ async move { client.get_object_tagging(&p).await }
+ })
+ .await;
if test_not_exists {
copy_if_not_exists(&integration).await;
}
diff --git a/object_store/src/azure/builder.rs b/object_store/src/azure/builder.rs
index 02e0762b6d..6bd2b265b5 100644
--- a/object_store/src/azure/builder.rs
+++ b/object_store/src/azure/builder.rs
@@ -173,6 +173,8 @@ pub struct MicrosoftAzureBuilder {
///
/// i.e. https://{account_name}.dfs.fabric.microsoft.com
use_fabric_endpoint: ConfigValue<bool>,
+ /// When set to true, skips tagging objects
+ disable_tagging: ConfigValue<bool>,
}
/// Configuration keys for [`MicrosoftAzureBuilder`]
@@ -321,6 +323,15 @@ pub enum AzureConfigKey {
/// - `container_name`
ContainerName,
+ /// Disables tagging objects
+ ///
+ /// This can be desirable if not supported by the backing store
+ ///
+ /// Supported keys:
+ /// - `azure_disable_tagging`
+ /// - `disable_tagging`
+ DisableTagging,
+
/// Client options
Client(ClientConfigKey),
}
@@ -344,6 +355,7 @@ impl AsRef<str> for AzureConfigKey {
Self::FederatedTokenFile => "azure_federated_token_file",
Self::UseAzureCli => "azure_use_azure_cli",
Self::ContainerName => "azure_container_name",
+ Self::DisableTagging => "azure_disable_tagging",
Self::Client(key) => key.as_ref(),
}
}
@@ -387,6 +399,7 @@ impl FromStr for AzureConfigKey {
"azure_use_fabric_endpoint" | "use_fabric_endpoint" =>
Ok(Self::UseFabricEndpoint),
"azure_use_azure_cli" | "use_azure_cli" => Ok(Self::UseAzureCli),
"azure_container_name" | "container_name" =>
Ok(Self::ContainerName),
+ "azure_disable_tagging" | "disable_tagging" =>
Ok(Self::DisableTagging),
// Backwards compatibility
"azure_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
_ => match s.parse() {
@@ -503,6 +516,7 @@ impl MicrosoftAzureBuilder {
self.client_options = self.client_options.with_config(key,
value)
}
AzureConfigKey::ContainerName => self.container_name =
Some(value.into()),
+ AzureConfigKey::DisableTagging =>
self.disable_tagging.parse(value),
};
self
}
@@ -556,6 +570,7 @@ impl MicrosoftAzureBuilder {
AzureConfigKey::UseAzureCli =>
Some(self.use_azure_cli.to_string()),
AzureConfigKey::Client(key) =>
self.client_options.get_config_value(key),
AzureConfigKey::ContainerName => self.container_name.clone(),
+ AzureConfigKey::DisableTagging =>
Some(self.disable_tagging.to_string()),
}
}
@@ -781,6 +796,12 @@ impl MicrosoftAzureBuilder {
self
}
+ /// If set to `true`, any tags provided to `put_opts` are ignored rather than sent with the request
+ pub fn with_disable_tagging(mut self, ignore: bool) -> Self {
+ self.disable_tagging = ignore.into(); // held as a `ConfigValue<bool>`; resolved with `.get()` in `build`
+ self
+ }
+
/// Configure a connection to container with given name on Microsoft Azure Blob store.
pub fn build(mut self) -> Result<MicrosoftAzure> {
if let Some(url) = self.url.take() {
@@ -885,6 +906,7 @@ impl MicrosoftAzureBuilder {
account,
is_emulator,
container,
+ disable_tagging: self.disable_tagging.get()?,
retry_config: self.retry_config,
client_options: self.client_options,
service: storage_url,
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index c7bd791498..3c71e69da0 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -50,6 +50,8 @@ use url::Url;
const VERSION_HEADER: &str = "x-ms-version-id";
+static TAGS_HEADER: HeaderName = HeaderName::from_static("x-ms-tags");
+
/// A specialized `Error` for object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
@@ -124,11 +126,12 @@ pub(crate) struct AzureConfig {
pub retry_config: RetryConfig,
pub service: Url,
pub is_emulator: bool,
+ pub disable_tagging: bool,
pub client_options: ClientOptions,
}
impl AzureConfig {
- fn path_url(&self, path: &Path) -> Url {
+ pub(crate) fn path_url(&self, path: &Path) -> Url {
let mut url = self.service.clone();
{
let mut path_mut = url.path_segments_mut().unwrap();
@@ -229,6 +232,11 @@ impl AzureClient {
}
};
+ let builder = match (opts.tags.encoded(), self.config.disable_tagging)
{
+ ("", _) | (_, true) => builder,
+ (tags, false) => builder.header(&TAGS_HEADER, tags),
+ };
+
let response = builder.header(&BLOB_TYPE, "BlockBlob").send().await?;
Ok(get_put_result(response.headers(),
VERSION_HEADER).context(MetadataSnafu)?)
}
@@ -315,6 +323,23 @@ impl AzureClient {
Ok(())
}
+
+ #[cfg(test)] // test-only helper: compiled only for test builds
+ pub async fn get_blob_tagging(&self, path: &Path) -> Result<Response> { // returns the raw response of a GET on `<blob url>?comp=tags`
+ let credential = self.get_credential().await?;
+ let url = self.config.path_url(path);
+ let response = self
+ .client
+ .request(Method::GET, url)
+ .query(&[("comp", "tags")]) // the `comp=tags` query pair requests the blob's tags
+ .with_azure_authorization(&credential, &self.config.account)
+ .send_retry(&self.config.retry_config) // retried according to the configured retry policy
+ .await
+ .context(GetRequestSnafu { // a failed request is reported as a get-request error carrying this path
+ path: path.as_ref(),
+ })?;
+ Ok(response) // body is returned unparsed; decoding is left to the caller
+ }
}
#[async_trait]
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 762a51dd9d..1d51cbdc02 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -202,6 +202,13 @@ mod tests {
stream_get(&integration).await;
put_opts(&integration, true).await;
multipart(&integration, &integration).await;
+
+ let validate = !integration.client.config().disable_tagging;
+ tagging(&integration, validate, |p| {
+ let client = Arc::clone(&integration.client);
+ async move { client.get_blob_tagging(&p).await }
+ })
+ .await
}
#[test]
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 66964304e8..51203ca4a4 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -263,6 +263,10 @@ pub use client::{
#[cfg(feature = "cloud")]
mod config;
+mod tags;
+
+pub use tags::TagSet;
+
pub mod multipart;
mod parse;
mod util;
@@ -893,11 +897,27 @@ impl From<PutResult> for UpdateVersion {
pub struct PutOptions {
/// Configure the [`PutMode`] for this operation
pub mode: PutMode,
+ /// Provide a [`TagSet`] for this object
+ ///
+ /// Implementations that don't support object tagging should ignore this
+ pub tags: TagSet,
}
impl From<PutMode> for PutOptions {
fn from(mode: PutMode) -> Self {
- Self { mode }
+ Self {
+ mode,
+ ..Default::default()
+ }
+ }
+}
+
+impl From<TagSet> for PutOptions { // lets a bare `TagSet` be used where `PutOptions` is expected, e.g. `tag_set.into()`
+ fn from(tags: TagSet) -> Self {
+ Self {
+ tags,
+ ..Default::default() // every other option keeps its default value
+ }
}
}
@@ -1015,6 +1035,7 @@ mod tests {
use chrono::TimeZone;
use futures::stream::FuturesUnordered;
use rand::{thread_rng, Rng};
+ use std::future::Future;
use tokio::io::AsyncWriteExt;
pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
@@ -1882,6 +1903,68 @@ mod tests {
assert_eq!(meta.size, chunk_size * 2);
}
+ /// Shared integration check that tags passed to `put_opts` reach the store
+ ///
+ /// Writes `tag_test` with two tags, then, if `validate` is set, fetches the raw
+ /// tagging response for that path through `get_tags` and asserts the same tags
+ /// come back. The response body is parsed as XML of the shape
+ /// `TagSet` / `Tag` / (`Key`, `Value`).
+ #[cfg(any(feature = "aws", feature = "azure"))]
+ pub(crate) async fn tagging<F, Fut>(storage: &dyn ObjectStore, validate: bool, get_tags: F)
+ where
+ F: Fn(Path) -> Fut + Send + Sync,
+ Fut: Future<Output = Result<reqwest::Response>> + Send,
+ {
+ use bytes::Buf;
+ use serde::Deserialize;
+
+ #[derive(Deserialize)]
+ struct Tagging {
+ #[serde(rename = "TagSet")]
+ list: TagList,
+ }
+
+ #[derive(Debug, Deserialize)]
+ struct TagList {
+ #[serde(rename = "Tag")]
+ tags: Vec<Tag>,
+ }
+
+ #[derive(Debug, Deserialize, Eq, PartialEq)]
+ #[serde(rename_all = "PascalCase")]
+ struct Tag {
+ key: String,
+ value: String,
+ }
+
+ // Expected tags, listed in ascending key order to match the sort applied below
+ let tags = vec![
+ Tag {
+ key: "foo.com=bar/s".to_string(),
+ value: "bananas/foo.com-_".to_string(),
+ },
+ Tag {
+ key: "namespace/key.foo".to_string(),
+ value: "value with a space".to_string(),
+ },
+ ];
+ let mut tag_set = TagSet::default();
+ for t in &tags {
+ tag_set.push(&t.key, &t.value)
+ }
+
+ let path = Path::from("tag_test");
+ storage
+ .put_opts(&path, "test".into(), tag_set.into())
+ .await
+ .unwrap();
+
+ // Write should always succeed, but certain configurations may simply ignore tags
+ if !validate {
+ return;
+ }
+
+ let resp = get_tags(path.clone()).await.unwrap();
+ let body = resp.bytes().await.unwrap();
+
+ let mut resp: Tagging = quick_xml::de::from_reader(body.reader()).unwrap();
+ // Sort by key so the comparison does not depend on the order the store returns tags in
+ resp.list.tags.sort_by(|a, b| a.key.cmp(&b.key));
+ assert_eq!(resp.list.tags, tags);
+ }
+
async fn delete_fixtures(storage: &DynObjectStore) {
let paths = storage.list(None).map_ok(|meta| meta.location).boxed();
storage
diff --git a/object_store/src/tags.rs b/object_store/src/tags.rs
new file mode 100644
index 0000000000..fa6e5913f4
--- /dev/null
+++ b/object_store/src/tags.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use url::form_urlencoded::Serializer;
+
+/// A collection of key value pairs used to annotate objects, held as a single URL-encoded string
+///
+/// <https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html>
+/// <https://learn.microsoft.com/en-us/rest/api/storageservices/set-blob-tags>
+#[derive(Debug, Clone, Default, Eq, PartialEq)]
+pub struct TagSet(String); // the encoded `key=value&...` text that `push` appends to
+
+impl TagSet {
+ /// Append a key value pair to this [`TagSet`]
+ ///
+ /// The key and value are form-urlencoded as they are appended.
+ ///
+ /// Stores have different restrictions on what characters are permitted,
+ /// for portability it is recommended applications use no more than 10 tags,
+ /// and stick to alphanumeric characters, and `+ - = . _ : /`
+ ///
+ /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectTagging.html>
+ /// <https://learn.microsoft.com/en-us/rest/api/storageservices/set-blob-tags?tabs=azure-ad#request-body>
+ pub fn push(&mut self, key: &str, value: &str) {
+ // The serializer writes straight into the backing string, adding `&` between pairs
+ Serializer::new(&mut self.0).append_pair(key, value);
+ }
+
+ /// Return this [`TagSet`] as a URL-encoded string
+ ///
+ /// The string is empty when no pairs have been pushed.
+ pub fn encoded(&self) -> &str {
+ &self.0
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_tag_set() { // pins the exact encoded output produced by two pushes
+ let mut set = TagSet::default();
+ set.push("test/foo", "value sdlks");
+ set.push("foo", " sdf _ /+./sd");
+ assert_eq!(
+ set.encoded(),
+ "test%2Ffoo=value+sdlks&foo=+sdf+_+%2F%2B.%2Fsd" // space -> `+`, `/` -> `%2F`, `+` -> `%2B`; pairs joined by `&`
+ );
+ }
+}