This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 9321505f feat: use opendal to access underlying storage (#1557)
9321505f is described below
commit 9321505f7adca61abef8b0c4b3529e8e9fba097e
Author: 鲍金日 <[email protected]>
AuthorDate: Tue Aug 27 11:59:28 2024 +0800
feat: use opendal to access underlying storage (#1557)
## Rationale
Use opendal to access the object store, thus unifying the access method
of the underlying storage.
## Detailed Changes
- use opendal to access s3/oss/local file
## Test Plan
- Existing tests
---
Cargo.lock | 265 +++++++++++++++------
Cargo.toml | 13 +-
Makefile | 4 +-
src/analytic_engine/src/manifest/details.rs | 6 +-
src/analytic_engine/src/setup.rs | 31 ++-
src/analytic_engine/src/sst/meta_data/cache.rs | 6 +-
src/analytic_engine/src/sst/parquet/writer.rs | 16 +-
src/analytic_engine/src/tests/util.rs | 8 +
src/benchmarks/src/merge_memtable_bench.rs | 5 +-
src/benchmarks/src/merge_sst_bench.rs | 5 +-
src/benchmarks/src/parquet_bench.rs | 4 +-
src/benchmarks/src/scan_memtable_bench.rs | 4 +-
src/benchmarks/src/sst_bench.rs | 4 +-
src/benchmarks/src/sst_tools.rs | 9 +-
src/benchmarks/src/util.rs | 6 +
src/components/object_store/Cargo.toml | 9 +-
src/components/object_store/src/aliyun.rs | 60 ++---
src/components/object_store/src/config.rs | 47 +++-
src/components/object_store/src/disk_cache.rs | 27 +--
src/components/object_store/src/lib.rs | 13 +-
.../object_store/src/{lib.rs => local_file.rs} | 45 ++--
src/components/object_store/src/mem_cache.rs | 6 +-
src/components/object_store/src/multi_part.rs | 15 +-
src/components/object_store/src/prefix.rs | 6 +-
src/components/object_store/src/s3.rs | 56 +++--
src/tools/src/bin/sst-convert.rs | 9 +-
src/tools/src/bin/sst-metadata.rs | 9 +-
27 files changed, 441 insertions(+), 247 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 66815f7a..0a2d3a0f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -876,6 +876,18 @@ dependencies = [
"tower-service",
]
+[[package]]
+name = "backon"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d67782c3f868daa71d3533538e98a8e13713231969def7536e8039606fc46bf0"
+dependencies = [
+ "fastrand 2.1.0",
+ "futures-core",
+ "pin-project",
+ "tokio",
+]
+
[[package]]
name = "backtrace"
version = "0.3.67"
@@ -1208,9 +1220,9 @@ checksum =
"14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
-version = "1.5.0"
+version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
+checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50"
[[package]]
name = "bytes_ext"
@@ -1713,9 +1725,9 @@ checksum =
"9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484"
[[package]]
name = "crc32c"
-version = "0.6.3"
+version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3dfea2db42e9927a3845fb268a10a72faed6d416065f77873f05e411457c363e"
+checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47"
dependencies = [
"rustc_version",
]
@@ -2381,6 +2393,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer",
+ "const-oid",
"crypto-common",
"subtle",
]
@@ -2406,6 +2419,15 @@ dependencies = [
"winapi",
]
+[[package]]
+name = "dlv-list"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f"
+dependencies = [
+ "const-random",
+]
+
[[package]]
name = "doc-comment"
version = "0.3.3"
@@ -2548,6 +2570,12 @@ dependencies = [
"instant",
]
+[[package]]
+name = "fastrand"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a"
+
[[package]]
name = "filedescriptor"
version = "0.8.2"
@@ -2577,6 +2605,12 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
+[[package]]
+name = "flagset"
+version = "0.4.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b3ea1ec5f8307826a5b71094dd91fc04d4ae75d5709b20ad351c7fb4815c86ec"
+
[[package]]
name = "flatbuffers"
version = "23.1.21"
@@ -2777,7 +2811,7 @@ version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48"
dependencies = [
- "fastrand",
+ "fastrand 1.9.0",
"futures-core",
"futures-io",
"memchr",
@@ -2878,8 +2912,10 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5"
dependencies = [
"cfg-if 1.0.0",
+ "js-sys",
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
+ "wasm-bindgen",
]
[[package]]
@@ -3069,6 +3105,15 @@ dependencies = [
"digest",
]
+[[package]]
+name = "home"
+version = "0.5.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5"
+dependencies = [
+ "windows-sys 0.52.0",
+]
+
[[package]]
name = "horaectl"
version = "2.0.0"
@@ -3078,7 +3123,7 @@ dependencies = [
"clap",
"lazy_static",
"prettytable",
- "reqwest 0.11.24",
+ "reqwest 0.12.4",
"serde",
"shell-words",
"tokio",
@@ -3143,7 +3188,7 @@ dependencies = [
"async-trait",
"horaedb-client",
"local-ip-address",
- "reqwest 0.11.24",
+ "reqwest 0.12.4",
"serde",
"sqlness",
"tokio",
@@ -4111,7 +4156,7 @@ dependencies = [
"logger",
"macros",
"prost 0.11.8",
- "reqwest 0.11.24",
+ "reqwest 0.12.4",
"serde",
"serde_json",
"snafu 0.6.10",
@@ -4620,22 +4665,13 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbebfd32c213ba1907fa7a9c9138015a8de2b43e30c5aa45b18f7deb46786ad6"
dependencies = [
"async-trait",
- "base64 0.22.1",
"bytes",
"chrono",
"futures 0.3.28",
"humantime 2.1.0",
- "hyper 1.3.1",
"itertools 0.12.0",
- "md-5",
"parking_lot 0.12.1",
"percent-encoding",
- "quick-xml 0.31.0",
- "rand 0.8.5",
- "reqwest 0.12.4",
- "ring 0.17.7",
- "serde",
- "serde_json",
"snafu 0.7.4",
"tokio",
"tracing",
@@ -4662,11 +4698,14 @@ dependencies = [
"macros",
"notifier",
"object_store 0.10.1",
+ "object_store_opendal",
+ "opendal",
"partitioned_lock",
"prometheus 0.12.0",
"prometheus-static-metric",
"prost 0.11.8",
"rand 0.8.5",
+ "reqwest 0.12.4",
"runtime",
"serde",
"serde_json",
@@ -4680,6 +4719,23 @@ dependencies = [
"uuid",
]
+[[package]]
+name = "object_store_opendal"
+version = "0.46.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f7e5902fc99e9fb9e32c93f6a67dc5cc0772dc0fb348e2ef4ce258b03666d034"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "flagset",
+ "futures 0.3.28",
+ "futures-util",
+ "object_store 0.10.1",
+ "opendal",
+ "pin-project",
+ "tokio",
+]
+
[[package]]
name = "obkv-table-client-rs"
version = "0.1.0"
@@ -4739,6 +4795,36 @@ version = "11.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
+[[package]]
+name = "opendal"
+version = "0.49.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "39d516adf7db912c38af382c3e92c27cd62fbbc240e630920555d784c2ab1494"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "backon",
+ "base64 0.22.1",
+ "bytes",
+ "chrono",
+ "crc32c",
+ "flagset",
+ "futures 0.3.28",
+ "getrandom",
+ "http 1.1.0",
+ "log",
+ "md-5",
+ "once_cell",
+ "percent-encoding",
+ "quick-xml 0.36.1",
+ "reqsign",
+ "reqwest 0.12.4",
+ "serde",
+ "serde_json",
+ "tokio",
+ "uuid",
+]
+
[[package]]
name = "opensrv-mysql"
version = "0.1.0"
@@ -4753,12 +4839,6 @@ dependencies = [
"tokio",
]
-[[package]]
-name = "openssl-probe"
-version = "0.1.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
-
[[package]]
name = "ordered-float"
version = "2.10.0"
@@ -4768,6 +4848,16 @@ dependencies = [
"num-traits",
]
+[[package]]
+name = "ordered-multimap"
+version = "0.7.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "49203cdcae0030493bad186b28da2fa25645fa276a51b6fec8010d281e02ef79"
+dependencies = [
+ "dlv-list",
+ "hashbrown 0.14.0",
+]
+
[[package]]
name = "overload"
version = "0.1.1"
@@ -5101,22 +5191,22 @@ dependencies = [
[[package]]
name = "pin-project"
-version = "1.0.12"
+version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
+checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
-version = "1.0.12"
+version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
+checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
dependencies = [
"proc-macro2",
"quote",
- "syn 1.0.109",
+ "syn 2.0.48",
]
[[package]]
@@ -5768,9 +5858,19 @@ dependencies = [
[[package]]
name = "quick-xml"
-version = "0.31.0"
+version = "0.35.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33"
+checksum = "86e446ed58cef1bbfe847bc2fda0e2e4ea9f0e57b90c507d4781292590d72a4e"
+dependencies = [
+ "memchr",
+ "serde",
+]
+
+[[package]]
+name = "quick-xml"
+version = "0.36.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "96a05e2e8efddfa51a84ca47cec303fac86c8541b686d37cac5efc0e094417bc"
dependencies = [
"memchr",
"serde",
@@ -6015,6 +6115,35 @@ dependencies = [
"bytecheck",
]
+[[package]]
+name = "reqsign"
+version = "0.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "03dd4ba7c3901dd43e6b8c7446a760d45bc1ea4301002e1a6fa48f97c3a796fa"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "base64 0.22.1",
+ "chrono",
+ "form_urlencoded",
+ "getrandom",
+ "hex",
+ "hmac",
+ "home",
+ "http 1.1.0",
+ "log",
+ "once_cell",
+ "percent-encoding",
+ "quick-xml 0.35.0",
+ "rand 0.8.5",
+ "reqwest 0.12.4",
+ "rust-ini",
+ "serde",
+ "serde_json",
+ "sha1",
+ "sha2",
+]
+
[[package]]
name = "reqwest"
version = "0.11.24"
@@ -6081,7 +6210,6 @@ dependencies = [
"percent-encoding",
"pin-project-lite",
"rustls 0.22.2",
- "rustls-native-certs",
"rustls-pemfile 2.1.2",
"rustls-pki-types",
"serde",
@@ -6097,6 +6225,7 @@ dependencies = [
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
+ "webpki-roots 0.26.3",
"winreg 0.52.0",
]
@@ -6241,6 +6370,17 @@ dependencies = [
"time 0.1.43",
]
+[[package]]
+name = "rust-ini"
+version = "0.21.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4e310ef0e1b6eeb79169a1171daf9abcb87a2e17c03bee2c4bb100b55c75409f"
+dependencies = [
+ "cfg-if 1.0.0",
+ "ordered-multimap",
+ "trim-in-place",
+]
+
[[package]]
name = "rust-sdk-test"
version = "2.0.0"
@@ -6346,19 +6486,6 @@ dependencies = [
"zeroize",
]
-[[package]]
-name = "rustls-native-certs"
-version = "0.7.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792"
-dependencies = [
- "openssl-probe",
- "rustls-pemfile 2.1.2",
- "rustls-pki-types",
- "schannel",
- "security-framework",
-]
-
[[package]]
name = "rustls-pemfile"
version = "0.2.1"
@@ -6464,15 +6591,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ece8e78b2f38ec51c51f5d475df0a7187ba5111b2a28bdc761ee05b075d40a71"
-[[package]]
-name = "schannel"
-version = "0.1.23"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534"
-dependencies = [
- "windows-sys 0.52.0",
-]
-
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
@@ -6529,29 +6647,6 @@ version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b"
-[[package]]
-name = "security-framework"
-version = "2.10.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "770452e37cad93e0a50d5abc3990d2bc351c36d0328f86cefec2f2fb206eaef6"
-dependencies = [
- "bitflags 1.3.2",
- "core-foundation",
- "core-foundation-sys",
- "libc",
- "security-framework-sys",
-]
-
-[[package]]
-name = "security-framework-sys"
-version = "2.11.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7"
-dependencies = [
- "core-foundation-sys",
- "libc",
-]
-
[[package]]
name = "semver"
version = "1.0.17"
@@ -7311,7 +7406,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9fbec84f381d5795b08656e4912bec604d162bff9291d6189a78f4c8ab87998"
dependencies = [
"cfg-if 1.0.0",
- "fastrand",
+ "fastrand 1.9.0",
"redox_syscall 0.3.5",
"rustix",
"windows-sys 0.45.0",
@@ -7921,6 +8016,12 @@ dependencies = [
"tracing-subscriber",
]
+[[package]]
+name = "trim-in-place"
+version = "0.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc"
+
[[package]]
name = "triomphe"
version = "0.1.8"
@@ -8063,6 +8164,7 @@ checksum =
"5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560"
dependencies = [
"getrandom",
"rand 0.8.5",
+ "serde",
"uuid-macro-internal",
]
@@ -8326,6 +8428,15 @@ version = "0.25.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1"
+[[package]]
+name = "webpki-roots"
+version = "0.26.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bd7c23921eeb1713a4e851530e9b9756e4fb0e89978582942612524cf09f01cd"
+dependencies = [
+ "rustls-pki-types",
+]
+
[[package]]
name = "which"
version = "4.4.0"
diff --git a/Cargo.toml b/Cargo.toml
index 4563acf5..c0e62ebf 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -160,9 +160,10 @@ query_frontend = { path = "src/query_frontend" }
rand = "0.8.5"
regex = "1"
remote_engine_client = { path = "src/remote_engine_client" }
-reqwest = { version = "0.11", default-features = false, features = [
+reqwest = { version = "0.12.4", default-features = false, features = [
"rustls-tls",
"json",
+ "http2",
] }
router = { path = "src/router" }
runtime = { path = "src/components/runtime" }
@@ -200,14 +201,14 @@ zstd = { version = "0.12", default-features = false }
# This profile optimizes for good runtime performance.
[profile.release]
# reference:
https://doc.rust-lang.org/rustc/codegen-options/index.html#codegen-units
-codegen-units = 1
-debug = true
+codegen-units = 1
+debug = true
overflow-checks = true
# This profile is used to produce a smaller (no symbols) binary with a little
bit poorer performance,
# but with a faster speed and low memory consumption required by compiling.
[profile.release-slim]
-inherits = "release"
+inherits = "release"
codegen-units = 16
-debug = false
-strip = true
+debug = false
+strip = true
diff --git a/Makefile b/Makefile
index 6569ebf0..4480eede 100644
--- a/Makefile
+++ b/Makefile
@@ -126,7 +126,7 @@ dev-setup:
echo "Installing dependencies using Homebrew..."
HOMEBREW_NO_AUTO_UPDATE=1 brew install git openssl protobuf cmake
pre-commit
cargo install cargo-udeps
- cargo install cargo-sort
+ cargo install --git https://github.com/DevinR528/cargo-sort --rev
55ec890 --locked
else ifeq ($(shell uname), Linux)
dev-setup:
echo "Detecting Linux system..."
@@ -137,7 +137,7 @@ dev-setup:
sudo apt-get update; \
sudo apt install -y git gcc g++ libssl-dev pkg-config
protobuf-compiler cmake pre-commit; \
cargo install cargo-udeps; \
- cargo install cargo-sort; \
+ cargo install --git https://github.com/DevinR528/cargo-sort
--rev 55ec890 --locked; \
else \
echo "Error: Unsupported Linux distribution. Exiting..."; \
exit 1; \
diff --git a/src/analytic_engine/src/manifest/details.rs
b/src/analytic_engine/src/manifest/details.rs
index 7df80a4f..e49c3d82 100644
--- a/src/analytic_engine/src/manifest/details.rs
+++ b/src/analytic_engine/src/manifest/details.rs
@@ -653,7 +653,7 @@ mod tests {
column_schema, datum::DatumKind, schema, schema::Schema,
table::DEFAULT_SHARD_ID,
};
use futures::future::BoxFuture;
- use object_store::LocalFileSystem;
+ use object_store::local_file;
use runtime::Runtime;
use table_engine::table::{SchemaId, TableId, TableSeqGenerator};
use wal::rocksdb_impl::manager::Builder as WalBuilder;
@@ -836,7 +836,9 @@ mod tests {
.build()
.unwrap();
- let object_store =
LocalFileSystem::new_with_prefix(&self.dir).unwrap();
+ let local_path = self.dir.to_string_lossy().to_string();
+ let object_store =
local_file::try_new_with_default(local_path).unwrap();
+
ManifestImpl::open(
self.options.clone(),
Arc::new(manifest_wal),
diff --git a/src/analytic_engine/src/setup.rs b/src/analytic_engine/src/setup.rs
index be0d9354..ee167729 100644
--- a/src/analytic_engine/src/setup.rs
+++ b/src/analytic_engine/src/setup.rs
@@ -25,10 +25,11 @@ use object_store::{
aliyun,
config::{ObjectStoreOptions, StorageOptions},
disk_cache::DiskCacheStore,
+ local_file,
mem_cache::{MemCache, MemCacheStore},
metrics::StoreWithMetrics,
prefix::StoreWithPrefix,
- s3, LocalFileSystem, ObjectStoreRef,
+ s3, ObjectStoreRef,
};
use snafu::{ResultExt, Snafu};
use table_engine::engine::{EngineRuntimes, TableEngineRef};
@@ -61,6 +62,9 @@ pub enum Error {
source: object_store::ObjectStoreError,
},
+ #[snafu(display("Failed to access object store by openDal , err:{}",
source))]
+ OpenDal { source: object_store::OpenDalError },
+
#[snafu(display("Failed to create dir for {}, err:{}", path, source))]
CreateDir {
path: String,
@@ -192,27 +196,32 @@ fn open_storage(
) -> Pin<Box<dyn Future<Output = Result<OpenedStorages>> + Send>> {
Box::pin(async move {
let mut store = match opts.object_store {
- ObjectStoreOptions::Local(local_opts) => {
+ ObjectStoreOptions::Local(mut local_opts) => {
let data_path = Path::new(&local_opts.data_dir);
- let sst_path = data_path.join(STORE_DIR_NAME);
+ let sst_path = data_path
+ .join(STORE_DIR_NAME)
+ .to_string_lossy()
+ .into_owned();
tokio::fs::create_dir_all(&sst_path)
.await
.context(CreateDir {
- path: sst_path.to_string_lossy().into_owned(),
+ path: sst_path.clone(),
})?;
- let store =
LocalFileSystem::new_with_prefix(sst_path).context(OpenObjectStore)?;
+ local_opts.data_dir = sst_path;
+
+ let store: ObjectStoreRef =
+
Arc::new(local_file::try_new(&local_opts).context(OpenDal)?);
Arc::new(store) as _
}
ObjectStoreOptions::Aliyun(aliyun_opts) => {
- let oss: ObjectStoreRef =
-
Arc::new(aliyun::try_new(&aliyun_opts).context(OpenObjectStore)?);
- let store_with_prefix =
StoreWithPrefix::new(aliyun_opts.prefix, oss);
+ let store: ObjectStoreRef =
+ Arc::new(aliyun::try_new(&aliyun_opts).context(OpenDal)?);
+ let store_with_prefix =
StoreWithPrefix::new(aliyun_opts.prefix, store);
Arc::new(store_with_prefix.context(OpenObjectStore)?) as _
}
ObjectStoreOptions::S3(s3_option) => {
- let oss: ObjectStoreRef =
-
Arc::new(s3::try_new(&s3_option).context(OpenObjectStore)?);
- let store_with_prefix = StoreWithPrefix::new(s3_option.prefix,
oss);
+ let store: ObjectStoreRef =
Arc::new(s3::try_new(&s3_option).context(OpenDal)?);
+ let store_with_prefix = StoreWithPrefix::new(s3_option.prefix,
store);
Arc::new(store_with_prefix.context(OpenObjectStore)?) as _
}
};
diff --git a/src/analytic_engine/src/sst/meta_data/cache.rs
b/src/analytic_engine/src/sst/meta_data/cache.rs
index 8ddaf487..d90e71b0 100644
--- a/src/analytic_engine/src/sst/meta_data/cache.rs
+++ b/src/analytic_engine/src/sst/meta_data/cache.rs
@@ -180,7 +180,7 @@ mod tests {
schema::Builder as CustomSchemaBuilder,
time::{TimeRange, Timestamp},
};
- use object_store::{LocalFileSystem, ObjectStoreRef};
+ use object_store::{local_file, ObjectStoreRef};
use parquet::{arrow::ArrowWriter, file::footer};
use parquet_ext::ParquetMetaData;
@@ -329,7 +329,9 @@ mod tests {
parquet_filter: None,
column_values: None,
};
- let store =
Arc::new(LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap());
+
+ let local_path = temp_dir.as_ref().to_string_lossy().to_string();
+ let store =
Arc::new(local_file::try_new_with_default(local_path).unwrap());
write_parquet_file_with_metadata(
store.clone(),
parquet_file_path.as_path(),
diff --git a/src/analytic_engine/src/sst/parquet/writer.rs
b/src/analytic_engine/src/sst/parquet/writer.rs
index 5fe669c5..732753b7 100644
--- a/src/analytic_engine/src/sst/parquet/writer.rs
+++ b/src/analytic_engine/src/sst/parquet/writer.rs
@@ -28,7 +28,10 @@ use datafusion::parquet::basic::Compression;
use futures::StreamExt;
use generic_error::BoxError;
use logger::{debug, error};
-use object_store::{MultiUploadWriter, ObjectStore, ObjectStoreRef, Path,
WriteMultipartRef};
+use object_store::{
+ multi_part::{MultiUploadRef, MultiUploadWriter},
+ ObjectStore, ObjectStoreRef, Path,
+};
use snafu::{OptionExt, ResultExt};
use tokio::io::AsyncWrite;
@@ -417,7 +420,7 @@ async fn write_metadata(
Ok(buf_size)
}
-async fn multi_upload_abort(aborter: WriteMultipartRef) {
+async fn multi_upload_abort(aborter: MultiUploadRef) {
// The uploading file will be leaked if failed to abort. A repair command
// will be provided to clean up the leaked files.
if let Err(e) = aborter.lock().await.abort().await {
@@ -589,7 +592,7 @@ mod tests {
time::{TimeRange, Timestamp},
};
use futures::stream;
- use object_store::LocalFileSystem;
+ use object_store::local_file;
use runtime::{self, Runtime};
use table_engine::predicate::Predicate;
use tempfile::tempdir;
@@ -613,7 +616,7 @@ mod tests {
fn test_parquet_build_and_read() {
test_util::init_log_for_test();
- let runtime = Arc::new(runtime::Builder::default().build().unwrap());
+ let runtime =
Arc::new(runtime::Builder::default().enable_all().build().unwrap());
parquet_write_and_then_read_back(runtime.clone(), 2, vec![2, 2, 2, 2,
2, 2, 2, 2, 2, 2]);
parquet_write_and_then_read_back(runtime.clone(), 3, vec![3, 3, 3, 3,
3, 3, 2]);
parquet_write_and_then_read_back(runtime.clone(), 4, vec![4, 4, 4, 4,
4]);
@@ -635,9 +638,8 @@ mod tests {
column_stats: Default::default(),
};
- let dir = tempdir().unwrap();
- let root = dir.path();
- let store: ObjectStoreRef =
Arc::new(LocalFileSystem::new_with_prefix(root).unwrap());
+ let root =
tempdir().unwrap().as_ref().to_string_lossy().to_string();
+ let store: ObjectStoreRef =
Arc::new(local_file::try_new_with_default(root).unwrap());
let store_picker: ObjectStorePickerRef = Arc::new(store);
let sst_file_path = Path::from("data.par");
diff --git a/src/analytic_engine/src/tests/util.rs
b/src/analytic_engine/src/tests/util.rs
index 7eab3c1b..8fe07106 100644
--- a/src/analytic_engine/src/tests/util.rs
+++ b/src/analytic_engine/src/tests/util.rs
@@ -510,6 +510,8 @@ impl Builder {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
+ max_retries: 3,
+ timeout: Default::default(),
}),
},
wal: WalConfig {
@@ -588,6 +590,8 @@ impl Default for RocksDBEngineBuildContext {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
+ max_retries: 3,
+ timeout: Default::default(),
}),
},
wal: WalConfig {
@@ -621,6 +625,8 @@ impl Clone for RocksDBEngineBuildContext {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
+ max_retries: 3,
+ timeout: Default::default(),
}),
};
@@ -685,6 +691,8 @@ impl Default for MemoryEngineBuildContext {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
+ max_retries: 3,
+ timeout: Default::default(),
}),
},
wal: WalConfig {
diff --git a/src/benchmarks/src/merge_memtable_bench.rs
b/src/benchmarks/src/merge_memtable_bench.rs
index 7c9d9ba4..abf5f8f4 100644
--- a/src/benchmarks/src/merge_memtable_bench.rs
+++ b/src/benchmarks/src/merge_memtable_bench.rs
@@ -45,7 +45,7 @@ use common_types::{
projected_schema::ProjectedSchema, request_id::RequestId, schema::Schema,
time::TimeRange,
};
use logger::info;
-use object_store::{LocalFileSystem, ObjectStoreRef};
+use object_store::{local_file, ObjectStoreRef};
use runtime::Runtime;
use table_engine::{predicate::Predicate, table::TableId};
@@ -69,7 +69,8 @@ impl MergeMemTableBench {
pub fn new(config: MergeMemTableBenchConfig) -> Self {
assert!(!config.sst_file_ids.is_empty());
- let store =
Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _;
+ let store =
Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;
+
let runtime = Arc::new(util::new_runtime(config.runtime_thread_num));
let space_id = config.space_id;
let table_id = config.table_id;
diff --git a/src/benchmarks/src/merge_sst_bench.rs
b/src/benchmarks/src/merge_sst_bench.rs
index 0e7280f8..9a949438 100644
--- a/src/benchmarks/src/merge_sst_bench.rs
+++ b/src/benchmarks/src/merge_sst_bench.rs
@@ -38,7 +38,7 @@ use analytic_engine::{
};
use common_types::{projected_schema::ProjectedSchema, request_id::RequestId,
schema::Schema};
use logger::info;
-use object_store::{LocalFileSystem, ObjectStoreRef};
+use object_store::{local_file, ObjectStoreRef};
use runtime::Runtime;
use table_engine::{predicate::Predicate, table::TableId};
use tokio::sync::mpsc::{self, UnboundedReceiver};
@@ -65,7 +65,8 @@ impl MergeSstBench {
pub fn new(config: MergeSstBenchConfig) -> Self {
assert!(!config.sst_file_ids.is_empty());
- let store =
Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _;
+ let store =
Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;
+
let runtime = Arc::new(util::new_runtime(config.runtime_thread_num));
let space_id = config.space_id;
let table_id = config.table_id;
diff --git a/src/benchmarks/src/parquet_bench.rs
b/src/benchmarks/src/parquet_bench.rs
index 9ff18608..5bec32ba 100644
--- a/src/benchmarks/src/parquet_bench.rs
+++ b/src/benchmarks/src/parquet_bench.rs
@@ -23,7 +23,7 @@ use analytic_engine::sst::meta_data::cache::MetaCacheRef;
use common_types::schema::Schema;
use futures::StreamExt;
use logger::info;
-use object_store::{LocalFileSystem, ObjectStoreRef, Path};
+use object_store::{local_file, ObjectStoreRef, Path};
use parquet::arrow::{
arrow_reader::ParquetRecordBatchReaderBuilder,
ParquetRecordBatchStreamBuilder,
};
@@ -46,7 +46,7 @@ pub struct ParquetBench {
impl ParquetBench {
pub fn new(config: SstBenchConfig) -> Self {
- let store =
Arc::new(LocalFileSystem::new_with_prefix(&config.store_path).unwrap()) as _;
+ let store =
Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;
let runtime = util::new_runtime(config.runtime_thread_num);
diff --git a/src/benchmarks/src/scan_memtable_bench.rs
b/src/benchmarks/src/scan_memtable_bench.rs
index a51ceeb0..edd1ad32 100644
--- a/src/benchmarks/src/scan_memtable_bench.rs
+++ b/src/benchmarks/src/scan_memtable_bench.rs
@@ -33,7 +33,7 @@ use common_types::{
time::TimeRange,
};
use logger::info;
-use object_store::{LocalFileSystem, Path};
+use object_store::{local_file, Path};
use crate::{config::ScanMemTableBenchConfig, util};
@@ -45,7 +45,7 @@ pub struct ScanMemTableBench {
impl ScanMemTableBench {
pub fn new(config: ScanMemTableBenchConfig) -> Self {
- let store =
Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _;
+ let store =
Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;
let runtime = Arc::new(util::new_runtime(config.runtime_thread_num));
let meta_cache: Option<MetaCacheRef> = None;
diff --git a/src/benchmarks/src/sst_bench.rs b/src/benchmarks/src/sst_bench.rs
index 2577c0a1..29afdd7f 100644
--- a/src/benchmarks/src/sst_bench.rs
+++ b/src/benchmarks/src/sst_bench.rs
@@ -31,7 +31,7 @@ use common_types::{
schema::Schema,
};
use logger::info;
-use object_store::{LocalFileSystem, ObjectStoreRef, Path};
+use object_store::{local_file, ObjectStoreRef, Path};
use runtime::Runtime;
use crate::{config::SstBenchConfig, util};
@@ -50,7 +50,7 @@ impl SstBench {
pub fn new(config: SstBenchConfig) -> Self {
let runtime = Arc::new(util::new_runtime(config.runtime_thread_num));
- let store =
Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _;
+ let store =
Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;
let sst_path = Path::from(config.sst_file_name.clone());
let meta_cache: Option<MetaCacheRef> = config
.sst_meta_cache_cap
diff --git a/src/benchmarks/src/sst_tools.rs b/src/benchmarks/src/sst_tools.rs
index 664f89b0..4e274929 100644
--- a/src/benchmarks/src/sst_tools.rs
+++ b/src/benchmarks/src/sst_tools.rs
@@ -48,7 +48,7 @@ use common_types::{
};
use generic_error::BoxError;
use logger::info;
-use object_store::{LocalFileSystem, ObjectStoreRef, Path};
+use object_store::{local_file, ObjectStoreRef, Path};
use runtime::Runtime;
use serde::Deserialize;
use table_engine::{predicate::Predicate, table::TableId};
@@ -81,7 +81,7 @@ async fn create_sst_from_stream(config: SstConfig,
record_batch_stream: RecordBa
);
let store: ObjectStoreRef =
- Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap());
+ Arc::new(local_file::try_new_with_default(config.store_path).unwrap());
let store_picker: ObjectStorePickerRef = Arc::new(store);
let sst_file_path = Path::from(config.sst_file_name);
@@ -115,7 +115,7 @@ pub struct RebuildSstConfig {
pub async fn rebuild_sst(config: RebuildSstConfig, runtime: Arc<Runtime>) {
info!("Start rebuild sst, config:{:?}", config);
- let store =
Arc::new(LocalFileSystem::new_with_prefix(config.store_path.clone()).unwrap())
as _;
+ let store =
Arc::new(local_file::try_new_with_default(config.store_path.clone()).unwrap())
as _;
let input_path = Path::from(config.input_file_name);
let parquet_metadata = util::parquet_metadata(&store, &input_path).await;
@@ -210,7 +210,8 @@ pub async fn merge_sst(config: MergeSstConfig, runtime:
Arc<Runtime>) {
let space_id = config.space_id;
let table_id = config.table_id;
- let store =
Arc::new(LocalFileSystem::new_with_prefix(config.store_path.clone()).unwrap())
as _;
+ let store =
Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;
+
let (tx, _rx) = mpsc::unbounded_channel();
let purge_queue = FilePurgeQueue::new(space_id, table_id, tx);
diff --git a/src/benchmarks/src/util.rs b/src/benchmarks/src/util.rs
index cb6d8de9..a7f86f08 100644
--- a/src/benchmarks/src/util.rs
+++ b/src/benchmarks/src/util.rs
@@ -308,6 +308,8 @@ impl Builder {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
+ max_retries: 3,
+ timeout: Default::default(),
}),
},
wal: WalConfig {
@@ -386,6 +388,8 @@ impl Default for RocksDBEngineBuildContext {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
+ max_retries: 3,
+ timeout: Default::default(),
}),
},
wal: WalConfig {
@@ -419,6 +423,8 @@ impl Clone for RocksDBEngineBuildContext {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
+ max_retries: 3,
+ timeout: Default::default(),
}),
};
diff --git a/src/components/object_store/Cargo.toml
b/src/components/object_store/Cargo.toml
index f9221e1d..926e85f9 100644
--- a/src/components/object_store/Cargo.toml
+++ b/src/components/object_store/Cargo.toml
@@ -45,11 +45,18 @@ logger = { workspace = true }
lru = { workspace = true }
macros = { workspace = true }
notifier = { workspace = true }
+object_store_opendal = "0.46.0"
+opendal = { version = "0.49.0", features = [
+ "services-oss",
+ "services-s3",
+ "services-fs",
+] }
partitioned_lock = { workspace = true }
prometheus = { workspace = true }
prometheus-static-metric = { workspace = true }
prost = { workspace = true }
rand = { workspace = true }
+reqwest = { workspace = true }
runtime = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
@@ -59,7 +66,7 @@ table_kv = { workspace = true }
time_ext = { workspace = true }
tokio = { workspace = true }
twox-hash = "1.6"
-upstream = { package = "object_store", version = "0.10.1", features = [ "aws"
] }
+upstream = { package = "object_store", version = "0.10.1" }
uuid = { version = "1.3.3", features = ["v4"] }
[dev-dependencies]
diff --git a/src/components/object_store/src/aliyun.rs
b/src/components/object_store/src/aliyun.rs
index f2432d51..736c8755 100644
--- a/src/components/object_store/src/aliyun.rs
+++ b/src/components/object_store/src/aliyun.rs
@@ -15,9 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-use upstream::{
- aws::{AmazonS3, AmazonS3Builder},
- ClientOptions, RetryConfig,
+use object_store_opendal::OpendalStore;
+use opendal::{
+ layers::{RetryLayer, TimeoutLayer},
+ raw::HttpClient,
+ services::Oss,
+ Operator, Result,
};
use crate::config::AliyunOptions;
@@ -34,36 +37,35 @@ fn normalize_endpoint(endpoint: &str, bucket: &str) ->
String {
}
}
-pub fn try_new(aliyun_opts: &AliyunOptions) -> upstream::Result<AmazonS3> {
- let cli_opt = ClientOptions::new()
- .with_allow_http(true)
- .with_pool_max_idle_per_host(aliyun_opts.http.pool_max_idle_per_host)
- .with_http2_keep_alive_timeout(aliyun_opts.http.keep_alive_timeout.0)
- .with_http2_keep_alive_while_idle()
- .with_http2_keep_alive_interval(aliyun_opts.http.keep_alive_interval.0)
- .with_timeout(aliyun_opts.http.timeout.0);
- let retry_config = RetryConfig {
- max_retries: aliyun_opts.retry.max_retries,
- retry_timeout: aliyun_opts.retry.retry_timeout.0,
- ..Default::default()
- };
+pub fn try_new(aliyun_opts: &AliyunOptions) -> Result<OpendalStore> {
+ let http_builder = reqwest::ClientBuilder::new()
+ .pool_max_idle_per_host(aliyun_opts.http.pool_max_idle_per_host)
+ .http2_keep_alive_timeout(aliyun_opts.http.keep_alive_timeout.0)
+ .http2_keep_alive_while_idle(true)
+ .http2_keep_alive_interval(aliyun_opts.http.keep_alive_interval.0)
+ .timeout(aliyun_opts.http.timeout.0);
+ let http_client = HttpClient::build(http_builder)?;
let endpoint = &aliyun_opts.endpoint;
let bucket = &aliyun_opts.bucket;
let endpoint = normalize_endpoint(endpoint, bucket);
- AmazonS3Builder::new()
- .with_virtual_hosted_style_request(true)
- // region is not used when virtual_hosted_style is true,
- // but is required, so dummy is used here
- // https://github.com/apache/arrow-rs/issues/3827
- .with_region("dummy")
- .with_access_key_id(&aliyun_opts.key_id)
- .with_secret_access_key(&aliyun_opts.key_secret)
- .with_endpoint(endpoint)
- .with_bucket_name(bucket)
- .with_client_options(cli_opt)
- .with_retry(retry_config)
- .build()
+
+ let builder = Oss::default()
+ .access_key_id(&aliyun_opts.key_id)
+ .access_key_secret(&aliyun_opts.key_secret)
+ .endpoint(&endpoint)
+ .bucket(bucket)
+ .http_client(http_client);
+ let op = Operator::new(builder)?
+ .layer(
+ TimeoutLayer::new()
+ .with_timeout(aliyun_opts.timeout.timeout.0)
+ .with_io_timeout(aliyun_opts.timeout.io_timeout.0),
+ )
+ .layer(RetryLayer::new().with_max_times(aliyun_opts.max_retries))
+ .finish();
+
+ Ok(OpendalStore::new(op))
}
#[cfg(test)]
diff --git a/src/components/object_store/src/config.rs
b/src/components/object_store/src/config.rs
index d0ecbfb0..072b9159 100644
--- a/src/components/object_store/src/config.rs
+++ b/src/components/object_store/src/config.rs
@@ -49,9 +49,7 @@ impl Default for StorageOptions {
disk_cache_capacity: ReadableSize::gb(0),
disk_cache_page_size: ReadableSize::mb(2),
disk_cache_partition_bits: 4,
- object_store: ObjectStoreOptions::Local(LocalOptions {
- data_dir: root_path,
- }),
+ object_store:
ObjectStoreOptions::Local(LocalOptions::new_with_default(root_path)),
}
}
}
@@ -68,6 +66,20 @@ pub enum ObjectStoreOptions {
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct LocalOptions {
pub data_dir: String,
+ #[serde(default = "default_max_retries")]
+ pub max_retries: usize,
+ #[serde(default)]
+ pub timeout: TimeoutOptions,
+}
+
+impl LocalOptions {
+ pub fn new_with_default(data_dir: String) -> Self {
+ Self {
+ data_dir,
+ max_retries: default_max_retries(),
+ timeout: Default::default(),
+ }
+ }
}
#[derive(Debug, Clone, Deserialize, Serialize)]
@@ -77,10 +89,12 @@ pub struct AliyunOptions {
pub endpoint: String,
pub bucket: String,
pub prefix: String,
+ #[serde(default = "default_max_retries")]
+ pub max_retries: usize,
#[serde(default)]
pub http: HttpOptions,
#[serde(default)]
- pub retry: RetryOptions,
+ pub timeout: TimeoutOptions,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
@@ -91,10 +105,12 @@ pub struct S3Options {
pub endpoint: String,
pub bucket: String,
pub prefix: String,
+ #[serde(default = "default_max_retries")]
+ pub max_retries: usize,
#[serde(default)]
pub http: HttpOptions,
#[serde(default)]
- pub retry: RetryOptions,
+ pub timeout: TimeoutOptions,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
@@ -117,16 +133,25 @@ impl Default for HttpOptions {
}
#[derive(Debug, Clone, Deserialize, Serialize)]
-pub struct RetryOptions {
- pub max_retries: usize,
- pub retry_timeout: ReadableDuration,
+pub struct TimeoutOptions {
+    // Non-IO operations such as stat and delete operate on a single file; we
control them by
+    // setting `timeout`.
+ pub timeout: ReadableDuration,
+    // IO operations such as read and write operate on data directly; we
control them by setting
+    // `io_timeout`.
+ pub io_timeout: ReadableDuration,
}
-impl Default for RetryOptions {
+impl Default for TimeoutOptions {
fn default() -> Self {
Self {
- max_retries: 3,
- retry_timeout: ReadableDuration::from(Duration::from_secs(3 * 60)),
+ timeout: ReadableDuration::from(Duration::from_secs(10)),
+ io_timeout: ReadableDuration::from(Duration::from_secs(10)),
}
}
}
+
+#[inline]
+fn default_max_retries() -> usize {
+ 3
+}
diff --git a/src/components/object_store/src/disk_cache.rs
b/src/components/object_store/src/disk_cache.rs
index 33ab7776..a89fd428 100644
--- a/src/components/object_store/src/disk_cache.rs
+++ b/src/components/object_store/src/disk_cache.rs
@@ -1033,10 +1033,9 @@ impl ObjectStore for DiskCacheStore {
mod test {
use runtime::{Builder, RuntimeRef};
use tempfile::{tempdir, TempDir};
- use upstream::local::LocalFileSystem;
use super::*;
- use crate::test_util::MemoryStore;
+ use crate::{local_file, test_util::MemoryStore};
struct StoreWithCacheDir {
inner: DiskCacheStore,
@@ -1334,9 +1333,10 @@ mod test {
let page_size = 8;
let first_create_time = {
let _store = {
- let local_path = tempdir().unwrap();
+ let local_path =
tempdir().unwrap().as_ref().to_string_lossy().to_string();
let local_store =
-
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+
Arc::new(local_file::try_new_with_default(local_path).unwrap());
+
DiskCacheStore::try_new(
cache_root_dir.clone(),
160,
@@ -1361,9 +1361,9 @@ mod test {
// open again
{
let _store = {
- let local_path = tempdir().unwrap();
+ let local_path =
tempdir().unwrap().as_ref().to_string_lossy().to_string();
let local_store =
-
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+
Arc::new(local_file::try_new_with_default(local_path).unwrap());
DiskCacheStore::try_new(
cache_root_dir.clone(),
160,
@@ -1387,9 +1387,8 @@ mod test {
// open again, but with different page_size
{
- let local_path = tempdir().unwrap();
- let local_store =
-
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+ let local_path =
tempdir().unwrap().as_ref().to_string_lossy().to_string();
+ let local_store =
Arc::new(local_file::try_new_with_default(local_path).unwrap());
let store = DiskCacheStore::try_new(
cache_dir.as_ref().to_string_lossy().to_string(),
160,
@@ -1407,7 +1406,7 @@ mod test {
#[test]
fn test_disk_cache_recovery() {
- let rt = Arc::new(Builder::default().build().unwrap());
+ let rt = Arc::new(Builder::default().enable_all().build().unwrap());
rt.block_on(async {
let cache_dir = tempdir().unwrap();
let cache_root_dir =
cache_dir.as_ref().to_string_lossy().to_string();
@@ -1415,9 +1414,9 @@ mod test {
let location = Path::from("recovery.sst");
{
let store = {
- let local_path = tempdir().unwrap();
+ let local_path =
tempdir().unwrap().as_ref().to_string_lossy().to_string();
let local_store =
-
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+
Arc::new(local_file::try_new_with_default(local_path).unwrap());
DiskCacheStore::try_new(
cache_root_dir.clone(),
10240,
@@ -1448,9 +1447,9 @@ mod test {
// recover
{
let store = {
- let local_path = tempdir().unwrap();
+ let local_path =
tempdir().unwrap().as_ref().to_string_lossy().to_string();
let local_store =
-
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+
Arc::new(local_file::try_new_with_default(local_path).unwrap());
DiskCacheStore::try_new(
cache_root_dir.clone(),
160,
diff --git a/src/components/object_store/src/lib.rs
b/src/components/object_store/src/lib.rs
index 350ccfa0..4627dbae 100644
--- a/src/components/object_store/src/lib.rs
+++ b/src/components/object_store/src/lib.rs
@@ -19,25 +19,22 @@
use std::sync::Arc;
-pub use multi_part::{ConcurrentMultipartUpload, MultiUploadWriter};
-use tokio::sync::Mutex;
+pub use opendal::Error as OpenDalError;
pub use upstream::{
- local::LocalFileSystem, path::Path, Error as ObjectStoreError, Error,
GetResult, ListResult,
- ObjectMeta, ObjectStore, PutPayloadMut,
+ path::Path, Error as ObjectStoreError, GetResult, ListResult, ObjectMeta,
ObjectStore,
+ PutPayloadMut,
};
pub mod aliyun;
pub mod config;
pub mod disk_cache;
+pub mod local_file;
pub mod mem_cache;
pub mod metrics;
-mod multi_part;
+pub mod multi_part;
pub mod prefix;
pub mod s3;
#[cfg(test)]
pub mod test_util;
pub type ObjectStoreRef = Arc<dyn ObjectStore>;
-
-// TODO: remove Mutex and make ConcurrentMultipartUpload thread-safe
-pub type WriteMultipartRef = Arc<Mutex<ConcurrentMultipartUpload>>;
diff --git a/src/components/object_store/src/lib.rs
b/src/components/object_store/src/local_file.rs
similarity index 50%
copy from src/components/object_store/src/lib.rs
copy to src/components/object_store/src/local_file.rs
index 350ccfa0..4070b004 100644
--- a/src/components/object_store/src/lib.rs
+++ b/src/components/object_store/src/local_file.rs
@@ -15,29 +15,30 @@
// specific language governing permissions and limitations
// under the License.
-//! Re-export of [object_store] crate.
-
-use std::sync::Arc;
-
-pub use multi_part::{ConcurrentMultipartUpload, MultiUploadWriter};
-use tokio::sync::Mutex;
-pub use upstream::{
- local::LocalFileSystem, path::Path, Error as ObjectStoreError, Error,
GetResult, ListResult,
- ObjectMeta, ObjectStore, PutPayloadMut,
+use object_store_opendal::OpendalStore;
+use opendal::{
+ layers::{RetryLayer, TimeoutLayer},
+ services::Fs,
+ Operator, Result,
};
-pub mod aliyun;
-pub mod config;
-pub mod disk_cache;
-pub mod mem_cache;
-pub mod metrics;
-mod multi_part;
-pub mod prefix;
-pub mod s3;
-#[cfg(test)]
-pub mod test_util;
+use crate::config::LocalOptions;
+
+pub fn try_new(local_opts: &LocalOptions) -> Result<OpendalStore> {
+ let builder = Fs::default().root(&local_opts.data_dir);
+ let op = Operator::new(builder)?
+ .layer(
+ TimeoutLayer::new()
+ .with_timeout(local_opts.timeout.timeout.0)
+ .with_io_timeout(local_opts.timeout.io_timeout.0),
+ )
+ .layer(RetryLayer::new().with_max_times(local_opts.max_retries))
+ .finish();
-pub type ObjectStoreRef = Arc<dyn ObjectStore>;
+ Ok(OpendalStore::new(op))
+}
-// TODO: remove Mutex and make ConcurrentMultipartUpload thread-safe
-pub type WriteMultipartRef = Arc<Mutex<ConcurrentMultipartUpload>>;
+pub fn try_new_with_default(data_dir: String) -> Result<OpendalStore> {
+ let local_opts = LocalOptions::new_with_default(data_dir);
+ try_new(&local_opts)
+}
diff --git a/src/components/object_store/src/mem_cache.rs
b/src/components/object_store/src/mem_cache.rs
index 0fa8a912..9e40fb8e 100644
--- a/src/components/object_store/src/mem_cache.rs
+++ b/src/components/object_store/src/mem_cache.rs
@@ -294,13 +294,13 @@ impl ObjectStore for MemCacheStore {
#[cfg(test)]
mod test {
use tempfile::tempdir;
- use upstream::local::LocalFileSystem;
use super::*;
+ use crate::local_file;
fn prepare_store(bits: usize, mem_cap: usize) -> MemCacheStore {
- let local_path = tempdir().unwrap();
- let local_store =
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+ let local_path =
tempdir().unwrap().as_ref().to_string_lossy().to_string();
+ let local_store =
Arc::new(local_file::try_new_with_default(local_path).unwrap());
let mem_cache =
Arc::new(MemCache::try_new(bits,
NonZeroUsize::new(mem_cap).unwrap()).unwrap());
diff --git a/src/components/object_store/src/multi_part.rs
b/src/components/object_store/src/multi_part.rs
index 871ffe2a..fb5b9dd9 100644
--- a/src/components/object_store/src/multi_part.rs
+++ b/src/components/object_store/src/multi_part.rs
@@ -28,7 +28,13 @@ use tokio::{io::AsyncWrite, sync::Mutex, task::JoinSet};
pub use upstream::PutPayloadMut;
use upstream::{path::Path, Error, MultipartUpload, PutPayload, PutResult};
-use crate::{ObjectStoreRef, WriteMultipartRef};
+use crate::ObjectStoreRef;
+
+// TODO: remove Mutex and make ConcurrentMultipartUpload thread-safe
+pub type MultiUploadRef = Arc<Mutex<ConcurrentMultipartUpload>>;
+
+const CHUNK_SIZE: usize = 5 * 1024 * 1024;
+const MAX_CONCURRENCY: usize = 10;
#[derive(Debug)]
pub struct ConcurrentMultipartUpload {
@@ -113,15 +119,12 @@ impl ConcurrentMultipartUpload {
}
pub struct MultiUploadWriter {
- pub multi_upload: WriteMultipartRef,
+ pub multi_upload: MultiUploadRef,
upload_task: Option<BoxFuture<'static, std::result::Result<usize,
IoError>>>,
flush_task: Option<BoxFuture<'static, std::result::Result<(), IoError>>>,
completion_task: Option<BoxFuture<'static, std::result::Result<(),
IoError>>>,
}
-const CHUNK_SIZE: usize = 5 * 1024 * 1024;
-const MAX_CONCURRENCY: usize = 10;
-
impl<'a> MultiUploadWriter {
pub async fn new(object_store: &'a ObjectStoreRef, location: &'a Path) ->
Result<Self, Error> {
let upload_writer = object_store.put_multipart(location).await?;
@@ -141,7 +144,7 @@ impl<'a> MultiUploadWriter {
Ok(multi_upload)
}
- pub fn aborter(&self) -> WriteMultipartRef {
+ pub fn aborter(&self) -> MultiUploadRef {
self.multi_upload.clone()
}
}
diff --git a/src/components/object_store/src/prefix.rs
b/src/components/object_store/src/prefix.rs
index 187b10a8..24233eeb 100644
--- a/src/components/object_store/src/prefix.rs
+++ b/src/components/object_store/src/prefix.rs
@@ -238,9 +238,9 @@ mod tests {
use chrono::{DateTime, Utc};
use futures::{stream, stream::StreamExt};
use tempfile::tempdir;
- use upstream::local::LocalFileSystem;
use super::*;
+ use crate::local_file;
#[derive(Debug, Clone)]
struct PathPrefixChecker {
@@ -423,8 +423,8 @@ mod tests {
("/0/1/", "100/101.sst", "0/1/100/101.sst"),
];
- let local_path = tempdir().unwrap();
- let local_store =
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+ let local_path =
tempdir().unwrap().as_ref().to_string_lossy().to_string();
+ let local_store =
Arc::new(local_file::try_new_with_default(local_path).unwrap());
for (prefix, filename, expect_loc) in cases.clone() {
let prefix_store =
StoreWithPrefix::new(prefix.to_string(),
local_store.clone()).unwrap();
diff --git a/src/components/object_store/src/s3.rs
b/src/components/object_store/src/s3.rs
index fdbce027..2b81521f 100644
--- a/src/components/object_store/src/s3.rs
+++ b/src/components/object_store/src/s3.rs
@@ -15,34 +15,40 @@
// specific language governing permissions and limitations
// under the License.
-use upstream::{
- aws::{AmazonS3, AmazonS3Builder},
- ClientOptions, RetryConfig,
+use object_store_opendal::OpendalStore;
+use opendal::{
+ layers::{RetryLayer, TimeoutLayer},
+ raw::HttpClient,
+ services::S3,
+ Operator, Result,
};
use crate::config::S3Options;
-pub fn try_new(s3_option: &S3Options) -> upstream::Result<AmazonS3> {
- let cli_opt = ClientOptions::new()
- .with_allow_http(true)
- .with_pool_max_idle_per_host(s3_option.http.pool_max_idle_per_host)
- .with_http2_keep_alive_timeout(s3_option.http.keep_alive_timeout.0)
- .with_http2_keep_alive_while_idle()
- .with_http2_keep_alive_interval(s3_option.http.keep_alive_interval.0)
- .with_timeout(s3_option.http.timeout.0);
- let retry_config = RetryConfig {
- max_retries: s3_option.retry.max_retries,
- retry_timeout: s3_option.retry.retry_timeout.0,
- ..Default::default()
- };
+pub fn try_new(s3_option: &S3Options) -> Result<OpendalStore> {
+ let http_builder = reqwest::ClientBuilder::new()
+ .pool_max_idle_per_host(s3_option.http.pool_max_idle_per_host)
+ .http2_keep_alive_timeout(s3_option.http.keep_alive_timeout.0)
+ .http2_keep_alive_while_idle(true)
+ .http2_keep_alive_interval(s3_option.http.keep_alive_interval.0)
+ .timeout(s3_option.http.timeout.0);
+ let http_client = HttpClient::build(http_builder)?;
- AmazonS3Builder::new()
- .with_region(&s3_option.region)
- .with_access_key_id(&s3_option.key_id)
- .with_secret_access_key(&s3_option.key_secret)
- .with_endpoint(&s3_option.endpoint)
- .with_bucket_name(&s3_option.bucket)
- .with_client_options(cli_opt)
- .with_retry(retry_config)
- .build()
+ let builder = S3::default()
+ .region(&s3_option.region)
+ .access_key_id(&s3_option.key_id)
+ .secret_access_key(&s3_option.key_secret)
+ .endpoint(&s3_option.endpoint)
+ .bucket(&s3_option.bucket)
+ .http_client(http_client);
+ let op = Operator::new(builder)?
+ .layer(
+ TimeoutLayer::new()
+ .with_timeout(s3_option.timeout.timeout.0)
+ .with_io_timeout(s3_option.timeout.io_timeout.0),
+ )
+ .layer(RetryLayer::new().with_max_times(s3_option.max_retries))
+ .finish();
+
+ Ok(OpendalStore::new(op))
}
diff --git a/src/tools/src/bin/sst-convert.rs b/src/tools/src/bin/sst-convert.rs
index c4c9935c..7c7856be 100644
--- a/src/tools/src/bin/sst-convert.rs
+++ b/src/tools/src/bin/sst-convert.rs
@@ -37,7 +37,7 @@ use common_types::{
request_id::RequestId,
};
use generic_error::BoxError;
-use object_store::{LocalFileSystem, Path};
+use object_store::{config::LocalOptions, local_file, Path};
use runtime::Runtime;
use table_engine::predicate::Predicate;
use tools::sst_util;
@@ -91,7 +91,12 @@ fn main() {
}
async fn run(args: Args, runtime: Arc<Runtime>) -> Result<()> {
- let storage =
LocalFileSystem::new_with_prefix(args.store_path).expect("invalid path");
+ let local_opts = LocalOptions {
+ data_dir: args.store_path,
+ max_retries: 3,
+ timeout: Default::default(),
+ };
+ let storage = local_file::try_new(&local_opts).expect("invalid path");
let store = Arc::new(storage) as _;
let input_path = Path::from(args.input);
let sst_meta = sst_util::meta_from_sst(&store, &input_path).await;
diff --git a/src/tools/src/bin/sst-metadata.rs
b/src/tools/src/bin/sst-metadata.rs
index bf659960..b48ca929 100644
--- a/src/tools/src/bin/sst-metadata.rs
+++ b/src/tools/src/bin/sst-metadata.rs
@@ -23,7 +23,7 @@ use analytic_engine::sst::{meta_data::cache::MetaData,
parquet::async_reader::Ch
use anyhow::{Context, Result};
use clap::Parser;
use futures::StreamExt;
-use object_store::{LocalFileSystem, ObjectMeta, ObjectStoreRef, Path};
+use object_store::{config::LocalOptions, local_file, ObjectMeta,
ObjectStoreRef, Path};
use parquet_ext::{meta_data::fetch_parquet_metadata,
reader::ObjectStoreReader};
use runtime::Runtime;
use time_ext::format_as_ymdhms;
@@ -141,7 +141,12 @@ fn main() {
async fn run(args: Args) -> Result<()> {
let handle = Handle::current();
- let storage = LocalFileSystem::new_with_prefix(&args.dir)?;
+ let local_opts = LocalOptions {
+ data_dir: args.dir,
+ max_retries: 3,
+ timeout: Default::default(),
+ };
+ let storage = local_file::try_new(&local_opts)?;
let storage: ObjectStoreRef = Arc::new(storage);
let mut join_set = JoinSet::new();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]