This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 00f4aca feat: support scan from oss (#217)
00f4aca is described below
commit 00f4acaa761cdf610ed2314783ebf40de5519e32
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Jan 30 09:34:55 2026 +0800
feat: support scan from oss (#217)
---
crates/fluss/Cargo.toml | 5 ++--
crates/fluss/src/client/credentials.rs | 28 +++++++++++++++++-
crates/fluss/src/client/table/remote_log.rs | 1 +
crates/fluss/src/io/mod.rs | 5 ++++
crates/fluss/src/io/storage.rs | 15 +++++++++-
crates/fluss/src/io/storage_oss.rs | 45 +++++++++++++++++++++++++++++
6 files changed, 95 insertions(+), 4 deletions(-)
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index 9aeee72..6b2707b 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -22,12 +22,13 @@ version = { workspace = true }
name = "fluss"
[features]
-default = ["storage-memory", "storage-fs", "storage-s3"]
-storage-all = ["storage-memory", "storage-fs", "storage-s3"]
+default = ["storage-memory", "storage-fs"]
+storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-oss"]
storage-memory = ["opendal/services-memory"]
storage-fs = ["opendal/services-fs"]
storage-s3 = ["opendal/services-s3"]
+storage-oss = ["opendal/services-oss"]
integration_tests = []
[dependencies]
diff --git a/crates/fluss/src/client/credentials.rs
b/crates/fluss/src/client/credentials.rs
index 93a5366..a954e2a 100644
--- a/crates/fluss/src/client/credentials.rs
+++ b/crates/fluss/src/client/credentials.rs
@@ -55,10 +55,14 @@ struct Credentials {
/// needs_inversion is true for the path_style_access -> enable_virtual_host_style conversion
fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
match hadoop_key {
+ // S3 specific configurations
"fs.s3a.endpoint" => Some(("endpoint".to_string(), false)),
"fs.s3a.endpoint.region" => Some(("region".to_string(), false)),
"fs.s3a.path.style.access" =>
Some(("enable_virtual_host_style".to_string(), true)),
"fs.s3a.connection.ssl.enabled" => None,
+ // OSS specific configurations
+ "fs.oss.endpoint" => Some(("endpoint".to_string(), false)),
+ "fs.oss.region" => Some(("region".to_string(), false)),
_ => None,
}
}
@@ -74,11 +78,20 @@ fn build_remote_fs_props(
"access_key_id".to_string(),
credentials.access_key_id.clone(),
);
+
+ // S3 specific configurations
props.insert(
"secret_access_key".to_string(),
credentials.access_key_secret.clone(),
);
+ // OSS specific configurations. TODO: consider refactoring this
+ // to handle the different conversions for different schemes in separate methods
+ props.insert(
+ "access_key_secret".to_string(),
+ credentials.access_key_secret.clone(),
+ );
+
if let Some(token) = &credentials.security_token {
props.insert("security_token".to_string(), token.clone());
}
@@ -342,6 +355,7 @@ mod tests {
#[test]
fn convert_hadoop_key_to_opendal_maps_known_keys() {
+ // S3 keys
let (key, invert) =
convert_hadoop_key_to_opendal("fs.s3a.endpoint").expect("key");
assert_eq!(key, "endpoint");
assert!(!invert);
@@ -351,6 +365,17 @@ mod tests {
assert!(invert);
assert!(convert_hadoop_key_to_opendal("fs.s3a.connection.ssl.enabled").is_none());
+
+ // OSS keys
+ let (key, invert) =
convert_hadoop_key_to_opendal("fs.oss.endpoint").expect("key");
+ assert_eq!(key, "endpoint");
+ assert!(!invert);
+
+ let (key, invert) =
convert_hadoop_key_to_opendal("fs.oss.region").expect("key");
+ assert_eq!(key, "region");
+ assert!(!invert);
+
+ // Unknown key
assert!(convert_hadoop_key_to_opendal("unknown.key").is_none());
}
@@ -401,7 +426,8 @@ mod tests {
let props = build_remote_fs_props(&credentials, &addition_infos);
assert_eq!(props.get("access_key_id"), Some(&"ak".to_string()));
- assert_eq!(props.get("secret_access_key"), Some(&"sk".to_string()));
+ assert_eq!(props.get("access_key_secret"), Some(&"sk".to_string()));
+ assert_eq!(props.get("secret_access_key"), Some(&"sk".to_string()));
assert_eq!(props.get("security_token"), Some(&"token".to_string()));
assert_eq!(
props.get("enable_virtual_host_style"),
diff --git a/crates/fluss/src/client/table/remote_log.rs
b/crates/fluss/src/client/table/remote_log.rs
index df74771..5583f89 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -869,6 +869,7 @@ impl RemoteLogDownloader {
// For S3/S3A/OSS URLs, inject object storage credentials from props
let file_io_builder = if remote_log_tablet_dir.starts_with("s3://")
|| remote_log_tablet_dir.starts_with("s3a://")
+ || remote_log_tablet_dir.starts_with("oss://")
{
file_io_builder.with_props(
remote_fs_props
diff --git a/crates/fluss/src/io/mod.rs b/crates/fluss/src/io/mod.rs
index a03a394..7426501 100644
--- a/crates/fluss/src/io/mod.rs
+++ b/crates/fluss/src/io/mod.rs
@@ -37,3 +37,8 @@ use storage_memory::*;
mod storage_s3;
#[cfg(feature = "storage-s3")]
use storage_s3::*;
+
+#[cfg(feature = "storage-oss")]
+mod storage_oss;
+#[cfg(feature = "storage-oss")]
+use storage_oss::*;
diff --git a/crates/fluss/src/io/storage.rs b/crates/fluss/src/io/storage.rs
index d90eaa5..a370861 100644
--- a/crates/fluss/src/io/storage.rs
+++ b/crates/fluss/src/io/storage.rs
@@ -19,7 +19,6 @@ use crate::error;
use crate::error::Result;
use crate::io::FileIOBuilder;
use opendal::{Operator, Scheme};
-use std::collections::HashMap;
/// The storage carries all supported storage services in fluss
#[derive(Debug)]
@@ -30,9 +29,12 @@ pub enum Storage {
LocalFs,
#[cfg(feature = "storage-s3")]
S3 { props: HashMap<String, String> },
+ #[cfg(feature = "storage-oss")]
+ Oss { props: HashMap<String, String> },
}
impl Storage {
+ #[allow(unused_variables)]
pub(crate) fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
let (scheme_str, props) = file_io_builder.into_parts();
let scheme = Self::parse_scheme(&scheme_str)?;
@@ -44,6 +46,8 @@ impl Storage {
Scheme::Fs => Ok(Self::LocalFs),
#[cfg(feature = "storage-s3")]
Scheme::S3 => Ok(Self::S3 { props }),
+ #[cfg(feature = "storage-oss")]
+ Scheme::Oss => Ok(Self::Oss { props }),
_ => Err(error::Error::IoUnsupported {
message: format!("Unsupported storage feature {scheme_str}"),
}),
@@ -79,6 +83,14 @@ impl Storage {
let op = super::s3_config_build(&s3_props)?;
Ok((op, key))
}
+ #[cfg(feature = "storage-oss")]
+ Storage::Oss { props } => {
+ let (bucket, key) = super::parse_oss_path(path);
+ let mut oss_props = props.clone();
+ oss_props.insert("bucket".to_string(), bucket.to_string());
+ let op = super::oss_config_build(&oss_props)?;
+ Ok((op, key))
+ }
}
}
@@ -87,6 +99,7 @@ impl Storage {
"memory" => Ok(Scheme::Memory),
"file" | "" => Ok(Scheme::Fs),
"s3" | "s3a" => Ok(Scheme::S3),
+ "oss" => Ok(Scheme::Oss),
s => Ok(s.parse::<Scheme>()?),
}
}
diff --git a/crates/fluss/src/io/storage_oss.rs
b/crates/fluss/src/io/storage_oss.rs
new file mode 100644
index 0000000..3d5d054
--- /dev/null
+++ b/crates/fluss/src/io/storage_oss.rs
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Result;
+use opendal::Configurator;
+use opendal::Operator;
+use opendal::layers::TimeoutLayer;
+use opendal::services::OssConfig;
+use std::collections::HashMap;
+use std::time::Duration;
+
+pub(crate) fn oss_config_build(props: &HashMap<String, String>) ->
Result<Operator> {
+ let config = OssConfig::from_iter(props.clone())?;
+ let op = Operator::from_config(config)?.finish();
+
+ // Add timeout layer to prevent hanging on OSS operations
+ let timeout_layer = TimeoutLayer::new()
+ .with_timeout(Duration::from_secs(10))
+ .with_io_timeout(Duration::from_secs(30));
+
+ Ok(op.layer(timeout_layer))
+}
+
+pub(crate) fn parse_oss_path(path: &str) -> (&str, &str) {
+ let path = path.strip_prefix("oss://").unwrap_or(path);
+
+ match path.find('/') {
+ Some(idx) => (&path[..idx], &path[idx + 1..]),
+ None => (path, ""),
+ }
+}