This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new f3baeaab8 Make ring optional dependency and cleanup tests (#2344)
f3baeaab8 is described below
commit f3baeaab8340d5cbbb1334fa3c179b08151a9a91
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Aug 8 10:05:29 2022 +0100
Make ring optional dependency and cleanup tests (#2344)
---
object_store/Cargo.toml | 8 +--
object_store/src/aws.rs | 57 ++-----------------
object_store/src/azure.rs | 10 ++--
object_store/src/gcp.rs | 10 ++--
object_store/src/lib.rs | 129 ++++++++++++++++++++-----------------------
object_store/src/limit.rs | 10 ++--
object_store/src/local.rs | 20 +++----
object_store/src/memory.rs | 12 ++--
object_store/src/throttle.rs | 10 ++--
9 files changed, 104 insertions(+), 162 deletions(-)
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index aaf9ee947..7ccec8651 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -46,9 +46,9 @@ serde = { version = "1.0", default-features = false, features = ["derive"], opti
serde_json = { version = "1.0", default-features = false, optional = true }
quick-xml = { version = "0.23.0", features = ["serialize"], optional = true }
rustls-pemfile = { version = "1.0", default-features = false, optional = true }
-ring = { version = "0.16", default-features = false, features = ["std"] }
+ring = { version = "0.16", default-features = false, features = ["std"], optional = true }
base64 = { version = "0.13", default-features = false, optional = true }
-rand = { version = "0.8", default-features = false, optional = true, features = ["std", "std_rng"] }
+rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true }
# for rusoto
hyper = { version = "0.14", optional = true, default-features = false }
# for rusoto
@@ -63,7 +63,7 @@ rusoto_sts = { version = "0.48.0", optional = true, default-features = false, fe
snafu = "0.7"
tokio = { version = "1.18", features = ["sync", "macros", "parking_lot", "rt-multi-thread", "time", "io-util"] }
tracing = { version = "0.1" }
-reqwest = { version = "0.11", optional = true, default-features = false, features = ["rustls-tls"] }
+reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"], optional = true }
parking_lot = { version = "0.12" }
# Filesystem integration
url = "2.2"
@@ -72,7 +72,7 @@ walkdir = "2"
[features]
azure = ["azure_core", "azure_storage_blobs", "azure_storage", "reqwest"]
azure_test = ["azure", "azure_core/azurite_workaround", "azure_storage/azurite_workaround", "azure_storage_blobs/azurite_workaround"]
-gcp = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "rustls-pemfile", "base64", "rand"]
+gcp = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "rustls-pemfile", "base64", "rand", "ring"]
aws = ["rusoto_core", "rusoto_credential", "rusoto_s3", "rusoto_sts", "hyper", "hyper-rustls"]
[dev-dependencies] # In alphabetical order
diff --git a/object_store/src/aws.rs b/object_store/src/aws.rs
index 86766b052..bcb294c00 100644
--- a/object_store/src/aws.rs
+++ b/object_store/src/aws.rs
@@ -1027,34 +1027,6 @@ where
}
}
-impl Error {
- #[cfg(test)]
- fn s3_error_due_to_credentials(&self) -> bool {
- use rusoto_core::RusotoError;
- use Error::*;
-
- matches!(
- self,
- UnableToPutData {
- source: RusotoError::Credentials(_),
- bucket: _,
- path: _,
- } | UnableToGetData {
- source: RusotoError::Credentials(_),
- bucket: _,
- path: _,
- } | UnableToDeleteData {
- source: RusotoError::Credentials(_),
- bucket: _,
- path: _,
- } | UnableToListData {
- source: RusotoError::Credentials(_),
- bucket: _,
- }
- )
- }
-}
-
struct S3MultiPartUpload {
bucket: String,
key: String,
@@ -1186,9 +1158,6 @@ mod tests {
use bytes::Bytes;
use std::env;
- type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
- type Result<T, E = TestError> = std::result::Result<T, E>;
-
const NON_EXISTENT_NAME: &str = "nonexistentname";
// Helper macro to skip tests if TEST_INTEGRATION and the AWS
@@ -1268,32 +1237,16 @@ mod tests {
}};
}
- fn check_credentials<T>(r: Result<T>) -> Result<T> {
- if let Err(e) = &r {
- let e = &**e;
- if let Some(e) = e.downcast_ref::<Error>() {
- if e.s3_error_due_to_credentials() {
- eprintln!(
- "Try setting the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY \
- environment variables"
- );
- }
- }
- }
-
- r
- }
-
#[tokio::test]
async fn s3_test() {
let config = maybe_skip_integration!();
let integration = config.build().unwrap();
- check_credentials(put_get_delete_list(&integration).await).unwrap();
- check_credentials(list_uses_directories_correctly(&integration).await).unwrap();
- check_credentials(list_with_delimiter(&integration).await).unwrap();
- check_credentials(rename_and_copy(&integration).await).unwrap();
- check_credentials(stream_get(&integration).await).unwrap();
+ put_get_delete_list(&integration).await;
+ list_uses_directories_correctly(&integration).await;
+ list_with_delimiter(&integration).await;
+ rename_and_copy(&integration).await;
+ stream_get(&integration).await;
}
#[tokio::test]
diff --git a/object_store/src/azure.rs b/object_store/src/azure.rs
index cee874b95..6a5f53799 100644
--- a/object_store/src/azure.rs
+++ b/object_store/src/azure.rs
@@ -858,10 +858,10 @@ mod tests {
async fn azure_blob_test() {
let integration = maybe_skip_integration!().build().unwrap();
- put_get_delete_list(&integration).await.unwrap();
- list_uses_directories_correctly(&integration).await.unwrap();
- list_with_delimiter(&integration).await.unwrap();
- rename_and_copy(&integration).await.unwrap();
- copy_if_not_exists(&integration).await.unwrap();
+ put_get_delete_list(&integration).await;
+ list_uses_directories_correctly(&integration).await;
+ list_with_delimiter(&integration).await;
+ rename_and_copy(&integration).await;
+ copy_if_not_exists(&integration).await;
}
}
diff --git a/object_store/src/gcp.rs b/object_store/src/gcp.rs
index f9cb2b207..0dc5a956a 100644
--- a/object_store/src/gcp.rs
+++ b/object_store/src/gcp.rs
@@ -1003,14 +1003,14 @@ mod test {
async fn gcs_test() {
let integration = maybe_skip_integration!().build().unwrap();
- put_get_delete_list(&integration).await.unwrap();
- list_uses_directories_correctly(&integration).await.unwrap();
- list_with_delimiter(&integration).await.unwrap();
- rename_and_copy(&integration).await.unwrap();
+ put_get_delete_list(&integration).await;
+ list_uses_directories_correctly(&integration).await;
+ list_with_delimiter(&integration).await;
+ rename_and_copy(&integration).await;
if integration.client.base_url == default_gcs_base_url() {
// Fake GCS server does not yet implement XML Multipart uploads
// https://github.com/fsouza/fake-gcs-server/issues/852
- stream_get(&integration).await.unwrap();
+ stream_get(&integration).await;
}
}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index b60a29575..564799dbc 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -478,15 +478,12 @@ mod tests {
use crate::test_util::flatten_list_stream;
use tokio::io::AsyncWriteExt;
- type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
- type Result<T, E = Error> = std::result::Result<T, E>;
-
- pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) -> Result<()> {
+ pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
let store_str = storage.to_string();
delete_fixtures(storage).await;
- let content_list = flatten_list_stream(storage, None).await?;
+ let content_list = flatten_list_stream(storage, None).await.unwrap();
assert!(
content_list.is_empty(),
"Expected list to be empty; found: {:?}",
@@ -497,16 +494,16 @@ mod tests {
let data = Bytes::from("arbitrary data");
let expected_data = data.clone();
- storage.put(&location, data).await?;
+ storage.put(&location, data).await.unwrap();
let root = Path::from("/");
// List everything
- let content_list = flatten_list_stream(storage, None).await?;
+ let content_list = flatten_list_stream(storage, None).await.unwrap();
assert_eq!(content_list, &[location.clone()]);
// Should behave the same as no prefix
- let content_list = flatten_list_stream(storage, Some(&root)).await?;
+ let content_list = flatten_list_stream(storage, Some(&root)).await.unwrap();
assert_eq!(content_list, &[location.clone()]);
// List with delimiter
@@ -523,15 +520,15 @@ mod tests {
// List everything starting with a prefix that should return results
let prefix = Path::from("test_dir");
- let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
+ let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
assert_eq!(content_list, &[location.clone()]);
// List everything starting with a prefix that shouldn't return results
let prefix = Path::from("something");
- let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
+ let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
assert!(content_list.is_empty());
- let read_data = storage.get(&location).await?.bytes().await?;
+ let read_data = storage.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(&*read_data, expected_data);
// Test range request
@@ -557,12 +554,12 @@ mod tests {
out_of_range_result.unwrap_err();
}
- let head = storage.head(&location).await?;
+ let head = storage.head(&location).await.unwrap();
assert_eq!(head.size, expected_data.len());
- storage.delete(&location).await?;
+ storage.delete(&location).await.unwrap();
- let content_list = flatten_list_stream(storage, None).await?;
+ let content_list = flatten_list_stream(storage, None).await.unwrap();
assert!(content_list.is_empty());
let err = storage.get(&location).await.unwrap_err();
@@ -647,8 +644,6 @@ mod tests {
.await
.unwrap();
assert!(files.is_empty());
-
- Ok(())
}
fn get_vec_of_bytes(chunk_length: usize, num_chunks: usize) -> Vec<Bytes> {
@@ -657,15 +652,15 @@ mod tests {
.collect()
}
- pub(crate) async fn stream_get(storage: &DynObjectStore) -> Result<()> {
+ pub(crate) async fn stream_get(storage: &DynObjectStore) {
let location = Path::from("test_dir/test_upload_file.txt");
// Can write to storage
let data = get_vec_of_bytes(5_000_000, 10);
let bytes_expected = data.concat();
- let (_, mut writer) = storage.put_multipart(&location).await?;
+ let (_, mut writer) = storage.put_multipart(&location).await.unwrap();
for chunk in &data {
- writer.write_all(chunk).await?;
+ writer.write_all(chunk).await.unwrap();
}
// Object should not yet exist in store
@@ -676,26 +671,29 @@ mod tests {
crate::Error::NotFound { .. }
));
- writer.shutdown().await?;
- let bytes_written = storage.get(&location).await?.bytes().await?;
+ writer.shutdown().await.unwrap();
+ let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(bytes_expected, bytes_written);
// Can overwrite some storage
let data = get_vec_of_bytes(5_000, 5);
let bytes_expected = data.concat();
- let (_, mut writer) = storage.put_multipart(&location).await?;
+ let (_, mut writer) = storage.put_multipart(&location).await.unwrap();
for chunk in &data {
- writer.write_all(chunk).await?;
+ writer.write_all(chunk).await.unwrap();
}
- writer.shutdown().await?;
- let bytes_written = storage.get(&location).await?.bytes().await?;
+ writer.shutdown().await.unwrap();
+ let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(bytes_expected, bytes_written);
// We can abort an empty write
let location = Path::from("test_dir/test_abort_upload.txt");
- let (upload_id, writer) = storage.put_multipart(&location).await?;
+ let (upload_id, writer) = storage.put_multipart(&location).await.unwrap();
drop(writer);
- storage.abort_multipart(&location, &upload_id).await?;
+ storage
+ .abort_multipart(&location, &upload_id)
+ .await
+ .unwrap();
let get_res = storage.get(&location).await;
assert!(get_res.is_err());
assert!(matches!(
@@ -704,30 +702,29 @@ mod tests {
));
// We can abort an in-progress write
- let (upload_id, mut writer) = storage.put_multipart(&location).await?;
+ let (upload_id, mut writer) = storage.put_multipart(&location).await.unwrap();
if let Some(chunk) = data.get(0) {
- writer.write_all(chunk).await?;
- let _ = writer.write(chunk).await?;
+ writer.write_all(chunk).await.unwrap();
+ let _ = writer.write(chunk).await.unwrap();
}
drop(writer);
- storage.abort_multipart(&location, &upload_id).await?;
+ storage
+ .abort_multipart(&location, &upload_id)
+ .await
+ .unwrap();
let get_res = storage.get(&location).await;
assert!(get_res.is_err());
assert!(matches!(
get_res.unwrap_err(),
crate::Error::NotFound { .. }
));
-
- Ok(())
}
- pub(crate) async fn list_uses_directories_correctly(
- storage: &DynObjectStore,
- ) -> Result<()> {
+ pub(crate) async fn list_uses_directories_correctly(storage: &DynObjectStore) {
delete_fixtures(storage).await;
- let content_list = flatten_list_stream(storage, None).await?;
+ let content_list = flatten_list_stream(storage, None).await.unwrap();
assert!(
content_list.is_empty(),
"Expected list to be empty; found: {:?}",
@@ -738,25 +735,23 @@ mod tests {
let location2 = Path::from("foo.bar/y.json");
let data = Bytes::from("arbitrary data");
- storage.put(&location1, data.clone()).await?;
- storage.put(&location2, data).await?;
+ storage.put(&location1, data.clone()).await.unwrap();
+ storage.put(&location2, data).await.unwrap();
let prefix = Path::from("foo");
- let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
+ let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
assert_eq!(content_list, &[location1.clone()]);
let prefix = Path::from("foo/x");
- let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
+ let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
assert_eq!(content_list, &[]);
-
- Ok(())
}
- pub(crate) async fn list_with_delimiter(storage: &DynObjectStore) -> Result<()> {
+ pub(crate) async fn list_with_delimiter(storage: &DynObjectStore) {
delete_fixtures(storage).await;
// ==================== check: store is empty ====================
- let content_list = flatten_list_stream(storage, None).await?;
+ let content_list = flatten_list_stream(storage, None).await.unwrap();
assert!(content_list.is_empty());
// ==================== do: create files ====================
@@ -818,10 +813,8 @@ mod tests {
}
// ==================== check: store is empty ====================
- let content_list = flatten_list_stream(storage, None).await?;
+ let content_list = flatten_list_stream(storage, None).await.unwrap();
assert!(content_list.is_empty());
-
- Ok(())
}
pub(crate) async fn get_nonexistent_object(
@@ -837,7 +830,7 @@ mod tests {
storage.get(&location).await?.bytes().await
}
- pub(crate) async fn rename_and_copy(storage: &DynObjectStore) -> Result<()> {
+ pub(crate) async fn rename_and_copy(storage: &DynObjectStore) {
// Create two objects
let path1 = Path::from("test1");
let path2 = Path::from("test2");
@@ -845,29 +838,27 @@ mod tests {
let contents2 = Bytes::from("dogs");
// copy() make both objects identical
- storage.put(&path1, contents1.clone()).await?;
- storage.put(&path2, contents2.clone()).await?;
- storage.copy(&path1, &path2).await?;
- let new_contents = storage.get(&path2).await?.bytes().await?;
+ storage.put(&path1, contents1.clone()).await.unwrap();
+ storage.put(&path2, contents2.clone()).await.unwrap();
+ storage.copy(&path1, &path2).await.unwrap();
+ let new_contents = storage.get(&path2).await.unwrap().bytes().await.unwrap();
assert_eq!(&new_contents, &contents1);
// rename() copies contents and deletes original
- storage.put(&path1, contents1.clone()).await?;
- storage.put(&path2, contents2.clone()).await?;
- storage.rename(&path1, &path2).await?;
- let new_contents = storage.get(&path2).await?.bytes().await?;
+ storage.put(&path1, contents1.clone()).await.unwrap();
+ storage.put(&path2, contents2.clone()).await.unwrap();
+ storage.rename(&path1, &path2).await.unwrap();
+ let new_contents = storage.get(&path2).await.unwrap().bytes().await.unwrap();
assert_eq!(&new_contents, &contents1);
let result = storage.get(&path1).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
// Clean up
- storage.delete(&path2).await?;
-
- Ok(())
+ storage.delete(&path2).await.unwrap();
}
- pub(crate) async fn copy_if_not_exists(storage: &DynObjectStore) -> Result<()> {
+ pub(crate) async fn copy_if_not_exists(storage: &DynObjectStore) {
// Create two objects
let path1 = Path::from("test1");
let path2 = Path::from("test2");
@@ -875,8 +866,8 @@ mod tests {
let contents2 = Bytes::from("dogs");
// copy_if_not_exists() errors if destination already exists
- storage.put(&path1, contents1.clone()).await?;
- storage.put(&path2, contents2.clone()).await?;
+ storage.put(&path1, contents1.clone()).await.unwrap();
+ storage.put(&path2, contents2.clone()).await.unwrap();
let result = storage.copy_if_not_exists(&path1, &path2).await;
assert!(result.is_err());
assert!(matches!(
@@ -885,19 +876,17 @@ mod tests {
));
// copy_if_not_exists() copies contents and allows deleting original
- storage.delete(&path2).await?;
- storage.copy_if_not_exists(&path1, &path2).await?;
- storage.delete(&path1).await?;
- let new_contents = storage.get(&path2).await?.bytes().await?;
+ storage.delete(&path2).await.unwrap();
+ storage.copy_if_not_exists(&path1, &path2).await.unwrap();
+ storage.delete(&path1).await.unwrap();
+ let new_contents = storage.get(&path2).await.unwrap().bytes().await.unwrap();
assert_eq!(&new_contents, &contents1);
let result = storage.get(&path1).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
// Clean up
- storage.delete(&path2).await?;
-
- Ok(())
+ storage.delete(&path2).await.unwrap();
}
async fn delete_fixtures(storage: &DynObjectStore) {
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index fd21ccb58..acee7d5a1 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -237,11 +237,11 @@ mod tests {
let memory = InMemory::new();
let integration = LimitStore::new(memory, max_requests);
- put_get_delete_list(&integration).await.unwrap();
- list_uses_directories_correctly(&integration).await.unwrap();
- list_with_delimiter(&integration).await.unwrap();
- rename_and_copy(&integration).await.unwrap();
- stream_get(&integration).await.unwrap();
+ put_get_delete_list(&integration).await;
+ list_uses_directories_correctly(&integration).await;
+ list_with_delimiter(&integration).await;
+ rename_and_copy(&integration).await;
+ stream_get(&integration).await;
let mut streams = Vec::with_capacity(max_requests);
for _ in 0..max_requests {
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index c3f54e0c6..095498178 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -888,12 +888,12 @@ mod tests {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
- put_get_delete_list(&integration).await.unwrap();
- list_uses_directories_correctly(&integration).await.unwrap();
- list_with_delimiter(&integration).await.unwrap();
- rename_and_copy(&integration).await.unwrap();
- copy_if_not_exists(&integration).await.unwrap();
- stream_get(&integration).await.unwrap();
+ put_get_delete_list(&integration).await;
+ list_uses_directories_correctly(&integration).await;
+ list_with_delimiter(&integration).await;
+ rename_and_copy(&integration).await;
+ copy_if_not_exists(&integration).await;
+ stream_get(&integration).await;
}
#[test]
@@ -901,10 +901,10 @@ mod tests {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
futures::executor::block_on(async move {
- put_get_delete_list(&integration).await.unwrap();
- list_uses_directories_correctly(&integration).await.unwrap();
- list_with_delimiter(&integration).await.unwrap();
- stream_get(&integration).await.unwrap();
+ put_get_delete_list(&integration).await;
+ list_uses_directories_correctly(&integration).await;
+ list_with_delimiter(&integration).await;
+ stream_get(&integration).await;
});
}
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index dc3967d99..98eb3aa23 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -305,12 +305,12 @@ mod tests {
async fn in_memory_test() {
let integration = InMemory::new();
- put_get_delete_list(&integration).await.unwrap();
- list_uses_directories_correctly(&integration).await.unwrap();
- list_with_delimiter(&integration).await.unwrap();
- rename_and_copy(&integration).await.unwrap();
- copy_if_not_exists(&integration).await.unwrap();
- stream_get(&integration).await.unwrap();
+ put_get_delete_list(&integration).await;
+ list_uses_directories_correctly(&integration).await;
+ list_with_delimiter(&integration).await;
+ rename_and_copy(&integration).await;
+ copy_if_not_exists(&integration).await;
+ stream_get(&integration).await;
}
#[tokio::test]
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index 6789f0e68..dba9f249a 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -308,11 +308,11 @@ mod tests {
let inner = InMemory::new();
let store = ThrottledStore::new(inner, ThrottleConfig::default());
- put_get_delete_list(&store).await.unwrap();
- list_uses_directories_correctly(&store).await.unwrap();
- list_with_delimiter(&store).await.unwrap();
- rename_and_copy(&store).await.unwrap();
- copy_if_not_exists(&store).await.unwrap();
+ put_get_delete_list(&store).await;
+ list_uses_directories_correctly(&store).await;
+ list_with_delimiter(&store).await;
+ rename_and_copy(&store).await;
+ copy_if_not_exists(&store).await;
}
#[tokio::test]