luoyuxia commented on code in PR #5: URL: https://github.com/apache/fluss-rust/pull/5#discussion_r2333543417
########## crates/fluss/src/rpc/message/list_offsets.rs: ########## @@ -0,0 +1,136 @@ +use crate::{impl_read_version_type, impl_write_version_type, proto}; + +use crate::error::Result as FlussResult; +use crate::proto::ListOffsetsResponse; +use crate::rpc::api_key::ApiKey; +use crate::rpc::api_version::ApiVersion; +use crate::rpc::message::{RequestBody, ReadVersionedType, WriteVersionedType}; +use crate::rpc::frame::{ReadError, WriteError}; +use futures::future::join_all; +use std::collections::HashMap; +use tokio::sync::{oneshot}; + +use bytes::{Buf, BufMut}; +use prost::Message; + +/// Offset type constants as per proto comments +pub const LIST_EARLIEST_OFFSET: i32 = 0; +pub const LIST_LATEST_OFFSET: i32 = 1; +pub const LIST_OFFSET_FROM_TIMESTAMP: i32 = 2; + +/// Client follower server id constant +pub const CLIENT_FOLLOWER_SERVER_ID: i32 = -1; + +/// Offset specification for list offsets request +#[derive(Debug, Clone)] +pub enum OffsetSpec { + /// Earliest offset spec + Earliest, + /// Latest offset spec + Latest, + /// Timestamp offset spec + Timestamp(i64), +} + +impl OffsetSpec { + pub fn offset_type(&self) -> i32 { + match self { + OffsetSpec::Earliest => LIST_EARLIEST_OFFSET, + OffsetSpec::Latest => LIST_LATEST_OFFSET, + OffsetSpec::Timestamp(_) => LIST_OFFSET_FROM_TIMESTAMP, + } + } + + pub fn start_timestamp(&self) -> Option<i64> { + match self { + OffsetSpec::Timestamp(ts) => Some(*ts), + _ => None, + } + } +} + +/// Result container for list offsets operation +pub struct ListOffsetsResult { + futures: HashMap<i32, oneshot::Receiver<FlussResult<i64>>>, +} + +impl ListOffsetsResult { + pub fn new(futures: HashMap<i32, oneshot::Receiver<FlussResult<i64>>>) -> Self { + Self { futures } + } + + /// Get the offset result for a specific bucket + pub async fn bucket_result(&mut self, bucket: i32) -> FlussResult<i64> { + if let Some(receiver) = self.futures.remove(&bucket) { + receiver.await + .map_err(|_| crate::error::Error::WriteError("Channel 
closed".to_string()))? + } else { + Err(crate::error::Error::IllegalArgument( + format!("Bucket {} not found", bucket) + )) + } + } + + /// Wait for all bucket results to complete and return a map + pub async fn all(self) -> FlussResult<HashMap<i32, i64>> { + let mut results = HashMap::new(); + let mut tasks = Vec::new(); + + // Collect all futures + for (bucket_id, receiver) in self.futures { + let task = async move { + let result = receiver.await + .map_err(|_| crate::error::Error::WriteError("Channel closed".to_string()))?; + Ok::<(i32, i64), crate::error::Error>((bucket_id, result?)) + }; + tasks.push(task); + } + + // Wait for all futures to complete + let task_results = join_all(tasks).await; + + // Collect results + for task_result in task_results { + let (bucket_id, offset) = task_result?; + results.insert(bucket_id, offset); + } + + Ok(results) + } +} + +#[derive(Debug)] +pub struct ListOffsetsRequest { + pub inner_request: proto::ListOffsetsRequest, +} + +impl ListOffsetsRequest { Review Comment: Is `ListOffsetsRequest` required for the first version of our Python client, or could it go into a separate PR? It feels a bit complex and needs careful review. ########## .gitignore: ########## @@ -17,4 +17,15 @@ Cargo.lock # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. .idea/ -.vscode/ \ No newline at end of file +.vscode/ +.venv/ Review Comment: No need to introduce these gitignore entries in this PR. ########## crates/fluss/src/client/admin.rs: ########## @@ -90,4 +124,245 @@ impl FlussAdmin { modified_time, )) } + + /// List all tables in the given database + pub async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> { + let response = self + .admin_gateway + .request(ListTablesRequest::new(database_name)?)
+ .await?; + Ok(response.table_name) + } + + /// Check if a table exists + pub async fn table_exists(&self, table_path: &TablePath) -> Result<bool> { + let response = self + .admin_gateway + .request(TableExistsRequest::new(table_path)?) + .await?; + Ok(response.exists) + } + + /// Drop a database + pub async fn drop_database( + &self, + database_name: &str, + ignore_if_not_exists: bool, + cascade: bool, + ) -> Result<()> { + let _response = self + .admin_gateway + .request(DropDatabaseRequest::new(database_name, ignore_if_not_exists, cascade)?) Review Comment: dito ########## crates/fluss/src/metadata/table.rs: ########## @@ -600,6 +612,18 @@ impl Display for KvFormat { } } +impl KvFormat { + pub fn parse(s: &str) -> Result<Self> { + match s.to_uppercase().as_str() { + "INDEXED" => Ok(KvFormat::INDEXED), + "COMPACTED" => Ok(KvFormat::COMPACTED), + _ => Err(crate::error::Error::InvalidTableError( Review Comment: nit: ```suggestion _ => Err(InvalidTableError( ``` ########## crates/fluss/src/record/mod.rs: ########## @@ -147,6 +147,10 @@ impl ScanRecords { Self { records } } + pub fn into_records(self) -> HashMap<TableBucket, Vec<ScanRecord>> { Review Comment: we don't need to change here in this pr, right? ########## crates/fluss/src/metadata/database.rs: ########## @@ -0,0 +1,240 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Result; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use crate::metadata::JsonSerde; +use serde_json::{json, Value}; +use crate::error::Error::JsonSerdeError; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct DatabaseDescriptor { + comment: Option<String>, + custom_properties: HashMap<String, String>, +} + +#[derive(Debug, Clone)] +pub struct DatabaseInfo { + database_name: String, + database_descriptor: DatabaseDescriptor, + created_time: i64, + modified_time: i64, +} + +impl DatabaseInfo { + pub fn new( + database_name: String, + database_descriptor: DatabaseDescriptor, + created_time: i64, + modified_time: i64, + ) -> Self { + Self { + database_name, + database_descriptor, + created_time, + modified_time, + } + } + + pub fn database_name(&self) -> &str { + &self.database_name + } + + pub fn database_descriptor(&self) -> &DatabaseDescriptor { + &self.database_descriptor + } + + pub fn created_time(&self) -> i64 { + self.created_time + } + + pub fn modified_time(&self) -> i64 { + self.modified_time + } +} + +#[derive(Debug, Default)] +pub struct DatabaseDescriptorBuilder { + comment: Option<String>, + custom_properties: HashMap<String, String>, +} + +impl DatabaseDescriptor { + pub fn builder() -> DatabaseDescriptorBuilder { + DatabaseDescriptorBuilder::default() + } + + pub fn comment(&self) -> Option<&str> { + self.comment.as_deref() + } + + pub fn custom_properties(&self) -> &HashMap<String, String> { + &self.custom_properties + } +} + +impl DatabaseDescriptorBuilder { + pub 
fn new() -> Self { + Self::default() + } + + pub fn comment(mut self, comment: &str) -> Self { + self.comment = Some(comment.to_string()); + self + } + + pub fn custom_properties(mut self, properties: HashMap<String, String>) -> Self { + self.custom_properties = properties; + self + } + + pub fn custom_property(mut self, key: &str, value: &str) -> Self { + self.custom_properties.insert(key.to_string(), value.to_string()); + self + } + + pub fn build(self) -> Result<DatabaseDescriptor> { + Ok(DatabaseDescriptor { + comment: self.comment, + custom_properties: self.custom_properties, + }) + } +} + +impl DatabaseDescriptor { + const CUSTOM_PROPERTIES_NAME: &'static str = "custom_properties"; + const COMMENT_NAME: &'static str = "comment"; + const VERSION_KEY: &'static str = "version"; + const VERSION: u32 = 1; + + fn deserialize_properties(node: &Value) -> Result<HashMap<String, String>> { Review Comment: Can we move it into `json_serde`? ########## crates/fluss/src/client/table/mod.rs: ########## @@ -29,6 +27,9 @@ mod append; mod scanner; mod writer; +pub use append::{TableAppend, AppendWriter}; Review Comment: We don't need to introduce `AppendWriter`, `LogScanner`, right? ########## crates/fluss/src/metadata/table.rs: ########## @@ -584,6 +584,18 @@ impl Display for LogFormat { } } +impl LogFormat { + pub fn parse(s: &str) -> Result<Self> { + match s.to_uppercase().as_str() { + "ARROW" => Ok(LogFormat::ARROW), + "INDEXED" => Ok(LogFormat::INDEXED), + _ => Err(crate::error::Error::InvalidTableError( Review Comment: nit: ```suggestion _ => Err(InvalidTableError( ``` ########## crates/fluss/src/client/admin.rs: ########## @@ -66,6 +85,21 @@ impl FlussAdmin { Ok(()) } + pub async fn drop_table( + &self, + table_path: &TablePath, + ignore_if_exists: bool, + ) -> Result<()> { + let _response = self + .admin_gateway + .request(DropTableRequest::new( + table_path, + ignore_if_exists, + )?) Review Comment: we don't need to use `?`.
Make `DropTableRequest#new` just return `Self` ########## crates/fluss/src/client/admin.rs: ########## @@ -90,4 +124,245 @@ impl FlussAdmin { modified_time, )) } + + /// List all tables in the given database + pub async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> { + let response = self + .admin_gateway + .request(ListTablesRequest::new(database_name)?) + .await?; + Ok(response.table_name) + } + + /// Check if a table exists + pub async fn table_exists(&self, table_path: &TablePath) -> Result<bool> { + let response = self + .admin_gateway + .request(TableExistsRequest::new(table_path)?) Review Comment: dito ########## crates/fluss/src/client/admin.rs: ########## @@ -90,4 +124,245 @@ impl FlussAdmin { modified_time, )) } + + /// List all tables in the given database + pub async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> { + let response = self + .admin_gateway + .request(ListTablesRequest::new(database_name)?) + .await?; + Ok(response.table_name) + } + + /// Check if a table exists + pub async fn table_exists(&self, table_path: &TablePath) -> Result<bool> { + let response = self + .admin_gateway + .request(TableExistsRequest::new(table_path)?) + .await?; + Ok(response.exists) + } + + /// Drop a database + pub async fn drop_database( + &self, + database_name: &str, + ignore_if_not_exists: bool, + cascade: bool, + ) -> Result<()> { + let _response = self + .admin_gateway + .request(DropDatabaseRequest::new(database_name, ignore_if_not_exists, cascade)?) + .await?; + Ok(()) + } + + /// List all databases + pub async fn list_databases(&self) -> Result<Vec<String>> { + let response = self + .admin_gateway + .request(ListDatabasesRequest::new()?) + .await?; + Ok(response.database_name) + } + + /// Check if a database exists + pub async fn database_exists(&self, database_name: &str) -> Result<bool> { + let response = self + .admin_gateway + .request(DatabaseExistsRequest::new(database_name)?) 
Review Comment: dito ########## crates/fluss/src/metadata/database.rs: ########## @@ -0,0 +1,240 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Result; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use crate::metadata::JsonSerde; +use serde_json::{json, Value}; +use crate::error::Error::JsonSerdeError; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct DatabaseDescriptor { + comment: Option<String>, + custom_properties: HashMap<String, String>, +} + +#[derive(Debug, Clone)] +pub struct DatabaseInfo { + database_name: String, + database_descriptor: DatabaseDescriptor, + created_time: i64, + modified_time: i64, +} + +impl DatabaseInfo { + pub fn new( + database_name: String, + database_descriptor: DatabaseDescriptor, + created_time: i64, + modified_time: i64, + ) -> Self { + Self { + database_name, + database_descriptor, + created_time, + modified_time, + } + } + + pub fn database_name(&self) -> &str { + &self.database_name + } + + pub fn database_descriptor(&self) -> &DatabaseDescriptor { + &self.database_descriptor + } + + pub fn created_time(&self) -> i64 { + self.created_time + } + + pub fn modified_time(&self) -> i64 { + self.modified_time + 
} +} + +#[derive(Debug, Default)] +pub struct DatabaseDescriptorBuilder { + comment: Option<String>, + custom_properties: HashMap<String, String>, +} + +impl DatabaseDescriptor { + pub fn builder() -> DatabaseDescriptorBuilder { + DatabaseDescriptorBuilder::default() + } + + pub fn comment(&self) -> Option<&str> { + self.comment.as_deref() + } + + pub fn custom_properties(&self) -> &HashMap<String, String> { + &self.custom_properties + } +} + +impl DatabaseDescriptorBuilder { + pub fn new() -> Self { Review Comment: I think we can remove this method, since it already derives `Default`, right? ########## crates/fluss/src/client/admin.rs: ########## @@ -90,4 +124,245 @@ impl FlussAdmin { modified_time, )) } + + /// List all tables in the given database + pub async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> { + let response = self + .admin_gateway + .request(ListTablesRequest::new(database_name)?) + .await?; + Ok(response.table_name) + } + + /// Check if a table exists + pub async fn table_exists(&self, table_path: &TablePath) -> Result<bool> { + let response = self + .admin_gateway + .request(TableExistsRequest::new(table_path)?) + .await?; + Ok(response.exists) + } + + /// Drop a database + pub async fn drop_database( + &self, + database_name: &str, + ignore_if_not_exists: bool, + cascade: bool, + ) -> Result<()> { + let _response = self + .admin_gateway + .request(DropDatabaseRequest::new(database_name, ignore_if_not_exists, cascade)?) + .await?; + Ok(()) + } + + /// List all databases + pub async fn list_databases(&self) -> Result<Vec<String>> { + let response = self + .admin_gateway + .request(ListDatabasesRequest::new()?) + .await?; + Ok(response.database_name) + } + + /// Check if a database exists + pub async fn database_exists(&self, database_name: &str) -> Result<bool> { + let response = self + .admin_gateway + .request(DatabaseExistsRequest::new(database_name)?) 
+ .await?; + Ok(response.exists) + } + + /// Get database information + pub async fn get_database_info(&self, database_name: &str) -> Result<DatabaseInfo> { + let request = GetDatabaseInfoRequest::new(database_name)?; Review Comment: dito ########## crates/fluss/src/client/admin.rs: ########## @@ -90,4 +124,245 @@ impl FlussAdmin { modified_time, )) } + + /// List all tables in the given database + pub async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> { + let response = self + .admin_gateway + .request(ListTablesRequest::new(database_name)?) + .await?; + Ok(response.table_name) + } + + /// Check if a table exists + pub async fn table_exists(&self, table_path: &TablePath) -> Result<bool> { + let response = self + .admin_gateway + .request(TableExistsRequest::new(table_path)?) + .await?; + Ok(response.exists) + } + + /// Drop a database + pub async fn drop_database( + &self, + database_name: &str, + ignore_if_not_exists: bool, + cascade: bool, + ) -> Result<()> { + let _response = self + .admin_gateway + .request(DropDatabaseRequest::new(database_name, ignore_if_not_exists, cascade)?) + .await?; + Ok(()) + } + + /// List all databases + pub async fn list_databases(&self) -> Result<Vec<String>> { + let response = self + .admin_gateway + .request(ListDatabasesRequest::new()?) Review Comment: dito ########## crates/fluss/src/record/arrow.rs: ########## @@ -438,7 +439,7 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType { DataType::Time(_) => todo!(), DataType::Timestamp(_) => todo!(), DataType::TimestampLTz(_) => todo!(), - DataType::Bytes(_) => todo!(), + DataType::Bytes(_) => ArrowDataType::Binary, Review Comment: we don't need to change here in this pr, right? ########## crates/fluss/src/row/column.rs: ########## @@ -45,6 +45,14 @@ impl ColumnarRow { pub fn set_row_id(&mut self, row_id: usize) { self.row_id = row_id } + + pub fn get_row_id(&self) -> usize { Review Comment: we don't need to change here in this pr, right? 
########## crates/fluss/src/row/column.rs: ########## @@ -45,6 +45,14 @@ impl ColumnarRow { pub fn set_row_id(&mut self, row_id: usize) { self.row_id = row_id } + + pub fn get_row_id(&self) -> usize { + self.row_id + } + + pub fn get_record_batch(&self) -> &Arc<RecordBatch> { Review Comment: we don't need to change here in this pr, right? ########## crates/fluss/src/row/datum.rs: ########## @@ -47,6 +47,8 @@ pub enum Datum<'a> { #[display("{0}")] Int64(i64), #[display("{0}")] + Float32(F32), Review Comment: we can remove ``` #[allow(dead_code)] ``` in https://github.com/apache/fluss-rust/pull/5/files#diff-621e8ce7912c46642b6d7212bd082f135c08353cb4869bdb2ec95dfbdc8e897aL195 right? ########## crates/fluss/src/row/datum.rs: ########## @@ -126,45 +142,53 @@ pub trait ToArrow { impl Datum<'_> { pub fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()> { + macro_rules! append_by_type { Review Comment: Will it be more clear to use two macro?: ``` macro_rules! append_null_to_arrow { ($builder_type:ty) => { if let Some(b) = builder.as_any_mut().downcast_mut::<$builder_type>() { b.append_null(); return Ok(()); } }; } macro_rules! 
append_value_arrow { ($builder_type:ty, $value:expr) => { if let Some(b) = builder.as_any_mut().downcast_mut::<$builder_type>() { b.append_value($value); return Ok(()); } }; } match self { Datum::Null => { append_null_to_arrow!(BooleanBuilder); append_null_to_arrow!(Int16Builder); append_null_to_arrow!(Int32Builder); append_null_to_arrow!(Int64Builder); append_null_to_arrow!(Float32Builder); append_null_to_arrow!(Float64Builder); append_null_to_arrow!(StringBuilder); append_null_to_arrow!(BinaryBuilder); } Datum::Bool(v) => append_value_arrow!(BooleanBuilder, *v), Datum::Int16(v) => append_value_arrow!(Int16Builder, *v), Datum::Int32(v) => append_value_arrow!(Int32Builder, *v), Datum::Int64(v) => append_value_arrow!(Int64Builder, *v), Datum::Float32(v) => append_value_arrow!(Float32Builder, v.into_inner()), Datum::Float64(v) => append_value_arrow!(Float64Builder, v.into_inner()), Datum::String(v) => append_value_arrow!(StringBuilder, *v), Datum::Blob(v) => append_value_arrow!(BinaryBuilder, v.as_ref()), Datum::Decimal(_) | Datum::Date(_) | Datum::Timestamp(_) | Datum::TimestampTz(_) => { return Err(RowConvertError(format!( "Type {:?} is not yet supported for Arrow conversion", std::mem::discriminant(self) ))); } } ``` ########## crates/fluss/src/client/admin.rs: ########## @@ -90,4 +124,245 @@ impl FlussAdmin { modified_time, )) } + + /// List all tables in the given database + pub async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> { + let response = self + .admin_gateway + .request(ListTablesRequest::new(database_name)?) Review Comment: dito -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@fluss.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org