This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new b94c99e7 chore: upgrade object store version (#1541)
b94c99e7 is described below
commit b94c99e79554f8c915d3b733ed628e4f837f0183
Author: 鲍金日 <[email protected]>
AuthorDate: Mon Aug 19 14:04:41 2024 +0800
chore: upgrade object store version (#1541)
## Rationale
The object store crate is upgraded to 0.10.1 in preparation for integrating
opendal.
## Detailed Changes
- Impl AsyncWrite for ObjectStoreMultiUpload
- Impl MultipartUpload for ObkvMultiPartUpload
- Adapt to the new API on the query writing path
## Test Plan
- Existing tests
---------
Co-authored-by: jiacai2050 <[email protected]>
---
Cargo.lock | 367 +++++++--
src/analytic_engine/src/setup.rs | 25 -
src/analytic_engine/src/sst/meta_data/cache.rs | 2 +-
src/analytic_engine/src/sst/parquet/writer.rs | 100 +--
src/components/object_store/Cargo.toml | 2 +-
src/components/object_store/src/config.rs | 35 -
src/components/object_store/src/disk_cache.rs | 62 +-
src/components/object_store/src/lib.rs | 12 +-
src/components/object_store/src/mem_cache.rs | 44 +-
src/components/object_store/src/metrics.rs | 85 +-
src/components/object_store/src/multi_part.rs | 221 ++++++
src/components/object_store/src/multipart.rs | 280 -------
src/components/object_store/src/obkv/meta.rs | 437 ----------
src/components/object_store/src/obkv/mod.rs | 1015 ------------------------
src/components/object_store/src/obkv/util.rs | 122 ---
src/components/object_store/src/prefix.rs | 94 ++-
src/components/object_store/src/test_util.rs | 66 +-
src/tools/src/bin/sst-metadata.rs | 2 +-
18 files changed, 822 insertions(+), 2149 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index c904497a..66815f7a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -797,6 +797,12 @@ dependencies = [
"syn 2.0.48",
]
+[[package]]
+name = "atomic-waker"
+version = "1.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
+
[[package]]
name = "atomic_enum"
version = "0.2.0"
@@ -836,9 +842,9 @@ dependencies = [
"bitflags 1.3.2",
"bytes",
"futures-util",
- "http",
- "http-body",
- "hyper",
+ "http 0.2.9",
+ "http-body 0.4.5",
+ "hyper 0.14.25",
"itoa",
"matchit",
"memchr",
@@ -862,8 +868,8 @@ dependencies = [
"async-trait",
"bytes",
"futures-util",
- "http",
- "http-body",
+ "http 0.2.9",
+ "http-body 0.4.5",
"mime",
"rustversion",
"tower-layer",
@@ -897,6 +903,12 @@ version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
+[[package]]
+name = "base64"
+version = "0.22.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
+
[[package]]
name = "base64ct"
version = "1.6.0"
@@ -1333,9 +1345,9 @@ checksum =
"baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
-version = "0.4.33"
+version = "0.4.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb"
+checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401"
dependencies = [
"android-tzdata",
"iana-time-zone",
@@ -2494,7 +2506,7 @@ version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4319dc0fb739a6e84cb8678b8cf50c9bcfa4712ae826b33ecf00cc0850550a58"
dependencies = [
- "http",
+ "http 0.2.9",
"prost 0.11.8",
"tokio",
"tokio-stream",
@@ -2893,7 +2905,26 @@ dependencies = [
"futures-core",
"futures-sink",
"futures-util",
- "http",
+ "http 0.2.9",
+ "indexmap 2.0.0",
+ "slab",
+ "tokio",
+ "tokio-util",
+ "tracing",
+]
+
+[[package]]
+name = "h2"
+version = "0.4.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab"
+dependencies = [
+ "atomic-waker",
+ "bytes",
+ "fnv",
+ "futures-core",
+ "futures-sink",
+ "http 1.1.0",
"indexmap 2.0.0",
"slab",
"tokio",
@@ -2978,7 +3009,7 @@ dependencies = [
"bitflags 1.3.2",
"bytes",
"headers-core",
- "http",
+ "http 0.2.9",
"httpdate",
"mime",
"sha1",
@@ -2990,7 +3021,7 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
dependencies = [
- "http",
+ "http 0.2.9",
]
[[package]]
@@ -3047,7 +3078,7 @@ dependencies = [
"clap",
"lazy_static",
"prettytable",
- "reqwest",
+ "reqwest 0.11.24",
"serde",
"shell-words",
"tokio",
@@ -3112,7 +3143,7 @@ dependencies = [
"async-trait",
"horaedb-client",
"local-ip-address",
- "reqwest",
+ "reqwest 0.11.24",
"serde",
"sqlness",
"tokio",
@@ -3155,6 +3186,17 @@ dependencies = [
"itoa",
]
+[[package]]
+name = "http"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
+dependencies = [
+ "bytes",
+ "fnv",
+ "itoa",
+]
+
[[package]]
name = "http-body"
version = "0.4.5"
@@ -3162,7 +3204,30 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
dependencies = [
"bytes",
- "http",
+ "http 0.2.9",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "http-body"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643"
+dependencies = [
+ "bytes",
+ "http 1.1.0",
+]
+
+[[package]]
+name = "http-body-util"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f"
+dependencies = [
+ "bytes",
+ "futures-util",
+ "http 1.1.0",
+ "http-body 1.0.0",
"pin-project-lite",
]
@@ -3203,9 +3268,9 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
- "h2",
- "http",
- "http-body",
+ "h2 0.3.26",
+ "http 0.2.9",
+ "http-body 0.4.5",
"httparse",
"httpdate",
"itoa",
@@ -3217,6 +3282,26 @@ dependencies = [
"want",
]
+[[package]]
+name = "hyper"
+version = "1.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-util",
+ "h2 0.4.5",
+ "http 1.1.0",
+ "http-body 1.0.0",
+ "httparse",
+ "itoa",
+ "pin-project-lite",
+ "smallvec",
+ "tokio",
+ "want",
+]
+
[[package]]
name = "hyper-rustls"
version = "0.24.2"
@@ -3224,25 +3309,62 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590"
dependencies = [
"futures-util",
- "http",
- "hyper",
+ "http 0.2.9",
+ "hyper 0.14.25",
"rustls 0.21.6",
"tokio",
"tokio-rustls 0.24.1",
]
+[[package]]
+name = "hyper-rustls"
+version = "0.26.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
+dependencies = [
+ "futures-util",
+ "http 1.1.0",
+ "hyper 1.3.1",
+ "hyper-util",
+ "rustls 0.22.2",
+ "rustls-pki-types",
+ "tokio",
+ "tokio-rustls 0.25.0",
+ "tower-service",
+]
+
[[package]]
name = "hyper-timeout"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
dependencies = [
- "hyper",
+ "hyper 0.14.25",
"pin-project-lite",
"tokio",
"tokio-io-timeout",
]
+[[package]]
+name = "hyper-util"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-util",
+ "http 1.1.0",
+ "http-body 1.0.0",
+ "hyper 1.3.1",
+ "pin-project-lite",
+ "socket2 0.5.3",
+ "tokio",
+ "tower",
+ "tower-service",
+ "tracing",
+]
+
[[package]]
name = "hyperloglog"
version = "1.0.2"
@@ -3904,10 +4026,11 @@ checksum =
"b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40"
[[package]]
name = "md-5"
-version = "0.10.5"
+version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca"
+checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
dependencies = [
+ "cfg-if 1.0.0",
"digest",
]
@@ -3988,7 +4111,7 @@ dependencies = [
"logger",
"macros",
"prost 0.11.8",
- "reqwest",
+ "reqwest 0.11.24",
"serde",
"serde_json",
"snafu 0.6.10",
@@ -4471,24 +4594,18 @@ dependencies = [
[[package]]
name = "object_store"
-version = "0.5.6"
+version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec9cd6ca25e796a49fa242876d1c4de36a24a6da5258e9f0bc062dbf5e81c53b"
+checksum = "2524735495ea1268be33d200e1ee97455096a0846295a21548cd2f3541de7050"
dependencies = [
"async-trait",
- "base64 0.21.0",
"bytes",
"chrono",
"futures 0.3.28",
- "itertools 0.10.5",
+ "humantime 2.1.0",
+ "itertools 0.11.0",
"parking_lot 0.12.1",
"percent-encoding",
- "quick-xml 0.28.2",
- "rand 0.8.5",
- "reqwest",
- "ring 0.16.20",
- "serde",
- "serde_json",
"snafu 0.7.4",
"tokio",
"tracing",
@@ -4498,18 +4615,27 @@ dependencies = [
[[package]]
name = "object_store"
-version = "0.8.0"
+version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2524735495ea1268be33d200e1ee97455096a0846295a21548cd2f3541de7050"
+checksum = "fbebfd32c213ba1907fa7a9c9138015a8de2b43e30c5aa45b18f7deb46786ad6"
dependencies = [
"async-trait",
+ "base64 0.22.1",
"bytes",
"chrono",
"futures 0.3.28",
"humantime 2.1.0",
- "itertools 0.11.0",
+ "hyper 1.3.1",
+ "itertools 0.12.0",
+ "md-5",
"parking_lot 0.12.1",
"percent-encoding",
+ "quick-xml 0.31.0",
+ "rand 0.8.5",
+ "reqwest 0.12.4",
+ "ring 0.17.7",
+ "serde",
+ "serde_json",
"snafu 0.7.4",
"tokio",
"tracing",
@@ -4535,7 +4661,7 @@ dependencies = [
"lru 0.7.8",
"macros",
"notifier",
- "object_store 0.5.6",
+ "object_store 0.10.1",
"partitioned_lock",
"prometheus 0.12.0",
"prometheus-static-metric",
@@ -4576,7 +4702,7 @@ dependencies = [
"quick-error",
"r2d2",
"rand 0.8.5",
- "reqwest",
+ "reqwest 0.11.24",
"rust-crypto",
"scheduled-thread-pool",
"serde",
@@ -4627,6 +4753,12 @@ dependencies = [
"tokio",
]
+[[package]]
+name = "openssl-probe"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
+
[[package]]
name = "ordered-float"
version = "2.10.0"
@@ -5450,7 +5582,7 @@ dependencies = [
"futures 0.3.28",
"generic_error",
"horaedbproto 2.0.0",
- "http",
+ "http 0.2.9",
"influxdb-line-protocol",
"interpreters",
"iox_query",
@@ -5636,9 +5768,9 @@ dependencies = [
[[package]]
name = "quick-xml"
-version = "0.28.2"
+version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ce5e73202a820a31f8a0ee32ada5e21029c81fd9e3ebf668a40832e4219d9d1"
+checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33"
dependencies = [
"memchr",
"serde",
@@ -5894,11 +6026,11 @@ dependencies = [
"encoding_rs",
"futures-core",
"futures-util",
- "h2",
- "http",
- "http-body",
- "hyper",
- "hyper-rustls",
+ "h2 0.3.26",
+ "http 0.2.9",
+ "http-body 0.4.5",
+ "hyper 0.14.25",
+ "hyper-rustls 0.24.2",
"ipnet",
"js-sys",
"log",
@@ -5915,6 +6047,49 @@ dependencies = [
"system-configuration",
"tokio",
"tokio-rustls 0.24.1",
+ "tower-service",
+ "url",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+ "webpki-roots 0.25.4",
+ "winreg 0.50.0",
+]
+
+[[package]]
+name = "reqwest"
+version = "0.12.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10"
+dependencies = [
+ "base64 0.22.1",
+ "bytes",
+ "futures-core",
+ "futures-util",
+ "h2 0.4.5",
+ "http 1.1.0",
+ "http-body 1.0.0",
+ "http-body-util",
+ "hyper 1.3.1",
+ "hyper-rustls 0.26.0",
+ "hyper-util",
+ "ipnet",
+ "js-sys",
+ "log",
+ "mime",
+ "once_cell",
+ "percent-encoding",
+ "pin-project-lite",
+ "rustls 0.22.2",
+ "rustls-native-certs",
+ "rustls-pemfile 2.1.2",
+ "rustls-pki-types",
+ "serde",
+ "serde_json",
+ "serde_urlencoded",
+ "sync_wrapper",
+ "tokio",
+ "tokio-rustls 0.25.0",
"tokio-util",
"tower-service",
"url",
@@ -5922,8 +6097,7 @@ dependencies = [
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
- "webpki-roots 0.25.4",
- "winreg",
+ "winreg 0.52.0",
]
[[package]]
@@ -6172,6 +6346,19 @@ dependencies = [
"zeroize",
]
+[[package]]
+name = "rustls-native-certs"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792"
+dependencies = [
+ "openssl-probe",
+ "rustls-pemfile 2.1.2",
+ "rustls-pki-types",
+ "schannel",
+ "security-framework",
+]
+
[[package]]
name = "rustls-pemfile"
version = "0.2.1"
@@ -6190,11 +6377,21 @@ dependencies = [
"base64 0.21.0",
]
+[[package]]
+name = "rustls-pemfile"
+version = "2.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d"
+dependencies = [
+ "base64 0.22.1",
+ "rustls-pki-types",
+]
+
[[package]]
name = "rustls-pki-types"
-version = "1.1.0"
+version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9e9d979b3ce68192e42760c7810125eb6cf2ea10efae545a156063e61f314e2a"
+checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d"
[[package]]
name = "rustls-webpki"
@@ -6267,6 +6464,15 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ece8e78b2f38ec51c51f5d475df0a7187ba5111b2a28bdc761ee05b075d40a71"
+[[package]]
+name = "schannel"
+version = "0.1.23"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534"
+dependencies = [
+ "windows-sys 0.52.0",
+]
+
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
@@ -6323,6 +6529,29 @@ version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b"
+[[package]]
+name = "security-framework"
+version = "2.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "770452e37cad93e0a50d5abc3990d2bc351c36d0328f86cefec2f2fb206eaef6"
+dependencies = [
+ "bitflags 1.3.2",
+ "core-foundation",
+ "core-foundation-sys",
+ "libc",
+ "security-framework-sys",
+]
+
+[[package]]
+name = "security-framework-sys"
+version = "2.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7"
+dependencies = [
+ "core-foundation-sys",
+ "libc",
+]
+
[[package]]
name = "semver"
version = "1.0.17"
@@ -6421,7 +6650,7 @@ dependencies = [
"futures 0.3.28",
"generic_error",
"horaedbproto 2.0.0",
- "http",
+ "http 0.2.9",
"influxdb-line-protocol",
"interpreters",
"lazy_static",
@@ -6649,9 +6878,9 @@ dependencies = [
[[package]]
name = "smallvec"
-version = "1.10.0"
+version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
+checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]]
name = "snafu"
@@ -7459,10 +7688,10 @@ dependencies = [
"bytes",
"futures-core",
"futures-util",
- "h2",
- "http",
- "http-body",
- "hyper",
+ "h2 0.3.26",
+ "http 0.2.9",
+ "http-body 0.4.5",
+ "hyper 0.14.25",
"hyper-timeout",
"percent-encoding",
"pin-project",
@@ -7492,10 +7721,10 @@ dependencies = [
"bytes",
"futures-core",
"futures-util",
- "h2",
- "http",
- "http-body",
- "hyper",
+ "h2 0.3.26",
+ "http 0.2.9",
+ "http-body 0.4.5",
+ "hyper 0.14.25",
"hyper-timeout",
"percent-encoding",
"pin-project",
@@ -7713,7 +7942,7 @@ dependencies = [
"base64 0.13.1",
"byteorder",
"bytes",
- "http",
+ "http 0.2.9",
"httparse",
"log",
"rand 0.8.5",
@@ -7950,8 +8179,8 @@ dependencies = [
"futures-channel",
"futures-util",
"headers",
- "http",
- "hyper",
+ "http 0.2.9",
+ "hyper 0.14.25",
"log",
"mime",
"mime_guess",
@@ -8441,6 +8670,16 @@ dependencies = [
"windows-sys 0.48.0",
]
+[[package]]
+name = "winreg"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5"
+dependencies = [
+ "cfg-if 1.0.0",
+ "windows-sys 0.48.0",
+]
+
[[package]]
name = "wyz"
version = "0.5.1"
diff --git a/src/analytic_engine/src/setup.rs b/src/analytic_engine/src/setup.rs
index 1e25fb2e..be0d9354 100644
--- a/src/analytic_engine/src/setup.rs
+++ b/src/analytic_engine/src/setup.rs
@@ -27,13 +27,11 @@ use object_store::{
disk_cache::DiskCacheStore,
mem_cache::{MemCache, MemCacheStore},
metrics::StoreWithMetrics,
- obkv,
prefix::StoreWithPrefix,
s3, LocalFileSystem, ObjectStoreRef,
};
use snafu::{ResultExt, Snafu};
use table_engine::engine::{EngineRuntimes, TableEngineRef};
-use table_kv::obkv::ObkvImpl;
use wal::manager::{OpenedWals, WalManagerRef};
use crate::{
@@ -55,9 +53,6 @@ pub enum Error {
source: crate::instance::engine::Error,
},
- #[snafu(display("Failed to open obkv, err:{}", source))]
- OpenObkv { source: table_kv::obkv::Error },
-
#[snafu(display("Failed to execute in runtime, err:{}", source))]
RuntimeExec { source: runtime::Error },
@@ -214,26 +209,6 @@ fn open_storage(
let store_with_prefix =
StoreWithPrefix::new(aliyun_opts.prefix, oss);
Arc::new(store_with_prefix.context(OpenObjectStore)?) as _
}
- ObjectStoreOptions::Obkv(obkv_opts) => {
- let obkv_config = obkv_opts.client;
- let obkv = engine_runtimes
- .write_runtime
- .spawn_blocking(move ||
ObkvImpl::new(obkv_config).context(OpenObkv))
- .await
- .context(RuntimeExec)??;
-
- let oss: ObjectStoreRef = Arc::new(
- obkv::ObkvObjectStore::try_new(
- Arc::new(obkv),
- obkv_opts.shard_num,
- obkv_opts.part_size.0 as usize,
- obkv_opts.max_object_size.0 as usize,
- obkv_opts.upload_parallelism,
- )
- .context(OpenObjectStore)?,
- );
- Arc::new(StoreWithPrefix::new(obkv_opts.prefix,
oss).context(OpenObjectStore)?) as _
- }
ObjectStoreOptions::S3(s3_option) => {
let oss: ObjectStoreRef =
Arc::new(s3::try_new(&s3_option).context(OpenObjectStore)?);
diff --git a/src/analytic_engine/src/sst/meta_data/cache.rs
b/src/analytic_engine/src/sst/meta_data/cache.rs
index 016c1cbb..8ddaf487 100644
--- a/src/analytic_engine/src/sst/meta_data/cache.rs
+++ b/src/analytic_engine/src/sst/meta_data/cache.rs
@@ -290,7 +290,7 @@ mod tests {
let bytes =
encoding::encode_sst_meta_data(custom_meta_data.clone()).unwrap();
let meta_path = object_store::Path::from(meta_path);
- store.put(&meta_path, bytes).await.unwrap();
+ store.put(&meta_path, bytes.into()).await.unwrap();
}
#[tokio::test]
diff --git a/src/analytic_engine/src/sst/parquet/writer.rs
b/src/analytic_engine/src/sst/parquet/writer.rs
index 59c68f42..5fe669c5 100644
--- a/src/analytic_engine/src/sst/parquet/writer.rs
+++ b/src/analytic_engine/src/sst/parquet/writer.rs
@@ -28,10 +28,9 @@ use datafusion::parquet::basic::Compression;
use futures::StreamExt;
use generic_error::BoxError;
use logger::{debug, error};
-use object_store::{ObjectStoreRef, Path};
-use parquet::data_type::AsBytes;
+use object_store::{MultiUploadWriter, ObjectStore, ObjectStoreRef, Path,
WriteMultipartRef};
use snafu::{OptionExt, ResultExt};
-use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio::io::AsyncWrite;
use crate::{
sst::{
@@ -45,8 +44,8 @@ use crate::{
},
},
writer::{
- self, BuildParquetFilter, EncodePbData, EncodeRecordBatch,
ExpectTimestampColumn, Io,
- MetaData, PollRecordBatch, RecordBatchStream, Result, SstInfo,
SstWriter, Storage,
+ BuildParquetFilter, EncodePbData, EncodeRecordBatch,
ExpectTimestampColumn, MetaData,
+ PollRecordBatch, RecordBatchStream, Result, SstInfo, SstWriter,
Storage,
},
},
table::sst_util,
@@ -405,67 +404,24 @@ impl<'a> RecordBatchGroupWriter<'a> {
}
}
-struct ObjectStoreMultiUploadAborter<'a> {
- location: &'a Path,
- session_id: String,
- object_store: &'a ObjectStoreRef,
-}
-
-impl<'a> ObjectStoreMultiUploadAborter<'a> {
- async fn initialize_upload(
- object_store: &'a ObjectStoreRef,
- location: &'a Path,
- ) -> Result<(
- ObjectStoreMultiUploadAborter<'a>,
- Box<dyn AsyncWrite + Unpin + Send>,
- )> {
- let (session_id, upload_writer) = object_store
- .put_multipart(location)
- .await
- .context(Storage)?;
- let aborter = Self {
- location,
- session_id,
- object_store,
- };
- Ok((aborter, upload_writer))
- }
-
- async fn abort(self) -> Result<()> {
- self.object_store
- .abort_multipart(self.location, &self.session_id)
- .await
- .context(Storage)
- }
-}
-
-async fn write_metadata<W>(
- mut meta_sink: W,
+async fn write_metadata(
+ meta_sink: MultiUploadWriter,
parquet_metadata: ParquetMetaData,
- meta_path: &object_store::Path,
-) -> writer::Result<usize>
-where
- W: AsyncWrite + Send + Unpin,
-{
+) -> Result<usize> {
let buf = encode_sst_meta_data(parquet_metadata).context(EncodePbData)?;
- let bytes = buf.as_bytes();
- let bytes_size = bytes.len();
- meta_sink.write_all(bytes).await.with_context(|| Io {
- file: meta_path.clone(),
- })?;
-
- meta_sink.shutdown().await.with_context(|| Io {
- file: meta_path.clone(),
- })?;
+ let buf_size = buf.len();
+ let mut uploader = meta_sink.multi_upload.lock().await;
+ uploader.put(buf);
+ uploader.finish().await.context(Storage)?;
- Ok(bytes_size)
+ Ok(buf_size)
}
-async fn multi_upload_abort(path: &Path, aborter:
ObjectStoreMultiUploadAborter<'_>) {
- // The uploading file will be leaked if failed to abort. A repair command
will
- // be provided to clean up the leaked files.
- if let Err(e) = aborter.abort().await {
- error!("Failed to abort multi-upload for sst:{}, err:{}", path, e);
+async fn multi_upload_abort(aborter: WriteMultipartRef) {
+ // The uploading file will be leaked if failed to abort. A repair command
+ // will be provided to clean up the leaked files.
+ if let Err(e) = aborter.lock().await.abort().await {
+ error!("Failed to abort multi-upload sst, err:{}", e);
}
}
@@ -476,7 +432,7 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {
request_id: RequestId,
meta: &MetaData,
input: RecordBatchStream,
- ) -> writer::Result<SstInfo> {
+ ) -> Result<SstInfo> {
debug!(
"Build parquet file, request_id:{}, meta:{:?},
num_rows_per_row_group:{}",
request_id, meta, self.options.num_rows_per_row_group
@@ -491,8 +447,10 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {
};
let group_writer = RecordBatchGroupWriter::new(request_id, input,
meta, write_options);
- let (aborter, sink) =
- ObjectStoreMultiUploadAborter::initialize_upload(self.store,
self.path).await?;
+ let sink = MultiUploadWriter::new(self.store, self.path)
+ .await
+ .context(Storage)?;
+ let aborter = sink.aborter();
let meta_path =
Path::from(sst_util::new_metadata_path(self.path.as_ref()));
@@ -500,19 +458,21 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {
match group_writer.write_all(sink, &meta_path).await {
Ok(v) => v,
Err(e) => {
- multi_upload_abort(self.path, aborter).await;
+ multi_upload_abort(aborter).await;
return Err(e);
}
};
let time_range = parquet_metadata.time_range;
- let (meta_aborter, meta_sink) =
- ObjectStoreMultiUploadAborter::initialize_upload(self.store,
&meta_path).await?;
- let meta_size = match write_metadata(meta_sink, parquet_metadata,
&meta_path).await {
+ let meta_sink = MultiUploadWriter::new(self.store, &meta_path)
+ .await
+ .context(Storage)?;
+ let meta_aborter = meta_sink.aborter();
+ let meta_size = match write_metadata(meta_sink,
parquet_metadata).await {
Ok(v) => v,
Err(e) => {
- multi_upload_abort(self.path, aborter).await;
- multi_upload_abort(&meta_path, meta_aborter).await;
+ multi_upload_abort(aborter).await;
+ multi_upload_abort(meta_aborter).await;
return Err(e);
}
};
diff --git a/src/components/object_store/Cargo.toml
b/src/components/object_store/Cargo.toml
index 66c3437e..f9221e1d 100644
--- a/src/components/object_store/Cargo.toml
+++ b/src/components/object_store/Cargo.toml
@@ -59,7 +59,7 @@ table_kv = { workspace = true }
time_ext = { workspace = true }
tokio = { workspace = true }
twox-hash = "1.6"
-upstream = { package = "object_store", version = "0.5.6", features = [ "aws" ]
}
+upstream = { package = "object_store", version = "0.10.1", features = [ "aws"
] }
uuid = { version = "1.3.3", features = ["v4"] }
[dev-dependencies]
diff --git a/src/components/object_store/src/config.rs
b/src/components/object_store/src/config.rs
index 0fbae62b..d0ecbfb0 100644
--- a/src/components/object_store/src/config.rs
+++ b/src/components/object_store/src/config.rs
@@ -19,7 +19,6 @@ use std::time::Duration;
use serde::{Deserialize, Serialize};
use size_ext::ReadableSize;
-use table_kv::config::ObkvConfig;
use time_ext::ReadableDuration;
#[derive(Debug, Clone, Deserialize, Serialize)]
@@ -63,7 +62,6 @@ impl Default for StorageOptions {
pub enum ObjectStoreOptions {
Local(LocalOptions),
Aliyun(AliyunOptions),
- Obkv(ObkvOptions),
S3(S3Options),
}
@@ -85,39 +83,6 @@ pub struct AliyunOptions {
pub retry: RetryOptions,
}
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct ObkvOptions {
- pub prefix: String,
- #[serde(default = "ObkvOptions::default_shard_num")]
- pub shard_num: usize,
- #[serde(default = "ObkvOptions::default_part_size")]
- pub part_size: ReadableSize,
- #[serde(default = "ObkvOptions::default_max_object_size")]
- pub max_object_size: ReadableSize,
- #[serde(default = "ObkvOptions::default_upload_parallelism")]
- pub upload_parallelism: usize,
- /// Obkv client config
- pub client: ObkvConfig,
-}
-
-impl ObkvOptions {
- fn default_max_object_size() -> ReadableSize {
- ReadableSize::gb(1)
- }
-
- fn default_part_size() -> ReadableSize {
- ReadableSize::mb(1)
- }
-
- fn default_shard_num() -> usize {
- 512
- }
-
- fn default_upload_parallelism() -> usize {
- 8
- }
-}
-
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct S3Options {
pub region: String,
diff --git a/src/components/object_store/src/disk_cache.rs
b/src/components/object_store/src/disk_cache.rs
index b0c7ba13..33ab7776 100644
--- a/src/components/object_store/src/disk_cache.rs
+++ b/src/components/object_store/src/disk_cache.rs
@@ -40,12 +40,12 @@ use snafu::{ensure, Backtrace, ResultExt, Snafu};
use time_ext;
use tokio::{
fs::{self, File, OpenOptions},
- io::{AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt},
+ io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
sync::oneshot::{self, error::RecvError, Receiver},
};
use upstream::{
- path::Path, Error as ObjectStoreError, GetResult, ListResult, MultipartId,
ObjectMeta,
- ObjectStore, Result,
+ path::Path, Error as ObjectStoreError, GetOptions, GetResult, ListResult,
MultipartUpload,
+ ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload,
PutResult, Result,
};
use crate::metrics::{
@@ -828,20 +828,32 @@ impl Display for DiskCacheStore {
#[async_trait]
impl ObjectStore for DiskCacheStore {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
- self.underlying_store.put(location, bytes).await
+ async fn put(&self, location: &Path, payload: PutPayload) ->
Result<PutResult> {
+ self.underlying_store.put(location, payload).await
}
- async fn put_multipart(
+ async fn put_opts(
&self,
location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+ payload: PutPayload,
+ opts: PutOptions,
+ ) -> Result<PutResult> {
+ self.underlying_store
+ .put_opts(location, payload, opts)
+ .await
+ }
+
+ async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>> {
self.underlying_store.put_multipart(location).await
}
- async fn abort_multipart(&self, location: &Path, multipart_id:
&MultipartId) -> Result<()> {
+ async fn put_multipart_opts(
+ &self,
+ location: &Path,
+ opts: PutMultipartOpts,
+ ) -> Result<Box<dyn MultipartUpload>> {
self.underlying_store
- .abort_multipart(location, multipart_id)
+ .put_multipart_opts(location, opts)
.await
}
@@ -851,6 +863,10 @@ impl ObjectStore for DiskCacheStore {
self.underlying_store.get(location).await
}
+ async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
+ self.underlying_store.get_opts(location, options).await
+ }
+
async fn get_range(&self, location: &Path, range: Range<usize>) ->
Result<Bytes> {
let file_meta = self.fetch_file_meta(location).await?;
ensure!(
@@ -987,6 +1003,8 @@ impl ObjectStore for DiskCacheStore {
location: location.clone(),
last_modified: file_meta.last_modified,
size: file_meta.size,
+ e_tag: None,
+ version: None,
})
}
@@ -994,8 +1012,8 @@ impl ObjectStore for DiskCacheStore {
self.underlying_store.delete(location).await
}
- async fn list(&self, prefix: Option<&Path>) -> Result<BoxStream<'_,
Result<ObjectMeta>>> {
- self.underlying_store.list(prefix).await
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>
{
+ self.underlying_store.list(prefix)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
@@ -1068,7 +1086,7 @@ mod test {
let location = Path::from("out_of_range_test.sst");
let store = prepare_store(page_size, 32, 0, rt.clone()).await;
let buf = Bytes::from_static(data);
- store.inner.put(&location, buf.clone()).await.unwrap();
+ store.inner.put(&location, buf.into()).await.unwrap();
// Read one page out of range.
let res = store.inner.get_range(&location, 48..54).await;
@@ -1095,7 +1113,7 @@ mod test {
for _ in 0..4 {
buf.extend_from_slice(data);
}
- store.inner.put(&location, buf.freeze()).await.unwrap();
+ store.inner.put(&location, buf.freeze().into()).await.unwrap();
let testcases = vec![
(0..6, "a b c "),
@@ -1164,7 +1182,7 @@ mod test {
for _ in 0..4 {
buf.extend_from_slice(data);
}
- store.inner.put(&location, buf.freeze()).await.unwrap();
+ store.inner.put(&location, buf.freeze().into()).await.unwrap();
let testcases = [
(0..6, "a b c "),
@@ -1212,7 +1230,11 @@ mod test {
for _ in 0..4 {
buf.extend_from_slice(data);
}
- store.inner.put(&location, buf.freeze()).await.unwrap();
+ store
+ .inner
+ .put(&location, buf.freeze().into())
+ .await
+ .unwrap();
let _ = store.inner.get_range(&location, 0..16).await.unwrap();
let _ = store.inner.get_range(&location, 16..32).await.unwrap();
@@ -1247,7 +1269,11 @@ mod test {
for _ in 0..8 {
buf.extend_from_slice(data);
}
- store.inner.put(&location, buf.freeze()).await.unwrap();
+ store
+ .inner
+ .put(&location, buf.freeze().into())
+ .await
+ .unwrap();
// use seahash
// 0..16: partition 1
// 16..32 partition 1
@@ -1409,7 +1435,7 @@ mod test {
buf.extend_from_slice(data);
}
let buf = buf.freeze();
- store.put(&location, buf.clone()).await.unwrap();
+ store.put(&location, buf.clone().into()).await.unwrap();
let read_range = 16..100;
let bytes = store
.get_range(&location, read_range.clone())
@@ -1477,7 +1503,7 @@ mod test {
// Put data into store and get it to let the cache load the data.
store
- .put(&test_file_path, test_file_bytes.clone())
+ .put(&test_file_path, test_file_bytes.clone().into())
.await
.unwrap();
diff --git a/src/components/object_store/src/lib.rs
b/src/components/object_store/src/lib.rs
index 3436cdcf..350ccfa0 100644
--- a/src/components/object_store/src/lib.rs
+++ b/src/components/object_store/src/lib.rs
@@ -19,9 +19,11 @@
use std::sync::Arc;
+pub use multi_part::{ConcurrentMultipartUpload, MultiUploadWriter};
+use tokio::sync::Mutex;
pub use upstream::{
- local::LocalFileSystem, path::Path, Error as ObjectStoreError, GetResult,
ListResult,
- ObjectMeta, ObjectStore,
+ local::LocalFileSystem, path::Path, Error as ObjectStoreError, Error,
GetResult, ListResult,
+ ObjectMeta, ObjectStore, PutPayloadMut,
};
pub mod aliyun;
@@ -29,11 +31,13 @@ pub mod config;
pub mod disk_cache;
pub mod mem_cache;
pub mod metrics;
-pub mod multipart;
-pub mod obkv;
+mod multi_part;
pub mod prefix;
pub mod s3;
#[cfg(test)]
pub mod test_util;
pub type ObjectStoreRef = Arc<dyn ObjectStore>;
+
+// TODO: remove Mutex and make ConcurrentMultipartUpload thread-safe
+pub type WriteMultipartRef = Arc<Mutex<ConcurrentMultipartUpload>>;
diff --git a/src/components/object_store/src/mem_cache.rs
b/src/components/object_store/src/mem_cache.rs
index f602eee6..0fa8a912 100644
--- a/src/components/object_store/src/mem_cache.rs
+++ b/src/components/object_store/src/mem_cache.rs
@@ -34,10 +34,9 @@ use hash_ext::{ahash::RandomState,
build_fixed_seed_ahasher_builder};
use macros::define_result;
use partitioned_lock::PartitionedMutex;
use snafu::{OptionExt, Snafu};
-use tokio::io::AsyncWrite;
use upstream::{
- path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
- Result as ObjectStoreResult,
+ path::Path, GetOptions, GetResult, ListResult, MultipartUpload,
ObjectMeta, ObjectStore,
+ PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as
ObjectStoreResult,
};
use crate::{
@@ -219,24 +218,32 @@ impl fmt::Debug for MemCacheStore {
#[async_trait]
impl ObjectStore for MemCacheStore {
- async fn put(&self, location: &Path, bytes: Bytes) ->
ObjectStoreResult<()> {
- self.underlying_store.put(location, bytes).await
+ async fn put(&self, location: &Path, payload: PutPayload) ->
ObjectStoreResult<PutResult> {
+ self.underlying_store.put(location, payload).await
}
- async fn put_multipart(
+ async fn put_opts(
&self,
location: &Path,
- ) -> ObjectStoreResult<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+ payload: PutPayload,
+ opts: PutOptions,
+ ) -> ObjectStoreResult<PutResult> {
+ self.underlying_store
+ .put_opts(location, payload, opts)
+ .await
+ }
+
+ async fn put_multipart(&self, location: &Path) ->
ObjectStoreResult<Box<dyn MultipartUpload>> {
self.underlying_store.put_multipart(location).await
}
- async fn abort_multipart(
+ async fn put_multipart_opts(
&self,
location: &Path,
- multipart_id: &MultipartId,
- ) -> ObjectStoreResult<()> {
+ opts: PutMultipartOpts,
+ ) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
self.underlying_store
- .abort_multipart(location, multipart_id)
+ .put_multipart_opts(location, opts)
.await
}
@@ -247,6 +254,10 @@ impl ObjectStore for MemCacheStore {
self.underlying_store.get(location).await
}
+ async fn get_opts(&self, location: &Path, options: GetOptions) ->
ObjectStoreResult<GetResult> {
+ self.underlying_store.get_opts(location, options).await
+ }
+
async fn get_range(&self, location: &Path, range: Range<usize>) ->
ObjectStoreResult<Bytes> {
if self.readonly_cache {
self.get_range_with_ro_cache(location, range).await
@@ -263,11 +274,8 @@ impl ObjectStore for MemCacheStore {
self.underlying_store.delete(location).await
}
- async fn list(
- &self,
- prefix: Option<&Path>,
- ) -> ObjectStoreResult<BoxStream<'_, ObjectStoreResult<ObjectMeta>>> {
- self.underlying_store.list(prefix).await
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_,
ObjectStoreResult<ObjectMeta>> {
+ self.underlying_store.list(prefix)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
ObjectStoreResult<ListResult> {
@@ -307,7 +315,7 @@ mod test {
// write date
let location = Path::from("1.sst");
store
- .put(&location, Bytes::from_static(&[1; 1024]))
+ .put(&location, Bytes::from_static(&[1; 1024]).into())
.await
.unwrap();
@@ -358,7 +366,7 @@ mod test {
let store = prepare_store(2, 100);
let location = Path::from("partition.sst");
store
- .put(&location, Bytes::from_static(&[1; 1024]))
+ .put(&location, Bytes::from_static(&[1; 1024]).into())
.await
.unwrap();
diff --git a/src/components/object_store/src/metrics.rs
b/src/components/object_store/src/metrics.rs
index 8000a9ac..2847d2bf 100644
--- a/src/components/object_store/src/metrics.rs
+++ b/src/components/object_store/src/metrics.rs
@@ -27,10 +27,9 @@ use prometheus::{
};
use prometheus_static_metric::make_static_metric;
use runtime::Runtime;
-use tokio::io::AsyncWrite;
use upstream::{
- path::Path, Error as StoreError, GetResult, ListResult, MultipartId,
ObjectMeta, ObjectStore,
- Result,
+ path::Path, Error as StoreError, GetOptions, GetResult, ListResult,
MultipartUpload,
+ ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload,
PutResult, Result,
};
use crate::ObjectStoreRef;
@@ -39,9 +38,12 @@ make_static_metric! {
pub struct ObjectStoreDurationHistogram: Histogram {
"op" => {
put,
+ put_opts,
put_multipart,
+ put_multipart_opts,
abort_multipart,
get,
+ get_opts,
get_range,
get_ranges,
head,
@@ -58,6 +60,7 @@ make_static_metric! {
pub struct ObjectStoreThroughputHistogram: Histogram {
"op" => {
put,
+ put_opts,
get_range,
get_ranges,
},
@@ -142,16 +145,16 @@ impl Display for StoreWithMetrics {
#[async_trait]
impl ObjectStore for StoreWithMetrics {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+ async fn put(&self, location: &Path, payload: PutPayload) ->
Result<PutResult> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM.put.start_timer();
OBJECT_STORE_THROUGHPUT_HISTOGRAM
.put
- .observe(bytes.len() as f64);
+ .observe(payload.content_length() as f64);
let loc = location.clone();
let store = self.store.clone();
self.runtime
- .spawn(async move { store.put(&loc, bytes).await })
+ .spawn(async move { store.put(&loc, payload).await })
.await
.map_err(|source| StoreError::Generic {
store: METRICS,
@@ -159,10 +162,29 @@ impl ObjectStore for StoreWithMetrics {
})?
}
- async fn put_multipart(
+ async fn put_opts(
&self,
location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+ payload: PutPayload,
+ opts: PutOptions,
+ ) -> Result<PutResult> {
+ let _timer = OBJECT_STORE_DURATION_HISTOGRAM.put_opts.start_timer();
+ OBJECT_STORE_THROUGHPUT_HISTOGRAM
+ .put_opts
+ .observe(payload.content_length() as f64);
+
+ let loc = location.clone();
+ let store = self.store.clone();
+ self.runtime
+ .spawn(async move { store.put_opts(&loc, payload, opts).await })
+ .await
+ .map_err(|source| StoreError::Generic {
+ store: METRICS,
+ source: Box::new(source),
+ })?
+ }
+
+ async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>> {
let _timer =
OBJECT_STORE_DURATION_HISTOGRAM.put_multipart.start_timer();
let instant = Instant::now();
@@ -187,11 +209,35 @@ impl ObjectStore for StoreWithMetrics {
res
}
- async fn abort_multipart(&self, location: &Path, multipart_id:
&MultipartId) -> Result<()> {
+ async fn put_multipart_opts(
+ &self,
+ location: &Path,
+ opts: PutMultipartOpts,
+ ) -> Result<Box<dyn MultipartUpload>> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM
- .abort_multipart
+ .put_multipart_opts
.start_timer();
- self.store.abort_multipart(location, multipart_id).await
+
+ let instant = Instant::now();
+ let loc = location.clone();
+ let store = self.store.clone();
+ let res = self
+ .runtime
+ .spawn(async move { store.put_multipart_opts(&loc, opts).await })
+ .await
+ .map_err(|source| StoreError::Generic {
+ store: METRICS,
+ source: Box::new(source),
+ })?;
+
+ trace!(
+ "Object store with metrics put_multipart_opts cost:{}ms,
location:{}, thread:{}-{:?}",
+ instant.elapsed().as_millis(),
+ location,
+ thread::current().name().unwrap_or("noname").to_string(),
+ thread::current().id()
+ );
+ res
}
async fn get(&self, location: &Path) -> Result<GetResult> {
@@ -207,6 +253,19 @@ impl ObjectStore for StoreWithMetrics {
})?
}
+ async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
+ let _timer = OBJECT_STORE_DURATION_HISTOGRAM.get_opts.start_timer();
+ let store = self.store.clone();
+ let loc = location.clone();
+ self.runtime
+ .spawn(async move { store.get_opts(&loc, options).await })
+ .await
+ .map_err(|source| StoreError::Generic {
+ store: METRICS,
+ source: Box::new(source),
+ })?
+ }
+
async fn get_range(&self, location: &Path, range: Range<usize>) ->
Result<Bytes> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM.get_range.start_timer();
@@ -292,9 +351,9 @@ impl ObjectStore for StoreWithMetrics {
})?
}
- async fn list(&self, prefix: Option<&Path>) -> Result<BoxStream<'_,
Result<ObjectMeta>>> {
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>
{
let _timer = OBJECT_STORE_DURATION_HISTOGRAM.list.start_timer();
- self.store.list(prefix).await
+ self.store.list(prefix)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
diff --git a/src/components/object_store/src/multi_part.rs
b/src/components/object_store/src/multi_part.rs
new file mode 100644
index 00000000..871ffe2a
--- /dev/null
+++ b/src/components/object_store/src/multi_part.rs
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+ io::Error as IoError,
+ pin::Pin,
+ sync::Arc,
+ task::{Context, Poll},
+};
+
+use bytes::Bytes;
+use futures::{future::BoxFuture, ready, Future, FutureExt};
+use tokio::{io::AsyncWrite, sync::Mutex, task::JoinSet};
+pub use upstream::PutPayloadMut;
+use upstream::{path::Path, Error, MultipartUpload, PutPayload, PutResult};
+
+use crate::{ObjectStoreRef, WriteMultipartRef};
+
+#[derive(Debug)]
+pub struct ConcurrentMultipartUpload {
+ upload: Box<dyn MultipartUpload>,
+
+ buffer: PutPayloadMut,
+
+ chunk_size: usize,
+
+ tasks: JoinSet<Result<(), Error>>,
+}
+
+impl ConcurrentMultipartUpload {
+ pub fn new(upload: Box<dyn MultipartUpload>, chunk_size: usize) -> Self {
+ Self {
+ upload,
+ chunk_size,
+ buffer: PutPayloadMut::new(),
+ tasks: Default::default(),
+ }
+ }
+
+ pub fn poll_tasks(
+ &mut self,
+ cx: &mut Context<'_>,
+ max_concurrency: usize,
+ ) -> Poll<Result<(), Error>> {
+ while !self.tasks.is_empty() && self.tasks.len() >= max_concurrency {
+ ready!(self.tasks.poll_join_next(cx)).unwrap()??
+ }
+ Poll::Ready(Ok(()))
+ }
+
+ fn put_part(&mut self, part: PutPayload) {
+ self.tasks.spawn(self.upload.put_part(part));
+ }
+
+ pub fn put(&mut self, mut bytes: Bytes) {
+ while !bytes.is_empty() {
+ let remaining = self.chunk_size - self.buffer.content_length();
+ if bytes.len() < remaining {
+ self.buffer.push(bytes);
+ return;
+ }
+ self.buffer.push(bytes.split_to(remaining));
+ let buffer = std::mem::take(&mut self.buffer);
+ self.put_part(buffer.into())
+ }
+ }
+
+ pub fn write(&mut self, mut buf: &[u8]) {
+ while !buf.is_empty() {
+ let remaining = self.chunk_size - self.buffer.content_length();
+ let to_read = buf.len().min(remaining);
+ self.buffer.extend_from_slice(&buf[..to_read]);
+ if to_read == remaining {
+ let buffer = std::mem::take(&mut self.buffer);
+ self.put_part(buffer.into())
+ }
+ buf = &buf[to_read..]
+ }
+ }
+
+ pub async fn flush(&mut self, max_concurrency: usize) -> Result<(), Error>
{
+ futures::future::poll_fn(|cx| self.poll_tasks(cx,
max_concurrency)).await
+ }
+
+ pub async fn finish(&mut self) -> Result<PutResult, Error> {
+ if !self.buffer.is_empty() {
+ let part = std::mem::take(&mut self.buffer);
+ self.put_part(part.into())
+ }
+
+ self.flush(0).await?;
+ self.upload.complete().await
+ }
+
+ pub async fn abort(&mut self) -> Result<(), Error> {
+ self.tasks.shutdown().await;
+ self.upload.abort().await
+ }
+}
+
+pub struct MultiUploadWriter {
+ pub multi_upload: WriteMultipartRef,
+ upload_task: Option<BoxFuture<'static, std::result::Result<usize,
IoError>>>,
+ flush_task: Option<BoxFuture<'static, std::result::Result<(), IoError>>>,
+ completion_task: Option<BoxFuture<'static, std::result::Result<(),
IoError>>>,
+}
+
+const CHUNK_SIZE: usize = 5 * 1024 * 1024;
+const MAX_CONCURRENCY: usize = 10;
+
+impl<'a> MultiUploadWriter {
+ pub async fn new(object_store: &'a ObjectStoreRef, location: &'a Path) ->
Result<Self, Error> {
+ let upload_writer = object_store.put_multipart(location).await?;
+
+ let multi_upload = Arc::new(Mutex::new(ConcurrentMultipartUpload::new(
+ upload_writer,
+ CHUNK_SIZE,
+ )));
+
+ let multi_upload = Self {
+ multi_upload,
+ upload_task: None,
+ flush_task: None,
+ completion_task: None,
+ };
+
+ Ok(multi_upload)
+ }
+
+ pub fn aborter(&self) -> WriteMultipartRef {
+ self.multi_upload.clone()
+ }
+}
+
+impl AsyncWrite for MultiUploadWriter {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &[u8],
+ ) -> Poll<Result<usize, IoError>> {
+ let multi_upload = self.multi_upload.clone();
+ let buf = buf.to_owned();
+
+ let upload_task = self.upload_task.insert(
+ async move {
+ multi_upload
+ .lock()
+ .await
+ .flush(MAX_CONCURRENCY)
+ .await
+ .map_err(IoError::other)?;
+
+ multi_upload.lock().await.write(&buf);
+ Ok(buf.len())
+ }
+ .boxed(),
+ );
+
+ Pin::new(upload_task).poll(cx)
+ }
+
+ fn poll_flush(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll<Result<(), IoError>> {
+ let multi_upload = self.multi_upload.clone();
+
+ let flush_task = self.flush_task.insert(
+ async move {
+ multi_upload
+ .lock()
+ .await
+ .flush(0)
+ .await
+ .map_err(IoError::other)?;
+
+ Ok(())
+ }
+ .boxed(),
+ );
+
+ Pin::new(flush_task).poll(cx)
+ }
+
+ fn poll_shutdown(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll<Result<(), IoError>> {
+ let multi_upload = self.multi_upload.clone();
+
+ let completion_task = self.completion_task.get_or_insert_with(|| {
+ async move {
+ multi_upload
+ .lock()
+ .await
+ .finish()
+ .await
+ .map_err(IoError::other)?;
+
+ Ok(())
+ }
+ .boxed()
+ });
+
+ Pin::new(completion_task).poll(cx)
+ }
+}
diff --git a/src/components/object_store/src/multipart.rs
b/src/components/object_store/src/multipart.rs
deleted file mode 100644
index cb5c7d7e..00000000
--- a/src/components/object_store/src/multipart.rs
+++ /dev/null
@@ -1,280 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Implement multipart upload of [ObjectStore](upstream::ObjectStore), and
most
-//! of the codes are forked from
`arrow-rs`:https://github.com/apache/arrow-rs/blob/master/object_store/src/multipart.rs
-
-use std::{io, pin::Pin, sync::Arc, task::Poll};
-
-use async_trait::async_trait;
-use futures::{stream::FuturesUnordered, Future, StreamExt};
-use tokio::io::AsyncWrite;
-use upstream::Result;
-
-type BoxedTryFuture<T> = Pin<Box<dyn Future<Output = Result<T, io::Error>> +
Send>>;
-
-/// A trait that can be implemented by cloud-based object stores
-/// and used in combination with [`CloudMultiPartUpload`] to provide
-/// multipart upload support.
-#[async_trait]
-pub trait CloudMultiPartUploadImpl: 'static {
- /// Upload a single part
- async fn put_multipart_part(
- &self,
- buf: Vec<u8>,
- part_idx: usize,
- ) -> Result<UploadPart, io::Error>;
-
- /// Complete the upload with the provided parts
- ///
- /// `completed_parts` is in order of part number
- async fn complete(&self, completed_parts: Vec<UploadPart>) -> Result<(),
io::Error>;
-}
-
-#[derive(Debug, Clone)]
-pub struct UploadPart {
- pub content_id: String,
-}
-
-pub struct CloudMultiPartUpload<T>
-where
- T: CloudMultiPartUploadImpl,
-{
- inner: Arc<T>,
- /// A list of completed parts, in sequential order.
- completed_parts: Vec<Option<UploadPart>>,
- /// Part upload tasks currently running.
- ///
- /// Every task uploads data with `part_size` to objectstore.
- tasks: FuturesUnordered<BoxedTryFuture<(usize, UploadPart)>>,
- /// Maximum number of upload tasks to run concurrently
- max_concurrency: usize,
- /// Buffer that will be sent in next upload.
- ///
- /// TODO: Maybe we can use a list of Vec<u8> to ensure every buffer is
- /// aligned with the part_size to avoid any extra copy in the future.
- current_buffer: Vec<u8>,
- /// Size of a part in bytes, size of last part may be smaller than
- /// `part_size`.
- part_size: usize,
- /// Index of current part
- current_part_idx: usize,
- /// The completion task
- completion_task: Option<BoxedTryFuture<()>>,
-}
-
-impl<T> CloudMultiPartUpload<T>
-where
- T: CloudMultiPartUploadImpl,
-{
- pub fn new(inner: T, max_concurrency: usize, part_size: usize) -> Self {
- Self {
- inner: Arc::new(inner),
- completed_parts: Vec::new(),
- tasks: FuturesUnordered::new(),
- max_concurrency,
- current_buffer: Vec::new(),
- part_size,
- current_part_idx: 0,
- completion_task: None,
- }
- }
-
- pub fn poll_tasks(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> Result<(), io::Error> {
- if self.tasks.is_empty() {
- return Ok(());
- }
- while let Poll::Ready(Some(res)) = self.tasks.poll_next_unpin(cx) {
- let (part_idx, part) = res?;
- let total_parts = self.completed_parts.len();
- self.completed_parts
- .resize(std::cmp::max(part_idx + 1, total_parts), None);
- self.completed_parts[part_idx] = Some(part);
- }
- Ok(())
- }
-}
-
-/// **Note: Methods in this impl are added by horaedb, not included in the
-/// `object_store` crate.**
-impl<T> CloudMultiPartUpload<T>
-where
- T: CloudMultiPartUploadImpl + Send + Sync,
-{
- /// Send all buffer data to object store in final stage.
- fn final_flush_buffer(mut self: Pin<&mut Self>) {
- while !self.current_buffer.is_empty() {
- let size = self.part_size.min(self.current_buffer.len());
- let out_buffer =
self.current_buffer.drain(0..size).collect::<Vec<_>>();
-
- self.as_mut().submit_part_upload_task(out_buffer);
- }
- }
-
- /// Send buffer data to object store in write stage.
- fn flush_buffer(mut self: Pin<&mut Self>) {
- let part_size = self.part_size;
-
- // We will continuously submit tasks until size of the buffer is
smaller than
- // `part_size`.
- while self.current_buffer.len() >= part_size {
- let out_buffer =
self.current_buffer.drain(0..part_size).collect::<Vec<_>>();
- self.as_mut().submit_part_upload_task(out_buffer);
- }
- }
-
- fn submit_part_upload_task(mut self: Pin<&mut Self>, out_buffer: Vec<u8>) {
- let inner = Arc::clone(&self.inner);
- let part_idx = self.current_part_idx;
- self.tasks.push(Box::pin(async move {
- let upload_part = inner.put_multipart_part(out_buffer,
part_idx).await?;
-
- Ok((part_idx, upload_part))
- }));
- self.current_part_idx += 1;
- }
-}
-
-/// The process of ObjectStore write multipart upload is:
-/// - Obtain a `AsyncWrite` by `ObjectStore::multi_upload` to begin multipart
-/// upload;
-/// - Write all the data parts by `AsyncWrite::poll_write`;
-/// - Call `AsyncWrite::poll_shutdown` to finish current mulipart upload;
-///
-/// The `multi_upload` is used in
-/// [`analytic_engine::sst::parquet::writer::ParquetSstWriter::write`].
-impl<T> CloudMultiPartUpload<T>
-where
- T: CloudMultiPartUploadImpl + Send + Sync,
-{
- /// Compared with `poll_flush` which only flushes the in-progress tasks,
- /// `final_flush` is called during `poll_shutdown`, and will flush the
- /// `current_buffer` along with in-progress tasks.
- fn final_flush(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> Poll<Result<(), io::Error>> {
- // Poll current tasks
- self.as_mut().poll_tasks(cx)?;
-
- // If current_buffer is not empty, see if it can be submitted
- if self.tasks.len() < self.max_concurrency {
- self.as_mut().final_flush_buffer();
- }
-
- self.as_mut().poll_tasks(cx)?;
-
- // If tasks and current_buffer are empty, return Ready
- if self.tasks.is_empty() && self.current_buffer.is_empty() {
- Poll::Ready(Ok(()))
- } else {
- Poll::Pending
- }
- }
-}
-
-impl<T> AsyncWrite for CloudMultiPartUpload<T>
-where
- T: CloudMultiPartUploadImpl + Send + Sync,
-{
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- buf: &[u8],
- ) -> Poll<Result<usize, io::Error>> {
- // Poll current tasks
- self.as_mut().poll_tasks(cx)?;
-
- // If adding buf to pending buffer would trigger send, check
- // whether we have capacity for another task.
- let enough_to_send = (buf.len() + self.current_buffer.len()) >=
self.part_size;
- // The current buffer is not enough to send.
- if !enough_to_send {
- self.current_buffer.extend_from_slice(buf);
- return Poll::Ready(Ok(buf.len()));
- }
-
- if self.tasks.len() < self.max_concurrency {
- // If we do, copy into the buffer and submit the task, and return
ready.
- self.current_buffer.extend_from_slice(buf);
- // Flush buffer data, use custom method
- self.as_mut().flush_buffer();
- // We need to poll immediately after adding to setup waker
- self.as_mut().poll_tasks(cx)?;
-
- Poll::Ready(Ok(buf.len()))
- } else {
- // Waker registered by call to poll_tasks at beginning
- Poll::Pending
- }
- }
-
- fn poll_flush(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> Poll<Result<(), io::Error>> {
- // Poll current tasks
- self.as_mut().poll_tasks(cx)?;
-
- // If tasks is empty, return Ready
- if self.tasks.is_empty() {
- Poll::Ready(Ok(()))
- } else {
- Poll::Pending
- }
- }
-
- fn poll_shutdown(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> Poll<Result<(), io::Error>> {
- // First, poll flush all buffer data to object store.
- match self.as_mut().final_flush(cx) {
- Poll::Pending => return Poll::Pending,
- Poll::Ready(res) => res?,
- };
-
- // If shutdown task is not set, set it.
- let parts = std::mem::take(&mut self.completed_parts);
- let parts = parts
- .into_iter()
- .enumerate()
- .map(|(idx, part)| {
- part.ok_or_else(|| {
- io::Error::new(
- io::ErrorKind::Other,
- format!("Missing information for upload part {idx}"),
- )
- })
- })
- .collect::<Result<_, _>>()?;
-
- let inner = Arc::clone(&self.inner);
- // Last, do completion task in inner.
- let completion_task = self.completion_task.get_or_insert_with(|| {
- Box::pin(async move {
- inner.complete(parts).await?;
- Ok(())
- })
- });
-
- Pin::new(completion_task).poll(cx)
- }
-}
diff --git a/src/components/object_store/src/obkv/meta.rs
b/src/components/object_store/src/obkv/meta.rs
deleted file mode 100644
index ad97f9f6..00000000
--- a/src/components/object_store/src/obkv/meta.rs
+++ /dev/null
@@ -1,437 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::{ops::Range, str, sync::Arc, time};
-
-use generic_error::{BoxError, GenericError};
-use macros::define_result;
-use serde::{Deserialize, Serialize};
-use snafu::{ensure, Backtrace, ResultExt, Snafu};
-use table_kv::{ScanContext, ScanIter, TableKv, WriteBatch, WriteContext};
-use upstream::{path::Path, Error as StoreError, Result as StoreResult};
-
-use crate::obkv::{util, OBKV};
-
-pub const HEADER: u8 = 0x00_u8;
-
-pub const SCAN_TIMEOUT_SECS: u64 = 10;
-
-pub const SCAN_BATCH_SIZE: i32 = 1000;
-
-#[derive(Debug, Snafu)]
-pub enum Error {
- #[snafu(display("Invalid utf8 string,
err:{source}.\nBacktrace:\n{backtrace}"))]
- InvalidUtf8 {
- source: std::str::Utf8Error,
- backtrace: Backtrace,
- },
-
- #[snafu(display("Invalid json, err:{source},
json:{json}.\nBacktrace:\n{backtrace}"))]
- InvalidJson {
- json: String,
- source: serde_json::Error,
- backtrace: Backtrace,
- },
-
- #[snafu(display("Failed to encode json,
err:{source}.\nBacktrace:\n{backtrace}"))]
- EncodeJson {
- source: serde_json::Error,
- backtrace: Backtrace,
- },
-
- #[snafu(display("Failed to save meta, location:{location}, err:{source}"))]
- SaveMeta {
- location: String,
- source: GenericError,
- },
-
- #[snafu(display("Failed to delete meta, location:{location},
err:{source}"))]
- DeleteMeta {
- location: String,
- source: GenericError,
- },
-
- #[snafu(display("Failed to read meta, location:{location}, err:{source}"))]
- ReadMeta {
- location: String,
- source: GenericError,
- },
-
- #[snafu(display(
- "Invalid header found, header:{header},
expect:{expect}.\nBacktrace:\n{backtrace}"
- ))]
- InvalidHeader {
- header: u8,
- expect: u8,
- backtrace: Backtrace,
- },
-
- #[snafu(display(
- "Out of range occurs, end:{end},
object_size:{object_size}.\nBacktrace:\n{backtrace}"
- ))]
- OutOfRange {
- end: usize,
- object_size: usize,
- backtrace: Backtrace,
- },
-}
-
-define_result!(Error);
-
-pub const OBJECT_STORE_META: &str = "obkv_object_store_meta";
-
-/// The meta info of Obkv Object
-///
-/// **WARN: Do not change the field name, may lead to breaking changes!**
-#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
-#[serde(default)]
-pub struct ObkvObjectMeta {
- /// The full path to the object
- #[serde(rename = "location")]
- pub location: String,
- /// The last modified time in ms
- #[serde(rename = "last_modified")]
- pub last_modified: i64,
- /// The size in bytes of the object
- #[serde(rename = "size")]
- pub size: usize,
- /// The unique identifier for the object; For Obkv, it is composed with
- /// table_name @ path @ upload_id
- #[serde(rename = "unique_id")]
- pub unique_id: Option<String>,
- /// The size in bytes of one part. Note: maybe the size of last part less
- /// than part_size.
- #[serde(rename = "part_size")]
- pub part_size: usize,
- /// The paths of multi upload parts.
- #[serde(rename = "parts")]
- pub parts: Vec<String>,
- /// The version of object, Now we use the upload_id as version.
- #[serde(rename = "version")]
- pub version: String,
-}
-
-impl ObkvObjectMeta {
- #[inline]
- pub fn decode(data: &[u8]) -> Result<Self> {
- ensure!(
- data[0] == HEADER,
- InvalidHeader {
- header: data[0],
- expect: HEADER,
- }
- );
- let json = str::from_utf8(&data[1..]).context(InvalidUtf8)?;
- serde_json::from_str(json).context(InvalidJson { json })
- }
-
- #[inline]
- pub fn encode(&self) -> Result<Vec<u8>> {
- let size = self.estimate_size_of_json();
- let mut encode_bytes = Vec::with_capacity(size + 1);
- encode_bytes.push(HEADER);
- serde_json::to_writer(&mut encode_bytes, self).context(EncodeJson)?;
- Ok(encode_bytes)
- }
-
- /// Estimate the json string size of ObkvObjectMeta
- #[inline]
- pub fn estimate_size_of_json(&self) -> usize {
- // {}
- let mut size = 2;
- // size of key name, `,`, `""` and `:`
- size += (8 + 13 + 4 + 9 + 9 + 5 + 7) + 4 * 7;
- size += self.location.len() + 2;
- // last_modified
- size += 8;
- // size
- size += 8;
- // unique_id
- if let Some(id) = &self.unique_id {
- size += id.len() + 2;
- } else {
- size += 4;
- }
- // part_size
- size += 8;
- // parts
- for part in &self.parts {
- // part.len, `""`, `:`, and `,`
- size += part.len() + 4;
- }
- //{}
- size += 2;
- // version
- size += self.version.len();
- size
- }
-
- /// Compute the convered parts based on given range parameter
- pub fn compute_covered_parts(&self, range: Range<usize>) ->
Result<Option<ConveredParts>> {
- ensure!(
- range.end <= self.size,
- OutOfRange {
- end: range.end,
- object_size: self.size,
- }
- );
-
- // if the range is empty, return empty parts
- if range.is_empty() {
- return Ok(None);
- }
-
- let batch_size = self.part_size;
- let start_index = range.start / batch_size;
- let start_offset = range.start % batch_size;
-
- let inclusive_end = range.end - 1;
-
- let end_index = inclusive_end / batch_size;
- let end_offset = inclusive_end % batch_size;
-
- Ok(Some(ConveredParts {
- part_keys: &self.parts[start_index..=end_index],
- start_offset,
- end_offset,
- }))
- }
-}
-
-#[derive(Debug, Clone)]
-pub struct ConveredParts<'a> {
- /// The table kv client
- pub part_keys: &'a [String],
- pub start_offset: usize,
- pub end_offset: usize,
-}
-
-#[derive(Debug, Clone)]
-pub struct MetaManager<T> {
- /// The table kv client
- pub client: Arc<T>,
-}
-
-impl<T: TableKv> std::fmt::Display for MetaManager<T> {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "ObjectStore-Obkv-MetaManager({:?})", self.client)?;
- Ok(())
- }
-}
-
-impl<T: TableKv> MetaManager<T> {
- pub async fn save(&self, meta: ObkvObjectMeta) -> Result<()> {
- let mut batch = T::WriteBatch::default();
- let encode_bytes = meta.encode()?;
- batch.insert_or_update(meta.location.as_bytes(), &encode_bytes);
- self.client
- .as_ref()
- .write(WriteContext::default(), OBJECT_STORE_META, batch)
- .box_err()
- .with_context(|| SaveMeta {
- location: meta.location,
- })?;
- Ok(())
- }
-
- pub async fn read(&self, location: &Path) ->
Result<Option<ObkvObjectMeta>> {
- let value = self
- .client
- .as_ref()
- .get(OBJECT_STORE_META, location.as_ref().as_bytes())
- .box_err()
- .context(ReadMeta {
- location: location.as_ref().to_string(),
- })?;
-
- value.map(|v| ObkvObjectMeta::decode(&v)).transpose()
- }
-
- pub async fn delete(&self, meta: ObkvObjectMeta, location: &Path) ->
Result<()> {
- self.client
- .as_ref()
- .delete(OBJECT_STORE_META, location.as_ref().as_bytes())
- .box_err()
- .context(DeleteMeta {
- location: meta.location,
- })?;
-
- Ok(())
- }
-
- pub async fn delete_with_version(&self, location: &Path, version: &str) ->
Result<()> {
- let meta_result = self.read(location).await?;
- if let Some(meta) = meta_result {
- if meta.version == version {
- self.delete(meta, location).await?;
- }
- }
- Ok(())
- }
-
- pub async fn list(&self, prefix: &Path) ->
StoreResult<Vec<ObkvObjectMeta>, std::io::Error> {
- let scan_context: ScanContext = ScanContext {
- timeout: time::Duration::from_secs(SCAN_TIMEOUT_SECS),
- batch_size: SCAN_BATCH_SIZE,
- };
-
- let scan_request =
util::scan_request_with_prefix(prefix.as_ref().as_bytes());
-
- let mut iter = self
- .client
- .scan(scan_context, OBJECT_STORE_META, scan_request)
- .map_err(|source| StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- })?;
-
- let mut metas = vec![];
- while iter.valid() {
- let value = iter.value();
- let meta = ObkvObjectMeta::decode(value).map_err(|source|
StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- })?;
- metas.push(meta);
- iter.next().map_err(|source| StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- })?;
- }
- Ok(metas)
- }
-}
-
-#[cfg(test)]
-mod test {
-
- use std::ops::Range;
-
- use crate::obkv::meta::ObkvObjectMeta;
-
- #[test]
- fn test_estimate_size() {
- let meta = build_test_meta0();
-
- let expect = meta.estimate_size_of_json();
- let json = &serde_json::to_string(&meta).unwrap();
- let real = json.len();
- println!("expect:{expect},real:{real}");
- assert!(expect.abs_diff(real) as f32 / (real as f32) < 0.1);
- }
-
- #[test]
- fn test_compute_convered_parts() {
- let meta = build_test_meta0();
-
- let range1 = Range { start: 0, end: 1 };
- let expect = meta.compute_covered_parts(range1).unwrap().unwrap();
- assert!(expect.part_keys.len() == 1);
- assert!(expect.start_offset == 0);
- assert!(expect.end_offset == 0);
-
- let range1 = Range {
- start: 0,
- end: 1024,
- };
- let expect = meta.compute_covered_parts(range1).unwrap().unwrap();
- assert!(expect.part_keys.len() == 1);
- assert!(expect.start_offset == 0);
- assert!(expect.end_offset == 1023);
-
- let range1 = Range {
- start: 0,
- end: 8190,
- };
- let expect = meta.compute_covered_parts(range1).unwrap().unwrap();
- assert!(expect.part_keys.len() == 8);
- assert!(expect.start_offset == 0);
- assert!(expect.end_offset == 1021);
-
- let range1 = Range {
- start: 1023,
- end: 1025,
- };
- let expect = meta.compute_covered_parts(range1).unwrap().unwrap();
- assert!(expect.part_keys.len() == 2);
- assert!(expect.start_offset == 1023);
- assert!(expect.end_offset == 0);
-
- let range1 = Range {
- start: 8189,
- end: 8190,
- };
- let expect = meta.compute_covered_parts(range1).unwrap().unwrap();
- assert!(expect.part_keys.len() == 1);
- assert!(expect.start_offset == 1021);
- assert!(expect.end_offset == 1021);
-
- let range1 = Range {
- start: 8189,
- end: 8199,
- };
- let expect = meta.compute_covered_parts(range1);
- assert!(expect.is_err());
-
- let meta = build_test_meta1();
- let range1 = Range {
- start: 0,
- end: 1024,
- };
- let expect = meta.compute_covered_parts(range1).unwrap().unwrap();
- assert!(expect.part_keys.len() == 1);
- assert!(expect.start_offset == 0);
- assert!(expect.end_offset == 1023);
-
- let range1 = Range { start: 0, end: 0 };
- let expect = meta.compute_covered_parts(range1).unwrap();
- assert!(expect.is_none());
- }
-
- fn build_test_meta0() -> ObkvObjectMeta {
- ObkvObjectMeta {
- location:
String::from("/test/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxfdsfjlajflk"),
- last_modified: 123456789,
- size: 8190,
- unique_id: Some(String::from("1245689u438uferjalfjkda")),
- part_size: 1024,
- parts: vec![
- String::from("/test/xx/0"),
- String::from("/test/xx/1"),
- String::from("/test/xx/4"),
- String::from("/test/xx/5"),
- String::from("/test/xx/0"),
- String::from("/test/xx/1"),
- String::from("/test/xx/4"),
- String::from("/test/xx/5"),
- ],
- version: String::from("123456fsdalfkassa;l;kjfaklasadffsd"),
- }
- }
-
- fn build_test_meta1() -> ObkvObjectMeta {
- ObkvObjectMeta {
- location:
String::from("/test/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxfdsfjlajflk"),
- last_modified: 123456789,
- size: 1024,
- unique_id: Some(String::from("1245689u438uferjalfjkda")),
- part_size: 1024,
- parts: vec![String::from("/test/xx/0")],
- version: String::from("123456fsdalfkassa;l;kjfaklasadffsd"),
- }
- }
-}
diff --git a/src/components/object_store/src/obkv/mod.rs
b/src/components/object_store/src/obkv/mod.rs
deleted file mode 100644
index ecff9ea0..00000000
--- a/src/components/object_store/src/obkv/mod.rs
+++ /dev/null
@@ -1,1015 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::{
- collections::HashSet,
- hash::{Hash, Hasher},
- ops::Range,
- sync::{
- atomic::{AtomicU64, Ordering},
- Arc,
- },
- time,
- time::{SystemTime, UNIX_EPOCH},
-};
-
-use async_trait::async_trait;
-use bytes::Bytes;
-use chrono::{DateTime, TimeZone, Utc};
-use futures::{
- stream::{BoxStream, FuturesOrdered},
- StreamExt,
-};
-use generic_error::{BoxError, GenericError};
-use logger::debug;
-use snafu::{ensure, Backtrace, ResultExt, Snafu};
-use table_kv::{ScanContext, ScanIter, TableKv, WriteBatch, WriteContext};
-use tokio::{
- io::{AsyncWrite, AsyncWriteExt},
- time::Instant,
-};
-use twox_hash::XxHash64;
-use upstream::{
- path::{Path, DELIMITER},
- Error as StoreError, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, Result,
-};
-use uuid::Uuid;
-
-use crate::{
- multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
- obkv::meta::{MetaManager, ObkvObjectMeta, OBJECT_STORE_META},
-};
-
-mod meta;
-mod util;
-
-/// The object store type of obkv
-pub const OBKV: &str = "OBKV";
-
-/// Hash seed to build hasher. Modify the seed will result in different route
-/// result!
-const HASH_SEED: u64 = 0;
-
-#[derive(Debug, Snafu)]
-pub enum Error {
- #[snafu(display("Failed to scan data, namespace:{namespace},
err:{source}"))]
- ScanData {
- namespace: String,
- source: GenericError,
- },
-
- #[snafu(display("Failed to put data, path:{path}, err:{source}"))]
- PutData { path: String, source: GenericError },
-
- #[snafu(display("Failed to create shard table, table_name:{table_name},
err:{source}"))]
- CreateShardTable {
- table_name: String,
- source: GenericError,
- },
-
- #[snafu(display("Failed to read meta, path:{path}, err:{source}"))]
- ReadMeta { path: String, source: GenericError },
-
- #[snafu(display("Data part is not found in part_key:{part_key}.
\nBacktrace:\n{backtrace}"))]
- DataPartNotFound {
- part_key: String,
- backtrace: Backtrace,
- },
-
- #[snafu(display("No meta found, path:{path}. \nBacktrace:\n{backtrace}"))]
- MetaNotExists { path: String, backtrace: Backtrace },
-
- #[snafu(display(
- "Data is too large to put, size:{size}, limit:{limit}.
\nBacktrace:\n{backtrace}"
- ))]
- TooLargeData {
- size: usize,
- limit: usize,
- backtrace: Backtrace,
- },
-
- #[snafu(display(
- "Convert timestamp to date time fail, timestamp:{timestamp}.
\nBacktrace:\n{backtrace}"
- ))]
- ConvertTimestamp {
- timestamp: i64,
- backtrace: Backtrace,
- },
-
- #[snafu(display(
- "The length of data parts is inconsistent with the length of values,
parts length:{part_len}, values length:{value_len} \nBacktrace:\n{backtrace}"
- ))]
- DataPartsLength {
- part_len: usize,
- value_len: usize,
- backtrace: Backtrace,
- },
-}
-
-impl<T: TableKv> MetaManager<T> {
- fn try_new(client: Arc<T>) -> std::result::Result<Self, Error> {
- create_table_if_not_exists(&client, OBJECT_STORE_META)?;
- Ok(Self { client })
- }
-}
-
-/// If table not exists, create shard table; Else, do nothing.
-fn create_table_if_not_exists<T: TableKv>(
- table_kv: &Arc<T>,
- table_name: &str,
-) -> std::result::Result<(), Error> {
- let table_exists = table_kv
- .table_exists(table_name)
- .box_err()
- .context(CreateShardTable { table_name })?;
- if !table_exists {
- table_kv
- .create_table(table_name)
- .box_err()
- .context(CreateShardTable { table_name })?;
- }
-
- Ok(())
-}
-
-#[derive(Debug, Clone)]
-pub struct ShardManager {
- shard_num: usize,
- table_names: Vec<String>,
-}
-
-impl ShardManager {
- fn try_new<T: TableKv>(client: Arc<T>, shard_num: usize) ->
std::result::Result<Self, Error> {
- let mut table_names = Vec::with_capacity(shard_num);
-
- for shard_id in 0..shard_num {
- let table_name = format!("object_store_{shard_id}");
- create_table_if_not_exists(&client, &table_name)?;
- table_names.push(table_name);
- }
-
- Ok(Self {
- shard_num,
- table_names,
- })
- }
-
- #[inline]
- pub fn pick_shard_table(&self, path: &Path) -> &str {
- let mut hasher = XxHash64::with_seed(HASH_SEED);
- path.as_ref().as_bytes().hash(&mut hasher);
- let hash = hasher.finish();
- let index = hash % (self.table_names.len() as u64);
- &self.table_names[index as usize]
- }
-}
-
-impl std::fmt::Display for ShardManager {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "ObjectStore ObkvShardManager({})", self.shard_num)?;
- Ok(())
- }
-}
-
-#[derive(Debug)]
-pub struct ObkvObjectStore<T> {
- /// The manager to manage shard table in obkv
- shard_manager: ShardManager,
- /// The manager to manage object store meta, which persist in obkv
- meta_manager: Arc<MetaManager<T>>,
- client: Arc<T>,
- /// The size of one object part persited in obkv
- /// It may cause problem to save huge data in one obkv value, so we
- /// need to split data into small parts.
- part_size: usize,
- /// The max size of bytes, default is 1GB
- max_object_size: usize,
- /// Maximum number of upload tasks to run concurrently
- max_upload_concurrency: usize,
-}
-
-impl<T: TableKv> std::fmt::Display for ObkvObjectStore<T> {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(
- f,
- "ObkvObjectStore({:?},{:?})",
- self.client, self.shard_manager
- )?;
- Ok(())
- }
-}
-
-impl<T: TableKv> ObkvObjectStore<T> {
- pub fn try_new(
- client: Arc<T>,
- shard_num: usize,
- part_size: usize,
- max_object_size: usize,
- max_upload_concurrency: usize,
- ) -> Result<Self> {
- let shard_manager = ShardManager::try_new(client.clone(),
shard_num).map_err(|source| {
- StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- }
- })?;
- let meta_manager: MetaManager<T> =
- MetaManager::try_new(client.clone()).map_err(|source|
StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- })?;
- Ok(Self {
- shard_manager,
- meta_manager: Arc::new(meta_manager),
- client,
- part_size,
- max_object_size,
- max_upload_concurrency,
- })
- }
-
- #[inline]
- fn check_size(&self, bytes: &Bytes) -> std::result::Result<(), Error> {
- ensure!(
- bytes.len() < self.max_object_size,
- TooLargeData {
- size: bytes.len(),
- limit: self.max_object_size,
- }
- );
-
- Ok(())
- }
-
- #[inline]
- fn normalize_path(location: Option<&Path>) -> Path {
- if let Some(path) = location {
- if !path.as_ref().ends_with(DELIMITER) {
- return Path::from(format!("{}{DELIMITER}", path.as_ref()));
- }
- path.clone()
- } else {
- Path::from("")
- }
- }
-
- #[inline]
- pub fn pick_shard_table(&self, path: &Path) -> &str {
- self.shard_manager.pick_shard_table(path)
- }
-}
-
-impl<T: TableKv> ObkvObjectStore<T> {
- async fn read_meta(&self, location: &Path) ->
std::result::Result<ObkvObjectMeta, Error> {
- let meta = self
- .meta_manager
- .read(location)
- .await
- .box_err()
- .context(ReadMeta {
- path: location.as_ref().to_string(),
- })?;
-
- if let Some(m) = meta {
- Ok(m)
- } else {
- MetaNotExists {
- path: location.as_ref().to_string(),
- }
- .fail()
- }
- }
-
- async fn get_internal(&self, location: &Path) ->
std::result::Result<GetResult, Error> {
- let meta = self.read_meta(location).await?;
- let table_name = self.pick_shard_table(location);
- // TODO: Let table_kv provide a api `get_batch` to avoid extra IO
operations.
- let mut futures = FuturesOrdered::new();
- for part_key in meta.parts {
- let client = self.client.clone();
- let table_name = table_name.to_string();
- let future = async move {
- match client.get(&table_name, part_key.as_bytes()) {
- Ok(res) => Ok(Bytes::from(res.unwrap())),
- Err(err) => Err(StoreError::Generic {
- store: OBKV,
- source: Box::new(err),
- }),
- }
- };
- futures.push_back(future);
- }
-
- let boxed = futures.boxed();
-
- Ok(GetResult::Stream(boxed))
- }
-
- fn convert_datetime(&self, timestamp: i64) ->
std::result::Result<DateTime<Utc>, Error> {
- let timestamp_millis_opt = Utc.timestamp_millis_opt(timestamp);
- if let Some(dt) = timestamp_millis_opt.single() {
- Ok(dt)
- } else {
- ConvertTimestamp { timestamp }.fail()
- }
- }
-}
-
-#[async_trait]
-impl<T: TableKv> ObjectStore for ObkvObjectStore<T> {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
- let instant = Instant::now();
-
- self.check_size(&bytes)
- .map_err(|source| StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- })?;
-
- // Use `put_multipart` to implement `put`.
- let (_upload_id, mut multipart) = self.put_multipart(location).await?;
- multipart
- .write(&bytes)
- .await
- .map_err(|source| StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- })?;
- // Complete stage: flush buffer data to obkv, and save meta data
- multipart
- .shutdown()
- .await
- .map_err(|source| StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- })?;
- debug!(
- "ObkvObjectStore put operation, location:{location}, cost:{:?}",
- instant.elapsed()
- );
- Ok(())
- }
-
- async fn put_multipart(
- &self,
- location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
- let instant = Instant::now();
-
- let upload_id = Uuid::new_v4().to_string();
- let table_name = self.pick_shard_table(location);
-
- let upload = ObkvMultiPartUpload {
- location: location.clone(),
- upload_id: upload_id.clone(),
- table_name: table_name.to_string(),
- size: AtomicU64::new(0),
- client: Arc::clone(&self.client),
- part_size: self.part_size,
- meta_manager: self.meta_manager.clone(),
- };
- let multi_part_upload =
- CloudMultiPartUpload::new(upload, self.max_upload_concurrency,
self.part_size);
-
- debug!(
- "ObkvObjectStore put_multipart operation, location:{location},
table_name:{table_name}, cost:{:?}",
- instant.elapsed()
- );
- Ok((upload_id, Box::new(multi_part_upload)))
- }
-
- async fn abort_multipart(&self, location: &Path, multipart_id:
&MultipartId) -> Result<()> {
- let instant = Instant::now();
-
- let table_name = self.pick_shard_table(location);
-
- // Before aborting multipart, we need to delete all data parts and
meta info.
- // Here to delete data with path `location` and multipart_id.
- let scan_context: ScanContext = ScanContext {
- timeout: time::Duration::from_secs(meta::SCAN_TIMEOUT_SECS),
- batch_size: meta::SCAN_BATCH_SIZE,
- };
-
- let prefix = PathKeyEncoder::part_key_prefix(location, multipart_id);
- let scan_request = util::scan_request_with_prefix(prefix.as_bytes());
-
- let mut iter = self
- .client
- .scan(scan_context, table_name, scan_request)
- .map_err(|source| StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- })?;
-
- let mut keys = vec![];
- while iter.valid() {
- keys.push(iter.key().to_vec());
- iter.next().map_err(|source| StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- })?;
- }
-
- self.client
- .delete_batch(table_name, keys)
- .map_err(|source| StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- })?;
-
- // Here to delete meta with path `location` and multipart_id
- self.meta_manager
- .delete_with_version(location, multipart_id)
- .await
- .map_err(|source| StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- })?;
-
- debug!(
- "ObkvObjectStore abort_multipart operation, location:{location},
table_name:{table_name}, cost:{:?}",
- instant.elapsed()
- );
- Ok(())
- }
-
- async fn get(&self, location: &Path) -> Result<GetResult> {
- let instant = Instant::now();
- let result = self.get_internal(location).await;
-
- debug!(
- "ObkvObjectStore get operation, location:{location}, cost:{:?}",
- instant.elapsed()
- );
- result.box_err().map_err(|source| StoreError::NotFound {
- path: location.to_string(),
- source,
- })
- }
-
- async fn get_range(&self, location: &Path, range: Range<usize>) ->
Result<Bytes> {
- let instant = Instant::now();
-
- let table_name = self.pick_shard_table(location);
- let meta =
- self.read_meta(location)
- .await
- .box_err()
- .map_err(|source| StoreError::NotFound {
- path: location.to_string(),
- source,
- })?;
-
- let covered_parts = meta
- .compute_covered_parts(range.clone())
- .box_err()
- .map_err(|source| StoreError::NotFound {
- path: location.to_string(),
- source,
- })?;
-
- if let Some(covered_parts) = covered_parts {
- let mut range_buffer = Vec::with_capacity(range.end - range.start);
- let keys: Vec<&[u8]> = covered_parts
- .part_keys
- .iter()
- .map(|key| key.as_bytes())
- .collect();
- let values =
- self.client
- .get_batch(table_name, keys)
- .map_err(|source| StoreError::NotFound {
- path: location.to_string(),
- source: Box::new(source),
- })?;
-
- if covered_parts.part_keys.len() != values.len() {
- DataPartsLength {
- part_len: covered_parts.part_keys.len(),
- value_len: values.len(),
- }
- .fail()
- .map_err(|source| StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- })?
- }
-
- for (index, (key, value)) in
covered_parts.part_keys.iter().zip(values).enumerate() {
- if let Some(bytes) = value {
- let mut begin = 0;
- let mut end = bytes.len() - 1;
- if index == 0 {
- begin = covered_parts.start_offset;
- }
- // the last one
- if index == covered_parts.part_keys.len() - 1 {
- end = covered_parts.end_offset;
- }
- range_buffer.extend_from_slice(&bytes[begin..=end]);
- } else {
- DataPartNotFound { part_key: key }
- .fail()
- .map_err(|source| StoreError::NotFound {
- path: location.to_string(),
- source: Box::new(source),
- })?;
- }
- }
-
- debug!("ObkvObjectStore get_range operation, location:{location},
table:{table_name}, cost:{:?}", instant.elapsed());
-
- return Ok(range_buffer.into());
- } else {
- return Ok(Bytes::new());
- }
- }
-
- /// Return the metadata for the specified location
- async fn head(&self, location: &Path) -> Result<ObjectMeta> {
- let instant = Instant::now();
-
- let meta =
- self.read_meta(location)
- .await
- .box_err()
- .map_err(|source| StoreError::NotFound {
- path: location.to_string(),
- source,
- })?;
-
- debug!(
- "ObkvObjectStore head operation, location:{location}, cost:{:?}",
- instant.elapsed()
- );
-
- let last_modified = self
- .convert_datetime(meta.last_modified)
- .box_err()
- .map_err(|source| StoreError::NotFound {
- path: location.to_string(),
- source,
- })?;
-
- Ok(ObjectMeta {
- location: (*location).clone(),
- last_modified,
- size: meta.size,
- })
- }
-
- /// Delete the object at the specified location.
- async fn delete(&self, location: &Path) -> Result<()> {
- let instant = Instant::now();
-
- // TODO: maybe coerruption here, should not delete data when data is
reading.
- let table_name = self.pick_shard_table(location);
- let meta =
- self.read_meta(location)
- .await
- .box_err()
- .map_err(|source| StoreError::NotFound {
- path: location.to_string(),
- source,
- })?;
-
- // delete every part of data
- for part in &meta.parts {
- let key = part.as_bytes();
- self.client
- .delete(table_name, key)
- .map_err(|source| StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- })?;
- }
- // delete meta info
- self.meta_manager
- .delete(meta, location)
- .await
- .map_err(|source| StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- })?;
-
- debug!(
- "ObkvObjectStore delete operation, location:{location},
table:{table_name}, cost:{:?}",
- instant.elapsed()
- );
-
- Ok(())
- }
-
- /// List all the objects with the given prefix.
- /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a
- /// prefix of `foo/bar/x` but not of `foo/bar_baz/x`.
- /// TODO: Currently this method may return lots of object meta, we should
- /// limit the count of return ojects in future. Maybe a better
- /// implementation is to fetch and send the list results in a stream way.
- async fn list(&self, prefix: Option<&Path>) -> Result<BoxStream<'_,
Result<ObjectMeta>>> {
- let instant = Instant::now();
-
- let path = Self::normalize_path(prefix);
- let raw_metas =
- self.meta_manager
- .list(&path)
- .await
- .map_err(|source| StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- })?;
-
- let mut meta_list = Vec::new();
- for meta in raw_metas {
- meta_list.push(Ok(ObjectMeta {
- location: Path::from(meta.location),
- last_modified:
Utc.timestamp_millis_opt(meta.last_modified).unwrap(),
- size: meta.size,
- }));
- }
-
- let iter = futures::stream::iter(meta_list);
- debug!(
- "ObkvObjectStore list operation, prefix:{path}, cost:{:?}",
- instant.elapsed()
- );
- Ok(iter.boxed())
- }
-
- /// List all the objects and common paths(directories) with the given
- /// prefix. Prefixes are evaluated on a path segment basis, i.e.
- /// `foo/bar/` is a prefix of `foo/bar/x` but not of `foo/bar_baz/x`.
- /// see detail in:
https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
- async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
- let instant = Instant::now();
-
- let path = Self::normalize_path(prefix);
- let metas = self
- .meta_manager
- .list(&path)
- .await
- .map_err(|source| StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- })?;
-
- let mut common_prefixes = HashSet::new();
- let mut objects = Vec::new();
- for meta in metas {
- let location = meta.location;
- let subfix = &location[path.as_ref().len()..];
- if let Some(pos) = subfix.find(DELIMITER) {
- // common_prefix endswith '/'
- let common_prefix = &subfix[0..pos + 1];
- common_prefixes.insert(Path::from(common_prefix));
- } else {
- objects.push(ObjectMeta {
- location: Path::from(location),
- last_modified:
Utc.timestamp_millis_opt(meta.last_modified).unwrap(),
- size: meta.size,
- })
- }
- }
-
- let common_prefixes = Vec::from_iter(common_prefixes);
- debug!(
- "ObkvObjectStore list_with_delimiter operation, prefix:{path},
cost:{:?}",
- instant.elapsed()
- );
- Ok(ListResult {
- common_prefixes,
- objects,
- })
- }
-
- async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
- // TODO:
- Err(StoreError::NotImplemented)
- }
-
- async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()>
{
- // TODO:
- Err(StoreError::NotImplemented)
- }
-}
-
-struct ObkvMultiPartUpload<T> {
- /// The full path to the object.
- location: Path,
- /// The id of multi upload tasks, we use this id as object version.
- upload_id: String,
- /// The table name of obkv to save data.
- table_name: String,
- /// The client of object store.
- client: Arc<T>,
- /// The size of object.
- size: AtomicU64,
- /// The size in bytes of one part. Note: maybe the size of last part less
- /// than part_size.
- part_size: usize,
- /// The mananger to process meta info.
- meta_manager: Arc<MetaManager<T>>,
-}
-
-struct PathKeyEncoder;
-
-impl PathKeyEncoder {
- #[inline]
- fn part_key(location: &Path, upload_id: &str, part_idx: usize) -> String {
- format!("{location}@{upload_id}@{part_idx}")
- }
-
- #[inline]
- fn part_key_prefix(location: &Path, upload_id: &str) -> String {
- format!("{location}@{upload_id}@")
- }
-
- #[inline]
- fn unique_id(table: &str, location: &Path, upload_id: &str) -> String {
- format!("{table}@{location}@{upload_id}")
- }
-}
-
-#[async_trait]
-impl<T: TableKv> CloudMultiPartUploadImpl for ObkvMultiPartUpload<T> {
- async fn put_multipart_part(
- &self,
- buf: Vec<u8>,
- part_idx: usize,
- ) -> Result<UploadPart, std::io::Error> {
- let mut batch = T::WriteBatch::default();
- let part_key = PathKeyEncoder::part_key(&self.location,
&self.upload_id, part_idx);
- batch.insert(part_key.as_bytes(), buf.as_ref());
-
- self.client
- .write(WriteContext::default(), &self.table_name, batch)
- .map_err(|source| StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- })?;
- // Record size of object.
- self.size.fetch_add(buf.len() as u64, Ordering::Relaxed);
- Ok(UploadPart {
- content_id: part_key,
- })
- }
-
- async fn complete(&self, completed_parts: Vec<UploadPart>) -> Result<(),
std::io::Error> {
- // We should save meta info after finish save data.
- let mut paths = Vec::with_capacity(completed_parts.len());
- for upload_part in completed_parts {
- paths.push(upload_part.content_id);
- }
-
- let now = SystemTime::now();
- let since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went
backwards");
- let last_modified = since_epoch.as_millis() as i64;
-
- let meta = ObkvObjectMeta {
- location: self.location.as_ref().to_string(),
- last_modified,
- size: self.size.load(Ordering::SeqCst) as usize,
- unique_id: Some(PathKeyEncoder::unique_id(
- &self.table_name,
- &self.location,
- &self.upload_id,
- )),
- part_size: self.part_size,
- parts: paths,
- version: self.upload_id.clone(),
- };
-
- // Save meta info to specify obkv table.
- // TODO: We should remove the previous object data when update object.
- self.meta_manager
- .save(meta)
- .await
- .map_err(|source| StoreError::Generic {
- store: OBKV,
- source: Box::new(source),
- })?;
- Ok(())
- }
-}
-
-#[cfg(test)]
-mod test {
- use std::sync::Arc;
-
- use bytes::Bytes;
- use futures::StreamExt;
- use rand::{thread_rng, Rng};
- use runtime::{Builder, Runtime};
- use table_kv::memory::MemoryImpl;
- use tokio::io::AsyncWriteExt;
- use upstream::{path::Path, ObjectStore};
-
- use crate::obkv::ObkvObjectStore;
-
- const TEST_PART_SIZE: usize = 1024;
-
- fn new_runtime() -> Arc<Runtime> {
- let runtime = Builder::default()
- .worker_threads(4)
- .enable_all()
- .build()
- .unwrap();
-
- Arc::new(runtime)
- }
-
- #[test]
- #[warn(unused_must_use)]
- fn test_with_memory_table_kv() {
- let runtime = new_runtime();
- runtime.block_on(async move {
- let random_str1 = generate_random_string(1000);
- let input1 = random_str1.as_bytes();
- let random_str2 = generate_random_string(1000);
- let input2 = random_str2.as_bytes();
-
- let oss = init_object_store();
-
- // write data in multi part
- let location = Path::from("test/data/1");
- write_data(oss.clone(), &location, input1, input2).await;
- test_list(oss.clone(), 1).await;
-
- let mut expect = vec![];
- expect.extend_from_slice(input1);
- expect.extend_from_slice(input2);
-
- test_simple_read(oss.clone(), &location, &expect).await;
-
- test_get_range(oss.clone(), &location, &expect).await;
-
- test_head(oss.clone(), &location).await;
-
- // test list multi path
- let location2 = Path::from("test/data/2");
- write_data(oss.clone(), &location2, input1, input2).await;
- test_list(oss.clone(), 2).await;
-
- // test delete by path
- oss.delete(&location).await.unwrap();
- test_list(oss.clone(), 1).await;
-
- // test abort multi part
- test_abort_upload(oss.clone(), input1, input2).await;
-
- // test put data
- test_put_data(oss.clone()).await;
- });
- }
-
- async fn test_abort_upload(
- oss: Arc<ObkvObjectStore<MemoryImpl>>,
- input1: &[u8],
- input2: &[u8],
- ) {
- let location3 = Path::from("test/data/3");
- let multipart_id = write_data(oss.clone(), &location3, input1,
input2).await;
- test_list(oss.clone(), 2).await;
- oss.abort_multipart(&location3, &multipart_id)
- .await
- .unwrap();
- test_list(oss.clone(), 1).await;
- }
-
- async fn test_list(oss: Arc<ObkvObjectStore<MemoryImpl>>, expect_len:
usize) {
- let prefix = Path::from("test/");
- let stream = oss.list(Some(&prefix)).await.unwrap();
- let meta_vec = stream
- .fold(Vec::new(), |mut acc, item| async {
- let object_meta = item.unwrap();
-
assert!(object_meta.location.as_ref().starts_with(prefix.as_ref()));
- acc.push(object_meta);
- acc
- })
- .await;
-
- assert_eq!(meta_vec.len(), expect_len);
- }
-
- async fn test_head(oss: Arc<ObkvObjectStore<MemoryImpl>>, location: &Path)
{
- let object_meta = oss.head(location).await.unwrap();
- assert_eq!(object_meta.location.as_ref(), location.as_ref());
- assert_eq!(object_meta.size, 2000);
- }
-
- async fn test_get_range(oss: Arc<ObkvObjectStore<MemoryImpl>>, location:
&Path, expect: &[u8]) {
- let get = oss
- .get_range(
- location,
- std::ops::Range {
- start: 100,
- end: 200,
- },
- )
- .await
- .unwrap();
- assert!(get.len() == 100);
- assert_eq!(expect[100..200], get);
-
- let bytes = oss
- .get_range(
- location,
- std::ops::Range {
- start: 500,
- end: 1500,
- },
- )
- .await
- .unwrap();
- assert!(bytes.len() == 1000);
- assert_eq!(expect[500..1500], bytes);
- }
-
- async fn test_simple_read(
- oss: Arc<ObkvObjectStore<MemoryImpl>>,
- location: &Path,
- expect: &[u8],
- ) {
- // read data
- let get = oss.get(location).await.unwrap();
- assert_eq!(expect, get.bytes().await.unwrap());
- }
-
- #[allow(clippy::unused_io_amount)]
- async fn write_data(
- oss: Arc<dyn ObjectStore>,
- location: &Path,
- input1: &[u8],
- input2: &[u8],
- ) -> String {
- let (multipart_id, mut async_writer) =
oss.put_multipart(location).await.unwrap();
-
- async_writer.write(input1).await.unwrap();
- async_writer.write(input2).await.unwrap();
- async_writer.shutdown().await.unwrap();
- multipart_id
- }
-
- fn init_object_store() -> Arc<ObkvObjectStore<MemoryImpl>> {
- let table_kv = Arc::new(MemoryImpl::default());
- let obkv_object =
- ObkvObjectStore::try_new(table_kv, 128, TEST_PART_SIZE, 1024 *
1024 * 1024, 8).unwrap();
- Arc::new(obkv_object)
- }
-
- fn generate_random_string(length: usize) -> String {
- let mut rng = thread_rng();
- let chars: Vec<char> =
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
- .chars()
- .collect();
- (0..length)
- .map(|_| rng.gen::<char>())
- .map(|c| chars[(c as usize) % chars.len()])
- .collect()
- }
-
- async fn test_put_data(oss: Arc<ObkvObjectStore<MemoryImpl>>) {
- let length_vec = vec![
- TEST_PART_SIZE - 10,
- TEST_PART_SIZE,
- 2 * TEST_PART_SIZE,
- 4 * TEST_PART_SIZE,
- 4 * TEST_PART_SIZE + 10,
- ];
- for length in length_vec {
- let location = Path::from("test/data/4");
- let rand_str = generate_random_string(length);
- let buffer = Bytes::from(rand_str);
- oss.put(&location, buffer.clone()).await.unwrap();
- let meta = oss.head(&location).await.unwrap();
- assert_eq!(meta.location, location);
- assert_eq!(meta.size, length);
- let body = oss.get(&location).await.unwrap();
- assert_eq!(buffer, body.bytes().await.unwrap());
- let inner_meta = oss.meta_manager.read(&location).await.unwrap();
- assert!(inner_meta.is_some());
- if let Some(m) = inner_meta {
- assert_eq!(m.location, location.as_ref());
- assert_eq!(m.part_size, oss.part_size);
- let expect_size =
- length / TEST_PART_SIZE + if length % TEST_PART_SIZE != 0
{ 1 } else { 0 };
- assert_eq!(m.parts.len(), expect_size);
- }
- oss.delete(&location).await.unwrap();
- }
- }
-}
diff --git a/src/components/object_store/src/obkv/util.rs
b/src/components/object_store/src/obkv/util.rs
deleted file mode 100644
index 640198c4..00000000
--- a/src/components/object_store/src/obkv/util.rs
+++ /dev/null
@@ -1,122 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use table_kv::{KeyBoundary, ScanRequest};
-
-/// Generate ScanRequest with prefix
-pub fn scan_request_with_prefix(prefix_bytes: &[u8]) -> ScanRequest {
- let mut start_key = Vec::with_capacity(prefix_bytes.len());
- start_key.extend(prefix_bytes);
- let start = KeyBoundary::included(start_key.as_ref());
-
- let mut end_key = Vec::with_capacity(prefix_bytes.len());
- end_key.extend(prefix_bytes);
- let carry = inc_by_one(&mut end_key);
- // Check add one operation overflow.
- let end = if carry == 1 {
- KeyBoundary::MaxIncluded
- } else {
- KeyBoundary::excluded(end_key.as_ref())
- };
- table_kv::ScanRequest {
- start,
- end,
- reverse: false,
- }
-}
-
-/// Increment one to the byte array, and return the carry.
-fn inc_by_one(nums: &mut [u8]) -> u8 {
- let mut carry = 1;
- for i in (0..nums.len()).rev() {
- let sum = nums[i].wrapping_add(carry);
- nums[i] = sum;
- if sum == 0 {
- carry = 1;
- } else {
- carry = 0;
- break;
- }
- }
- carry
-}
-
-#[cfg(test)]
-mod test {
-
- use crate::obkv::util::{inc_by_one, scan_request_with_prefix};
-
- #[test]
- fn test_add_one() {
- let mut case0 = vec![0xff_u8, 0xff, 0xff];
- let case0_expect = vec![0x00, 0x00, 0x00];
- assert_eq!(1, inc_by_one(&mut case0));
- assert_eq!(case0, case0_expect);
-
- let mut case1 = vec![0x00_u8, 0xff, 0xff];
- let case1_expect = vec![0x01, 0x00, 0x00];
- assert_eq!(0, inc_by_one(&mut case1));
- assert_eq!(case1, case1_expect);
-
- let mut case2 = vec![0x00_u8, 0x00, 0x00];
- let case2_expect = vec![0x00, 0x00, 0x01];
- assert_eq!(0, inc_by_one(&mut case2));
- assert_eq!(case2, case2_expect);
- }
-
- #[test]
- fn test_scan_request_with_prefix() {
- let case0 = vec![0xff_u8, 0xff, 0xff];
- let case0_expect = table_kv::ScanRequest {
- start: table_kv::KeyBoundary::included(&case0),
- end: table_kv::KeyBoundary::MaxIncluded,
- reverse: false,
- };
- let case0_actual = scan_request_with_prefix(&case0);
- assert_eq!(case0_expect, case0_actual);
-
- let case1 = "abc".as_bytes();
- let case1_expect_bytes = "abd".as_bytes();
- let case1_expect = table_kv::ScanRequest {
- start: table_kv::KeyBoundary::included(case1),
- end: table_kv::KeyBoundary::excluded(case1_expect_bytes),
- reverse: false,
- };
- let case1_actual = scan_request_with_prefix(case1);
- assert_eq!(case1_expect, case1_actual);
-
- let case2 = vec![0x00_u8, 0x00, 0x00];
- let case2_expect_bytes = vec![0x00_u8, 0x00, 0x01];
- let case2_expect = table_kv::ScanRequest {
- start: table_kv::KeyBoundary::included(&case2),
- end: table_kv::KeyBoundary::excluded(&case2_expect_bytes),
- reverse: false,
- };
- let case2_actual = scan_request_with_prefix(&case2);
- assert_eq!(case2_expect, case2_actual);
-
- let case3 = vec![0x00_u8, 0x00, 0xff];
- let case3_expect_bytes = vec![0x00_u8, 0x01, 0x00];
- let case3_expect = table_kv::ScanRequest {
- start: table_kv::KeyBoundary::included(&case3),
- end: table_kv::KeyBoundary::excluded(&case3_expect_bytes),
- reverse: false,
- };
- let case3_actual = scan_request_with_prefix(&case3);
- assert_eq!(case3_expect, case3_actual);
- }
-}
diff --git a/src/components/object_store/src/prefix.rs
b/src/components/object_store/src/prefix.rs
index 42771c32..187b10a8 100644
--- a/src/components/object_store/src/prefix.rs
+++ b/src/components/object_store/src/prefix.rs
@@ -20,10 +20,10 @@ use std::{fmt::Display, ops::Range};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream::BoxStream, StreamExt};
-use tokio::io::AsyncWrite;
use upstream::{
path::{self, Path, DELIMITER},
- Error, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
+ Error, GetOptions, GetResult, GetResultPayload, ListResult,
MultipartUpload, ObjectMeta,
+ ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
};
use crate::ObjectStoreRef;
@@ -96,28 +96,55 @@ impl StoreWithPrefix {
#[async_trait]
impl ObjectStore for StoreWithPrefix {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+ async fn put(&self, location: &Path, payload: PutPayload) ->
Result<PutResult> {
let new_loc = self.add_prefix_to_loc(location);
- self.store.put(&new_loc, bytes).await
+ self.store.put(&new_loc, payload).await
}
- async fn put_multipart(
+ async fn put_opts(
&self,
location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+ payload: PutPayload,
+ opts: PutOptions,
+ ) -> Result<PutResult> {
+ let new_loc = self.add_prefix_to_loc(location);
+ self.store.put_opts(&new_loc, payload, opts).await
+ }
+
+ async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>> {
let new_loc = self.add_prefix_to_loc(location);
self.store.put_multipart(&new_loc).await
}
- async fn abort_multipart(&self, location: &Path, multipart_id:
&MultipartId) -> Result<()> {
+ async fn put_multipart_opts(
+ &self,
+ location: &Path,
+ opts: PutMultipartOpts,
+ ) -> Result<Box<dyn MultipartUpload>> {
let new_loc = self.add_prefix_to_loc(location);
- self.store.abort_multipart(&new_loc, multipart_id).await
+ self.store.put_multipart_opts(&new_loc, opts).await
}
async fn get(&self, location: &Path) -> Result<GetResult> {
let new_loc = self.add_prefix_to_loc(location);
let res = self.store.get(&new_loc).await?;
- if let GetResult::File(_, _) = &res {
+ if let GetResultPayload::File(_, _) = &res.payload {
+ let err = ErrorWithMsg {
+ msg: "StoreWithPrefix doesn't support object store based on
local file system"
+ .to_string(),
+ };
+ return Err(Error::NotSupported {
+ source: Box::new(err),
+ });
+ }
+
+ Ok(res)
+ }
+
+ async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
+ let new_loc = self.add_prefix_to_loc(location);
+ let res = self.store.get_opts(&new_loc, options).await?;
+ if let GetResultPayload::File(_, _) = &res.payload {
let err = ErrorWithMsg {
msg: "StoreWithPrefix doesn't support object store based on
local file system"
.to_string(),
@@ -154,12 +181,12 @@ impl ObjectStore for StoreWithPrefix {
self.store.delete(&new_loc).await
}
- async fn list(&self, prefix: Option<&Path>) -> Result<BoxStream<'_,
Result<ObjectMeta>>> {
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>
{
let objects = if let Some(loc) = prefix {
let new_loc = self.add_prefix_to_loc(loc);
- self.store.list(Some(&new_loc)).await?
+ self.store.list(Some(&new_loc))
} else {
- self.store.list(Some(&self.prefix)).await?
+ self.store.list(Some(&self.prefix))
};
let new_objects = objects.map(|mut obj| {
@@ -169,7 +196,7 @@ impl ObjectStore for StoreWithPrefix {
obj
});
- Ok(new_objects.boxed())
+ new_objects.boxed()
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
@@ -209,7 +236,7 @@ mod tests {
use std::sync::Arc;
use chrono::{DateTime, Utc};
- use futures::stream;
+ use futures::{stream, stream::StreamExt};
use tempfile::tempdir;
use upstream::local::LocalFileSystem;
@@ -242,24 +269,29 @@ mod tests {
#[async_trait]
impl ObjectStore for MockObjectStore {
- async fn put(&self, location: &Path, _bytes: Bytes) -> Result<()> {
+ async fn put(&self, location: &Path, _payload: PutPayload) ->
Result<PutResult> {
self.prefix_checker.check(location);
- Ok(())
+ Ok(PutResult {
+ e_tag: None,
+ version: None,
+ })
}
- async fn put_multipart(
+ async fn put_opts(
&self,
_location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
- todo!()
+ _payload: PutPayload,
+ _opts: PutOptions,
+ ) -> Result<PutResult> {
+ Err(Error::NotImplemented)
}
- async fn abort_multipart(
+ async fn put_multipart_opts(
&self,
_location: &Path,
- _multipart_id: &MultipartId,
- ) -> Result<()> {
- todo!()
+ _opts: PutMultipartOpts,
+ ) -> Result<Box<dyn MultipartUpload>> {
+ Err(Error::NotImplemented)
}
async fn get(&self, location: &Path) -> Result<GetResult> {
@@ -267,6 +299,10 @@ mod tests {
Err(Error::NotImplemented)
}
+ async fn get_opts(&self, _location: &Path, _options: GetOptions) ->
Result<GetResult> {
+ Err(Error::NotImplemented)
+ }
+
async fn get_range(&self, location: &Path, _range: Range<usize>) ->
Result<Bytes> {
self.prefix_checker.check(location);
Ok(self.content.clone())
@@ -279,6 +315,8 @@ mod tests {
location: location.clone(),
last_modified: DateTime::<Utc>::default(),
size: 0,
+ e_tag: None,
+ version: None,
})
}
@@ -288,7 +326,7 @@ mod tests {
Err(Error::NotImplemented)
}
- async fn list(&self, prefix: Option<&Path>) -> Result<BoxStream<'_,
Result<ObjectMeta>>> {
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_,
Result<ObjectMeta>> {
if let Some(loc) = prefix {
self.prefix_checker.check(loc);
}
@@ -301,11 +339,13 @@ mod tests {
location: filepath,
last_modified: DateTime::<Utc>::default(),
size: 0,
+ e_tag: None,
+ version: None,
};
objects.push(Ok(object));
}
- Ok(stream::iter(objects).boxed())
+ stream::iter(objects).boxed()
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
@@ -346,7 +386,7 @@ mod tests {
// Ignore the result and let the `prefix_checker` in the
`MockObjectStore` to do
// the assertion.
let _ = prefix_store
- .put(&test_filepath, Bytes::from_static(b"1111"))
+ .put(&test_filepath, Bytes::from_static(b"1111").into())
.await;
let _ = prefix_store.get(&test_filepath).await;
@@ -360,8 +400,6 @@ mod tests {
for meta in prefix_store
.list(Some(&test_filepath))
- .await
- .unwrap()
.collect::<Vec<_>>()
.await
{
diff --git a/src/components/object_store/src/test_util.rs
b/src/components/object_store/src/test_util.rs
index 68631991..ca643e4e 100644
--- a/src/components/object_store/src/test_util.rs
+++ b/src/components/object_store/src/test_util.rs
@@ -20,8 +20,10 @@ use std::{collections::HashMap, fmt::Display, ops::Range,
sync::RwLock};
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::{self, BoxStream};
-use tokio::io::AsyncWrite;
-use upstream::{path::Path, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, Result};
+use upstream::{
+ path::Path, GetOptions, GetResult, GetResultPayload, ListResult,
MultipartUpload, ObjectMeta,
+ ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
+};
#[derive(Debug)]
struct StoreError {
@@ -64,19 +66,33 @@ impl MemoryStore {
#[async_trait]
impl ObjectStore for MemoryStore {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+ async fn put(&self, location: &Path, payload: PutPayload) ->
Result<PutResult> {
let mut files = self.files.write().unwrap();
- files.insert(location.clone(), bytes);
- Ok(())
+ files.insert(location.clone(), Bytes::from(payload));
+ Ok(PutResult {
+ e_tag: None,
+ version: None,
+ })
}
async fn get(&self, location: &Path) -> Result<GetResult> {
let files = self.files.read().unwrap();
if let Some(bs) = files.get(location) {
let bs = bs.clone();
- Ok(GetResult::Stream(Box::pin(stream::once(
- async move { Ok(bs) },
- ))))
+ let size = bs.len();
+ let payload = GetResultPayload::Stream(Box::pin(stream::once(async
move { Ok(bs) })));
+ Ok(GetResult {
+ payload,
+ meta: ObjectMeta {
+ location: location.clone(),
+ last_modified: Default::default(),
+ size,
+ e_tag: None,
+ version: None,
+ },
+ range: Default::default(),
+ attributes: Default::default(),
+ })
} else {
let source = Box::new(StoreError {
msg: "not found".to_string(),
@@ -120,7 +136,9 @@ impl ObjectStore for MemoryStore {
Ok(ObjectMeta {
location: location.clone(),
size: bs.len(),
+ e_tag: None,
last_modified: Default::default(),
+ version: None,
})
} else {
let source = Box::new(StoreError {
@@ -134,14 +152,7 @@ impl ObjectStore for MemoryStore {
}
}
- async fn put_multipart(
- &self,
- _location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
- unimplemented!()
- }
-
- async fn abort_multipart(&self, _location: &Path, _multipart_id:
&MultipartId) -> Result<()> {
+ async fn put_multipart(&self, _location: &Path) -> Result<Box<dyn
MultipartUpload>> {
unimplemented!()
}
@@ -149,7 +160,7 @@ impl ObjectStore for MemoryStore {
unimplemented!()
}
- async fn list(&self, _prefix: Option<&Path>) -> Result<BoxStream<'_,
Result<ObjectMeta>>> {
+ fn list(&self, _prefix: Option<&Path>) -> BoxStream<'_,
Result<ObjectMeta>> {
unimplemented!()
}
@@ -172,4 +183,25 @@ impl ObjectStore for MemoryStore {
async fn rename_if_not_exists(&self, _from: &Path, _to: &Path) ->
Result<()> {
unimplemented!()
}
+
+ async fn put_opts(
+ &self,
+ _location: &Path,
+ _payload: PutPayload,
+ _opts: PutOptions,
+ ) -> Result<PutResult> {
+ unimplemented!()
+ }
+
+ async fn put_multipart_opts(
+ &self,
+ _location: &Path,
+ _opts: PutMultipartOpts,
+ ) -> Result<Box<dyn MultipartUpload>> {
+ unimplemented!()
+ }
+
+ async fn get_opts(&self, _location: &Path, _options: GetOptions) ->
Result<GetResult> {
+ unimplemented!()
+ }
}
diff --git a/src/tools/src/bin/sst-metadata.rs
b/src/tools/src/bin/sst-metadata.rs
index 4199ba8d..bf659960 100644
--- a/src/tools/src/bin/sst-metadata.rs
+++ b/src/tools/src/bin/sst-metadata.rs
@@ -145,7 +145,7 @@ async fn run(args: Args) -> Result<()> {
let storage: ObjectStoreRef = Arc::new(storage);
let mut join_set = JoinSet::new();
- let mut ssts = storage.list(None).await?;
+ let mut ssts = storage.list(None);
let verbose = args.verbose;
let page_indexes = args.page_indexes;
while let Some(object_meta) = ssts.next().await {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]