This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 24cebe9 feat: support s3 as remote segment (#93)
24cebe9 is described below
commit 24cebe9b07f3df34bf65bf670741570cbad21830
Author: AlexZhao <[email protected]>
AuthorDate: Sat Dec 20 11:32:10 2025 +0800
feat: support s3 as remote segment (#93)
---------
Co-authored-by: luoyuxia <[email protected]>
---
Cargo.toml | 5 +
crates/examples/Cargo.toml | 2 +-
crates/fluss/Cargo.toml | 11 +-
crates/fluss/src/client/credentials.rs | 165 +++++++++++++++++++++
crates/fluss/src/client/mod.rs | 2 +
crates/fluss/src/client/table/remote_log.rs | 69 +++++++--
crates/fluss/src/client/table/scanner.rs | 9 ++
crates/fluss/src/io/mod.rs | 7 +-
crates/fluss/src/io/storage.rs | 16 +-
crates/fluss/src/io/storage_s3.rs | 48 ++++++
crates/fluss/src/proto/fluss_api.proto | 15 ++
crates/fluss/src/record/arrow.rs | 67 ++++++++-
crates/fluss/src/rpc/api_key.rs | 3 +
crates/fluss/src/rpc/message/get_security_token.rs | 53 +++++++
crates/fluss/src/rpc/message/mod.rs | 2 +
15 files changed, 453 insertions(+), 21 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 284a836..4155ea8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,4 +33,9 @@ fluss = { version = "0.1.0", path = "./crates/fluss" }
tokio = { version = "1.44.2", features = ["full"] }
clap = { version = "4.5.37", features = ["derive"] }
arrow = { version = "57.0.0", features = ["ipc_compression"] }
+chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }
+
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+opendal = "0.53"
jiff = { version = "0.2" }
diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml
index dab85b6..e1fa531 100644
--- a/crates/examples/Cargo.toml
+++ b/crates/examples/Cargo.toml
@@ -26,7 +26,7 @@ version = { workspace = true }
[dependencies]
fluss = { workspace = true }
tokio = { workspace = true }
-clap = { workspace = true}
+clap = { workspace = true }
[[example]]
name = "example-table"
path = "src/example_table.rs"
\ No newline at end of file
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index aa763d5..0cf0364 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -22,11 +22,12 @@ version = { workspace = true }
name = "fluss"
[features]
-default = ["storage-memory", "storage-fs"]
-storage-all = ["storage-memory", "storage-fs"]
+default = ["storage-memory", "storage-fs", "storage-s3"]
+storage-all = ["storage-memory", "storage-fs", "storage-s3"]
storage-memory = ["opendal/services-memory"]
storage-fs = ["opendal/services-fs"]
+storage-s3 = ["opendal/services-s3"]
integration_tests = []
[dependencies]
@@ -39,9 +40,9 @@ crc32c = "0.6.8"
linked-hash-map = "0.5.6"
prost = "0.14"
rand = "0.9.1"
-serde = { version = "1.0.219", features = ["derive", "rc"] }
-serde_json = "1.0.140"
-thiserror = "2"
+serde = { workspace = true, features = ["rc"] }
+serde_json = { workspace = true }
+thiserror = "1.0"
log = { version = "0.4", features = ["kv_std"] }
tokio = { workspace = true }
parking_lot = "0.12"
diff --git a/crates/fluss/src/client/credentials.rs b/crates/fluss/src/client/credentials.rs
new file mode 100644
index 0000000..bd2a477
--- /dev/null
+++ b/crates/fluss/src/client/credentials.rs
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::metadata::Metadata;
+use crate::error::{Error, Result};
+use crate::rpc::RpcClient;
+use crate::rpc::message::GetSecurityTokenRequest;
+use parking_lot::RwLock;
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+/// Upper bound on how long credentials fetched from the server are reused.
+const CACHE_TTL: Duration = Duration::from_secs(3600);
+
+/// Credentials encoded as JSON in the `token` bytes of the
+/// `GetFileSystemSecurityToken` response.
+///
+/// `Debug` is intentionally not derived so secrets cannot leak into logs.
+#[derive(Deserialize)]
+struct Credentials {
+    access_key_id: String,
+    access_key_secret: String,
+    security_token: Option<String>,
+}
+
+/// Remote-filesystem properties built from one server response, together
+/// with the instant after which they must be fetched again.
+struct CachedToken {
+    props: HashMap<String, String>,
+    expires_at: Instant,
+}
+
+impl CachedToken {
+    /// True while the cached properties may still be handed out.
+    fn is_fresh(&self) -> bool {
+        Instant::now() < self.expires_at
+    }
+}
+
+/// Converts server-issued credentials plus Hadoop-style `addition_info`
+/// entries into OpenDAL S3 configuration properties.
+fn build_remote_fs_props<'a>(
+    credentials: Credentials,
+    addition_info: impl IntoIterator<Item = (&'a str, &'a str)>,
+) -> HashMap<String, String> {
+    let mut props = HashMap::new();
+    props.insert("access_key_id".to_string(), credentials.access_key_id);
+    props.insert(
+        "secret_access_key".to_string(),
+        credentials.access_key_secret,
+    );
+    // OpenDAL's `S3Config` names the temporary (STS) token `session_token`.
+    // `security_token` is not one of its keys, so under that name the token
+    // would never be sent along with the temporary key pair.
+    if let Some(token) = credentials.security_token {
+        props.insert("session_token".to_string(), token);
+    }
+
+    for (key, value) in addition_info {
+        if let Some((opendal_key, invert)) = convert_hadoop_key_to_opendal(key) {
+            let value = if invert {
+                negate_hadoop_bool(value)
+            } else {
+                value.to_string()
+            };
+            props.insert(opendal_key, value);
+        }
+    }
+
+    props
+}
+
+/// Maps a Hadoop S3A key to `(opendal_key, needs_inversion)`, or `None` when
+/// the key has no OpenDAL equivalent here.
+///
+/// `needs_inversion` is set for `fs.s3a.path.style.access`, whose meaning is
+/// the opposite of OpenDAL's `enable_virtual_host_style`.
+fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
+    match hadoop_key {
+        "fs.s3a.endpoint" => Some(("endpoint".to_string(), false)),
+        "fs.s3a.endpoint.region" => Some(("region".to_string(), false)),
+        "fs.s3a.path.style.access" => Some(("enable_virtual_host_style".to_string(), true)),
+        // Everything else (e.g. `fs.s3a.connection.ssl.enabled`) is dropped.
+        _ => None,
+    }
+}
+
+/// Logical negation of a Hadoop-style boolean string.
+///
+/// The value is trimmed and compared case-insensitively with `true`, so
+/// `"TRUE"` and `" true "` both count as true. Anything else counts as false.
+fn negate_hadoop_bool(value: &str) -> String {
+    let enabled = value.trim().eq_ignore_ascii_case("true");
+    (!enabled).to_string()
+}
+
+/// How long a freshly fetched token may be reused.
+///
+/// `expiration_time` is treated as epoch milliseconds (NOTE(review): confirm
+/// the unit against the server). A token is renewed once three quarters of
+/// its remaining lifetime have passed, and is never kept longer than
+/// [`CACHE_TTL`]. A missing, negative, or already-past expiration falls back
+/// to [`CACHE_TTL`], which was the previous fixed-TTL behavior.
+fn cache_ttl(expiration_time: Option<i64>) -> Duration {
+    let expires_at = expiration_time
+        .and_then(|ms| u64::try_from(ms).ok())
+        .and_then(|ms| std::time::UNIX_EPOCH.checked_add(Duration::from_millis(ms)));
+    let remaining = expires_at
+        .and_then(|at| at.duration_since(std::time::SystemTime::now()).ok())
+        .filter(|left| !left.is_zero());
+    match remaining {
+        Some(left) => (left / 4 * 3).min(CACHE_TTL),
+        None => CACHE_TTL,
+    }
+}
+
+/// Caches remote-filesystem properties obtained through the
+/// `GetFileSystemSecurityToken` RPC, so they are not re-fetched for every
+/// remote log segment.
+pub struct CredentialsCache {
+    inner: RwLock<Option<CachedToken>>,
+}
+
+impl CredentialsCache {
+    pub fn new() -> Self {
+        Self {
+            inner: RwLock::new(None),
+        }
+    }
+
+    /// Returns the cached remote-filesystem properties. New credentials are
+    /// fetched from the server first if nothing is cached or the entry is stale.
+    ///
+    /// # Errors
+    /// Fails if the connection or RPC fails, or if the server's token cannot
+    /// be parsed.
+    pub async fn get_or_refresh(
+        &self,
+        rpc_client: &Arc<RpcClient>,
+        metadata: &Arc<Metadata>,
+    ) -> Result<HashMap<String, String>> {
+        if let Some(props) = self.cached_props() {
+            return Ok(props);
+        }
+        self.refresh_from_server(rpc_client, metadata).await
+    }
+
+    /// Returns a copy of the cached properties if they are still fresh.
+    ///
+    /// The read guard is released before this returns, so it is never held
+    /// across an `.await`.
+    fn cached_props(&self) -> Option<HashMap<String, String>> {
+        self.inner
+            .read()
+            .as_ref()
+            .filter(|cached| cached.is_fresh())
+            .map(|cached| cached.props.clone())
+    }
+
+    /// Fetches credentials from one available server and caches the result.
+    ///
+    /// Concurrent callers that both miss the cache may each refresh; the last
+    /// write wins, which is harmless.
+    async fn refresh_from_server(
+        &self,
+        rpc_client: &Arc<RpcClient>,
+        metadata: &Arc<Metadata>,
+    ) -> Result<HashMap<String, String>> {
+        let cluster = metadata.get_cluster();
+        let server_node = cluster.get_one_available_server();
+        let conn = rpc_client.get_connection(server_node).await?;
+
+        let request = GetSecurityTokenRequest::new();
+        let response = conn.request(request).await?;
+
+        let props = if response.token.is_empty() {
+            // The token may be empty if the remote filesystem doesn't require
+            // one. Cache that answer too, so we don't issue an RPC per segment.
+            HashMap::new()
+        } else {
+            let credentials: Credentials =
+                serde_json::from_slice(&response.token).map_err(|e| {
+                    Error::JsonSerdeError(format!("Error when parse token from server: {e}"))
+                })?;
+            build_remote_fs_props(
+                credentials,
+                response
+                    .addition_info
+                    .iter()
+                    .map(|kv| (kv.key.as_str(), kv.value.as_str())),
+            )
+        };
+
+        *self.inner.write() = Some(CachedToken {
+            props: props.clone(),
+            expires_at: Instant::now() + cache_ttl(response.expiration_time),
+        });
+
+        Ok(props)
+    }
+}
+
+impl Default for CredentialsCache {
+    fn default() -> Self {
+        Self::new()
+    }
+}
diff --git a/crates/fluss/src/client/mod.rs b/crates/fluss/src/client/mod.rs
index a971439..cff218b 100644
--- a/crates/fluss/src/client/mod.rs
+++ b/crates/fluss/src/client/mod.rs
@@ -17,12 +17,14 @@
mod admin;
mod connection;
+mod credentials;
mod metadata;
mod table;
mod write;
pub use admin::*;
pub use connection::*;
+pub use credentials::*;
pub use metadata::*;
pub use table::*;
pub use write::*;
diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs
index 65805d0..a2561f3 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -20,6 +20,7 @@ use crate::metadata::TableBucket;
use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord};
use crate::util::delete_file;
+use parking_lot::RwLock;
use std::collections::HashMap;
use std::io;
use std::path::{Path, PathBuf};
@@ -115,11 +116,19 @@ impl RemoteLogDownloadFuture {
/// Downloader for remote log segment files
pub struct RemoteLogDownloader {
local_log_dir: TempDir,
+ remote_fs_props: RwLock<HashMap<String, String>>,
}
impl RemoteLogDownloader {
pub fn new(local_log_dir: TempDir) -> Result<Self> {
- Ok(Self { local_log_dir })
+ Ok(Self {
+ local_log_dir,
+ remote_fs_props: RwLock::new(HashMap::new()),
+ })
+ }
+
+ /// Replaces the remote-filesystem properties (e.g. S3 credentials) passed
+ /// to downloads requested after this call. Each download request snapshots
+ /// the properties when it is made, so in-flight downloads are unaffected.
+ pub fn set_remote_fs_props(&self, props: HashMap<String, String>) {
+ *self.remote_fs_props.write() = props;
 }
/// Request to fetch a remote log segment to local. This method is
non-blocking.
@@ -133,10 +142,16 @@ impl RemoteLogDownloader {
let local_file_path = self.local_log_dir.path().join(&local_file_name);
let remote_path = self.build_remote_path(remote_log_tablet_dir,
segment);
let remote_log_tablet_dir = remote_log_tablet_dir.to_string();
+ let remote_fs_props = self.remote_fs_props.read().clone();
// Spawn async download task
tokio::spawn(async move {
- let result =
- Self::download_file(&remote_log_tablet_dir, &remote_path,
&local_file_path).await;
+ let result = Self::download_file(
+ &remote_log_tablet_dir,
+ &remote_path,
+ &local_file_path,
+ &remote_fs_props,
+ )
+ .await;
let _ = sender.send(result);
});
Ok(RemoteLogDownloadFuture::new(receiver))
@@ -157,6 +172,7 @@ impl RemoteLogDownloader {
remote_log_tablet_dir: &str,
remote_path: &str,
local_path: &Path,
+ remote_fs_props: &HashMap<String, String>,
) -> Result<PathBuf> {
// Handle both URL (e.g., "s3://bucket/path") and local file paths
// If the path doesn't contain "://", treat it as a local file path
@@ -169,11 +185,27 @@ impl RemoteLogDownloader {
// Create FileIO from the remote log tablet dir URL to get the storage
let file_io_builder = FileIO::from_url(&remote_log_tablet_dir_url)?;
+ // For S3/S3A URLs, inject S3 credentials from props
+ let file_io_builder = if remote_log_tablet_dir.starts_with("s3://")
+ || remote_log_tablet_dir.starts_with("s3a://")
+ {
+ file_io_builder.with_props(
+ remote_fs_props
+ .iter()
+ .map(|(k, v)| (k.as_str(), v.as_str())),
+ )
+ } else {
+ file_io_builder
+ };
+
// Build storage and create operator directly
let storage = Storage::build(file_io_builder)?;
let (op, relative_path) = storage.create(remote_path)?;
- // Get file metadata to know the size
+ // Timeout for remote storage operations (30 seconds)
+ const REMOTE_OP_TIMEOUT: std::time::Duration =
std::time::Duration::from_secs(30);
+
+ // Get file metadata to know the size with timeout
let meta = op.stat(relative_path).await?;
let file_size = meta.content_length();
@@ -184,13 +216,32 @@ impl RemoteLogDownloader {
// opendal::Reader::read accepts a range, so we read in chunks
const CHUNK_SIZE: u64 = 8 * 1024 * 1024; // 8MB chunks for efficient
reading
let mut offset = 0u64;
+ let mut chunk_count = 0u64;
+ let total_chunks = file_size.div_ceil(CHUNK_SIZE);
while offset < file_size {
let end = std::cmp::min(offset + CHUNK_SIZE, file_size);
let range = offset..end;
-
- // Read chunk from remote storage
- let chunk =
op.read_with(relative_path).range(range.clone()).await?;
+ chunk_count += 1;
+
+ if chunk_count <= 3 || chunk_count % 10 == 0 {
+ log::debug!(
+ "Remote log download: reading chunk
{chunk_count}/{total_chunks} (offset {offset})"
+ );
+ }
+
+ // Read chunk from remote storage with timeout
+ let read_future = op.read_with(relative_path).range(range.clone());
+ let chunk = tokio::time::timeout(REMOTE_OP_TIMEOUT, read_future)
+ .await
+ .map_err(|_| {
+ Error::Io(io::Error::new(
+ io::ErrorKind::TimedOut,
+ format!(
+ "Timeout reading chunk from remote storage:
{remote_path} at offset {offset}"
+ ),
+ ))
+ })??;
let bytes = chunk.to_bytes();
// Write chunk to local file
@@ -254,10 +305,10 @@ impl RemotePendingFetch {
// delete the downloaded local file to free disk
delete_file(file_path).await;
- // Parse log records
+ // Parse log records (remote log contains full data, need client-side
projection)
let mut fetch_records = vec![];
for log_record in &mut LogRecordsBatchs::new(data) {
- fetch_records.extend(log_record.records(&self.read_context)?);
+
fetch_records.extend(log_record.records_for_remote_log(&self.read_context)?);
}
let mut result = HashMap::new();
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index f6780d7..f66d7d7 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -16,6 +16,7 @@
// under the License.
use crate::client::connection::FlussConnection;
+use crate::client::credentials::CredentialsCache;
use crate::client::metadata::Metadata;
use crate::error::{Error, Result};
use crate::metadata::{TableBucket, TableInfo, TablePath};
@@ -194,6 +195,7 @@ struct LogFetcher {
log_scanner_status: Arc<LogScannerStatus>,
read_context: ReadContext,
remote_log_downloader: RemoteLogDownloader,
+ credentials_cache: CredentialsCache,
}
impl LogFetcher {
@@ -217,6 +219,7 @@ impl LogFetcher {
log_scanner_status,
read_context,
remote_log_downloader: RemoteLogDownloader::new(tmp_dir)?,
+ credentials_cache: CredentialsCache::new(),
})
}
@@ -256,6 +259,12 @@ impl LogFetcher {
if let Some(ref remote_log_fetch_info) =
fetch_log_for_bucket.remote_log_fetch_info
{
+ let remote_fs_props = self
+ .credentials_cache
+ .get_or_refresh(&self.conns, &self.metadata)
+ .await?;
+ self.remote_log_downloader
+ .set_remote_fs_props(remote_fs_props);
let remote_fetch_info = RemoteLogFetchInfo::from_proto(
remote_log_fetch_info,
table_bucket.clone(),
diff --git a/crates/fluss/src/io/mod.rs b/crates/fluss/src/io/mod.rs
index 3c9a165..a03a394 100644
--- a/crates/fluss/src/io/mod.rs
+++ b/crates/fluss/src/io/mod.rs
@@ -27,8 +27,13 @@ pub use storage::*;
mod storage_fs;
#[cfg(feature = "storage-fs")]
use storage_fs::*;
+
#[cfg(feature = "storage-memory")]
mod storage_memory;
-
#[cfg(feature = "storage-memory")]
use storage_memory::*;
+
+#[cfg(feature = "storage-s3")]
+mod storage_s3;
+#[cfg(feature = "storage-s3")]
+use storage_s3::*;
diff --git a/crates/fluss/src/io/storage.rs b/crates/fluss/src/io/storage.rs
index 361da7e..089670e 100644
--- a/crates/fluss/src/io/storage.rs
+++ b/crates/fluss/src/io/storage.rs
@@ -19,6 +19,7 @@ use crate::error;
use crate::error::Result;
use crate::io::FileIOBuilder;
use opendal::{Operator, Scheme};
+use std::collections::HashMap;
/// The storage carries all supported storage services in fluss
#[derive(Debug)]
@@ -27,11 +28,13 @@ pub enum Storage {
Memory,
#[cfg(feature = "storage-fs")]
LocalFs,
+ #[cfg(feature = "storage-s3")]
+ S3 { props: HashMap<String, String> },
}
impl Storage {
pub(crate) fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
- let (scheme_str, _) = file_io_builder.into_parts();
+ let (scheme_str, props) = file_io_builder.into_parts();
let scheme = Self::parse_scheme(&scheme_str)?;
match scheme {
@@ -39,6 +42,8 @@ impl Storage {
Scheme::Memory => Ok(Self::Memory),
#[cfg(feature = "storage-fs")]
Scheme::Fs => Ok(Self::LocalFs),
+ #[cfg(feature = "storage-s3")]
+ Scheme::S3 => Ok(Self::S3 { props }),
_ => Err(error::Error::IoUnsupported(
"Unsupported storage feature".to_string(),
)),
@@ -66,6 +71,14 @@ impl Storage {
Ok((op, &path[1..]))
}
}
+ #[cfg(feature = "storage-s3")]
+ Storage::S3 { props } => {
+ let (bucket, key) = super::parse_s3_path(path);
+ let mut s3_props = props.clone();
+ s3_props.insert("bucket".to_string(), bucket.to_string());
+ let op = super::s3_config_build(&s3_props)?;
+ Ok((op, key))
+ }
}
}
@@ -73,6 +86,7 @@ impl Storage {
match scheme {
"memory" => Ok(Scheme::Memory),
"file" | "" => Ok(Scheme::Fs),
+ "s3" | "s3a" => Ok(Scheme::S3),
s => Ok(s.parse::<Scheme>()?),
}
}
diff --git a/crates/fluss/src/io/storage_s3.rs b/crates/fluss/src/io/storage_s3.rs
new file mode 100644
index 0000000..8000d09
--- /dev/null
+++ b/crates/fluss/src/io/storage_s3.rs
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Result;
+use opendal::Configurator;
+use opendal::Operator;
+use opendal::layers::TimeoutLayer;
+use opendal::services::S3Config;
+use std::collections::HashMap;
+use std::time::Duration;
+
+/// Builds an OpenDAL S3 operator from the given configuration properties
+/// (credentials, bucket, endpoint, ...), wrapped in a timeout layer.
+pub(crate) fn s3_config_build(props: &HashMap<String, String>) -> Result<Operator> {
+    // Guard against hanging S3 calls. Per OpenDAL's TimeoutLayer, `with_timeout`
+    // bounds non-IO operations and `with_io_timeout` bounds each IO operation.
+    let timeouts = TimeoutLayer::new()
+        .with_timeout(Duration::from_secs(10))
+        .with_io_timeout(Duration::from_secs(30));
+
+    let s3_config = S3Config::from_iter(props.clone())?;
+    let operator = Operator::from_config(s3_config)?.finish();
+    Ok(operator.layer(timeouts))
+}
+
+/// Splits an `s3a://` or `s3://` URL (or a scheme-less `bucket/key` string)
+/// into `(bucket, key)`.
+///
+/// The key is everything after the first `/` following the bucket, and is
+/// empty when there is no `/`.
+pub(crate) fn parse_s3_path(path: &str) -> (&str, &str) {
+    let without_scheme = ["s3a://", "s3://"]
+        .iter()
+        .find_map(|&scheme| path.strip_prefix(scheme))
+        .unwrap_or(path);
+    without_scheme
+        .split_once('/')
+        .unwrap_or((without_scheme, ""))
+}
diff --git a/crates/fluss/src/proto/fluss_api.proto b/crates/fluss/src/proto/fluss_api.proto
index ef460fc..e59c2d9 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -297,4 +297,19 @@ message PbLakeSnapshotForBucket {
optional int64 partition_id = 1;
required int32 bucket_id = 2;
optional int64 log_offset = 3;
+}
+
+// Generic string key/value pair.
+message PbKeyValue {
+ required string key = 1;
+ required string value = 2;
+}
+
+// Requests credentials for accessing the remote filesystem; takes no fields.
+message GetFileSystemSecurityTokenRequest {
+}
+
+message GetFileSystemSecurityTokenResponse {
+ // NOTE(review): presumably the filesystem scheme (e.g. "s3") — confirm.
+ required string schema = 1;
+ // JSON-serialized credentials; may be empty when no token is required.
+ required bytes token = 2;
+ // NOTE(review): presumably the token expiry as epoch millis — confirm unit.
+ optional int64 expiration_time = 3;
+ // Extra filesystem settings, e.g. Hadoop `fs.s3a.*` keys.
+ repeated PbKeyValue addition_info = 4;
 }
\ No newline at end of file
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 806c9a5..f079f09 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -504,6 +504,30 @@ impl<'a> LogRecordBatch<'a> {
};
Ok(log_record_iterator)
}
+
+    /// Decodes this batch's records when the data came from a remote log segment.
+    ///
+    /// Remote segments contain full (unprojected) data, so the Arrow payload is
+    /// decoded with the full schema and any projection is applied client-side
+    /// by `ReadContext::record_batch_for_remote_log`. Yields an empty iterator
+    /// for a batch with no records or when the IPC message does not parse.
+    pub fn records_for_remote_log(&self, read_context: &ReadContext) -> Result<LogRecordIterator> {
+        if self.record_count() == 0 {
+            return Ok(LogRecordIterator::empty());
+        }
+
+        let ipc_bytes = &self.data[RECORDS_OFFSET..];
+        let iterator = match read_context.record_batch_for_remote_log(ipc_bytes)? {
+            Some(batch) => LogRecordIterator::Arrow(ArrowLogRecordIterator {
+                reader: ArrowReader::new(Arc::new(batch)),
+                base_offset: self.base_log_offset(),
+                timestamp: self.commit_timestamp(),
+                row_id: 0,
+                change_type: ChangeType::AppendOnly,
+            }),
+            None => LogRecordIterator::empty(),
+        };
+        Ok(iterator)
+    }
}
/// Parse an Arrow IPC message from a byte slice.
@@ -552,7 +576,8 @@ fn parse_ipc_message(
let message = root_as_message(metadata_bytes).ok()?;
let batch_metadata = message.header_as_record_batch()?;
- let body_start = 8 + metadata_size;
+ let metadata_padded_size = (metadata_size + 7) & !7;
+ let body_start = 8 + metadata_padded_size;
let body_data = &data[body_start..];
let body_buffer = Buffer::from(body_data);
@@ -677,7 +702,7 @@ pub fn to_arrow_type(fluss_type: &DataType) ->
ArrowDataType {
#[derive(Clone)]
pub struct ReadContext {
target_schema: SchemaRef,
-
+ full_schema: SchemaRef,
projection: Option<Projection>,
}
@@ -694,7 +719,8 @@ struct Projection {
impl ReadContext {
pub fn new(arrow_schema: SchemaRef) -> ReadContext {
ReadContext {
- target_schema: arrow_schema,
+ target_schema: arrow_schema.clone(),
+ full_schema: arrow_schema,
projection: None,
}
}
@@ -730,7 +756,10 @@ impl ReadContext {
}
} else {
Projection {
- ordered_schema: Self::project_schema(arrow_schema,
projected_fields.as_slice()),
+ ordered_schema: Self::project_schema(
+ arrow_schema.clone(),
+ projected_fields.as_slice(),
+ ),
ordered_fields: projected_fields.clone(),
projected_fields,
reordering_indexes: vec![],
@@ -741,6 +770,7 @@ impl ReadContext {
ReadContext {
target_schema,
+ full_schema: arrow_schema,
projection: Some(project),
}
}
@@ -809,6 +839,35 @@ impl ReadContext {
};
Ok(Some(record_batch))
}
+
+    /// Decodes one Arrow IPC record-batch message read from a remote log segment.
+    ///
+    /// The message is decoded against `full_schema`. If a projection is set,
+    /// the columns at `projected_fields` are picked in that order and rebuilt
+    /// with `target_schema`. Returns `Ok(None)` when `data` does not parse as
+    /// an IPC record-batch message.
+    ///
+    /// NOTE(review): no dictionaries are supplied to `read_record_batch`, so
+    /// dictionary-encoded columns are presumably unsupported here — confirm.
+    pub fn record_batch_for_remote_log(&self, data: &[u8]) -> Result<Option<RecordBatch>> {
+        let Some((batch_metadata, body_buffer, version)) = parse_ipc_message(data) else {
+            return Ok(None);
+        };
+
+        let full_batch = read_record_batch(
+            &body_buffer,
+            batch_metadata,
+            self.full_schema.clone(),
+            &std::collections::HashMap::new(),
+            None,
+            &version,
+        )?;
+
+        let Some(projection) = &self.projection else {
+            return Ok(Some(full_batch));
+        };
+
+        let columns = projection
+            .projected_fields
+            .iter()
+            .map(|&idx| full_batch.column(idx).clone())
+            .collect::<Vec<_>>();
+        let projected = RecordBatch::try_new(self.target_schema.clone(), columns)?;
+        Ok(Some(projected))
+    }
}
pub enum LogRecordIterator {
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index 215bb39..b11647f 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -32,6 +32,7 @@ pub enum ApiKey {
ProduceLog,
FetchLog,
ListOffsets,
+ GetFileSystemSecurityToken,
GetDatabaseInfo,
GetLatestLakeSnapshot,
Unknown(i16),
@@ -53,6 +54,7 @@ impl From<i16> for ApiKey {
1014 => ApiKey::ProduceLog,
1015 => ApiKey::FetchLog,
1021 => ApiKey::ListOffsets,
+ 1025 => ApiKey::GetFileSystemSecurityToken,
1032 => ApiKey::GetLatestLakeSnapshot,
1035 => ApiKey::GetDatabaseInfo,
_ => Unknown(key),
@@ -76,6 +78,7 @@ impl From<ApiKey> for i16 {
ApiKey::ProduceLog => 1014,
ApiKey::FetchLog => 1015,
ApiKey::ListOffsets => 1021,
+ ApiKey::GetFileSystemSecurityToken => 1025,
ApiKey::GetLatestLakeSnapshot => 1032,
ApiKey::GetDatabaseInfo => 1035,
Unknown(x) => x,
diff --git a/crates/fluss/src/rpc/message/get_security_token.rs b/crates/fluss/src/rpc/message/get_security_token.rs
new file mode 100644
index 0000000..7995232
--- /dev/null
+++ b/crates/fluss/src/rpc/message/get_security_token.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto::{GetFileSystemSecurityTokenRequest,
GetFileSystemSecurityTokenResponse};
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+/// Client request for the `GetFileSystemSecurityToken` API. It wraps the
+/// field-less protobuf request message.
+#[derive(Debug)]
+pub struct GetSecurityTokenRequest {
+    pub inner_request: GetFileSystemSecurityTokenRequest,
+}
+
+impl Default for GetSecurityTokenRequest {
+    fn default() -> Self {
+        Self {
+            inner_request: GetFileSystemSecurityTokenRequest {},
+        }
+    }
+}
+
+impl GetSecurityTokenRequest {
+    /// Creates a request; the API takes no parameters.
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
+impl RequestBody for GetSecurityTokenRequest {
+    type ResponseBody = GetFileSystemSecurityTokenResponse;
+    const API_KEY: ApiKey = ApiKey::GetFileSystemSecurityToken;
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(GetSecurityTokenRequest);
+impl_read_version_type!(GetFileSystemSecurityTokenResponse);
diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs
index 230d971..0ed5b7c 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -28,6 +28,7 @@ mod drop_table;
mod fetch;
mod get_database_info;
mod get_latest_lake_snapshot;
+mod get_security_token;
mod get_table;
mod header;
mod list_databases;
@@ -45,6 +46,7 @@ pub use drop_table::*;
pub use fetch::*;
pub use get_database_info::*;
pub use get_latest_lake_snapshot::*;
+pub use get_security_token::*;
pub use get_table::*;
pub use header::*;
pub use list_databases::*;