This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new b1b56a6 feat: introduce lookup support for primary key tables (#159)
b1b56a6 is described below
commit b1b56a6e1778a92f0845eb1f31013fb39861cb3f
Author: Andrea Bozzo <[email protected]>
AuthorDate: Sat Jan 17 02:45:00 2026 +0100
feat: introduce lookup support for primary key tables (#159)
---
crates/fluss/src/client/table/lookup.rs | 252 +++++++++++++++++++++
crates/fluss/src/client/table/mod.rs | 38 +++-
crates/fluss/src/client/write/write_format.rs | 4 +-
crates/fluss/src/metadata/table.rs | 1 +
crates/fluss/src/proto/fluss_api.proto | 28 +++
crates/fluss/src/record/kv/kv_record.rs | 12 +-
crates/fluss/src/record/kv/kv_record_batch.rs | 15 +-
.../fluss/src/record/kv/kv_record_batch_builder.rs | 30 ++-
crates/fluss/src/row/compacted/compacted_row.rs | 58 +++--
.../src/row/compacted/compacted_row_reader.rs | 18 +-
.../fluss/src/row/encode/compacted_row_encoder.rs | 19 +-
crates/fluss/src/row/encode/mod.rs | 8 +-
crates/fluss/src/row/mod.rs | 4 +-
crates/fluss/src/rpc/api_key.rs | 4 +
crates/fluss/src/rpc/message/lookup.rs | 67 ++++++
crates/fluss/src/rpc/message/mod.rs | 2 +
crates/fluss/src/util/varint.rs | 14 +-
17 files changed, 478 insertions(+), 96 deletions(-)
diff --git a/crates/fluss/src/client/table/lookup.rs
b/crates/fluss/src/client/table/lookup.rs
new file mode 100644
index 0000000..1d32ebd
--- /dev/null
+++ b/crates/fluss/src/client/table/lookup.rs
@@ -0,0 +1,252 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::bucketing::BucketingFunction;
+use crate::client::connection::FlussConnection;
+use crate::client::metadata::Metadata;
+use crate::error::{Error, Result};
+use crate::metadata::{RowType, TableBucket, TableInfo};
+use crate::row::InternalRow;
+use crate::row::compacted::CompactedRow;
+use crate::row::encode::KeyEncoder;
+use crate::rpc::ApiError;
+use crate::rpc::message::LookupRequest;
+use std::sync::Arc;
+
+/// The result of a lookup operation.
+///
+/// Contains the rows returned from a lookup. For primary key lookups,
+/// this will contain at most one row. For prefix key lookups (future),
+/// this may contain multiple rows.
+pub struct LookupResult<'a> {
+ rows: Vec<Vec<u8>>,
+ row_type: &'a RowType,
+}
+
+impl<'a> LookupResult<'a> {
+ /// Creates a new LookupResult from a list of row bytes.
+ fn new(rows: Vec<Vec<u8>>, row_type: &'a RowType) -> Self {
+ Self { rows, row_type }
+ }
+
+ /// Creates an empty LookupResult.
+ fn empty(row_type: &'a RowType) -> Self {
+ Self {
+ rows: Vec::new(),
+ row_type,
+ }
+ }
+
+ /// Returns the only row in the result set as a [`CompactedRow`].
+ ///
+ /// This method provides a zero-copy view of the row data, which means the
returned
+ /// `CompactedRow` borrows from this result set and cannot outlive it.
+ ///
+ /// # Returns
+ /// - `Ok(Some(row))`: If exactly one row exists.
+ /// - `Ok(None)`: If the result set is empty.
+ /// - `Err(Error::UnexpectedError)`: If the result set contains more than
one row.
+ ///
+ pub fn get_single_row(&self) -> Result<Option<CompactedRow<'_>>> {
+ match self.rows.len() {
+ 0 => Ok(None),
+ 1 => Ok(Some(CompactedRow::from_bytes(self.row_type,
&self.rows[0]))),
+ _ => Err(Error::UnexpectedError {
+ message: "LookupResult contains multiple rows, use get_rows()
instead".to_string(),
+ source: None,
+ }),
+ }
+ }
+
+ /// Returns all rows as CompactedRows.
+ pub fn get_rows(&self) -> Vec<CompactedRow<'_>> {
+ self.rows
+ .iter()
+ .map(|bytes| CompactedRow::from_bytes(self.row_type, bytes))
+ .collect()
+ }
+}
+
+/// Configuration and factory struct for creating lookup operations.
+///
+/// `TableLookup` follows the same pattern as `TableScan` and `TableAppend`,
+/// providing a builder-style API for configuring lookup operations before
+/// creating the actual `Lookuper`.
+///
+/// # Example
+/// ```ignore
+/// let table = conn.get_table(&table_path).await?;
+/// let mut lookuper = table.new_lookup()?.create_lookuper()?;
+/// let result = lookuper.lookup(&row).await?;
+/// if let Some(value) = result.get_single_row()? {
+/// println!("Found: {:?}", value);
+/// }
+/// ```
+// TODO: Add lookup_by(column_names) for prefix key lookups (PrefixKeyLookuper)
+// TODO: Add create_typed_lookuper<T>() for typed lookups with POJO mapping
+pub struct TableLookup<'a> {
+ conn: &'a FlussConnection,
+ table_info: TableInfo,
+ metadata: Arc<Metadata>,
+}
+
+impl<'a> TableLookup<'a> {
+ pub(super) fn new(
+ conn: &'a FlussConnection,
+ table_info: TableInfo,
+ metadata: Arc<Metadata>,
+ ) -> Self {
+ Self {
+ conn,
+ table_info,
+ metadata,
+ }
+ }
+
+ /// Creates a `Lookuper` for performing key-based lookups.
+ ///
+ /// The lookuper will automatically encode the key and compute the bucket
+ /// for each lookup using the appropriate bucketing function.
+ pub fn create_lookuper(self) -> Result<Lookuper<'a>> {
+ let num_buckets = self.table_info.get_num_buckets();
+
+ // Get data lake format from table config for bucketing function
+ let data_lake_format =
self.table_info.get_table_config().get_datalake_format()?;
+ let bucketing_function = <dyn
BucketingFunction>::of(data_lake_format.as_ref());
+
+ // Create key encoder for the primary key fields
+ let pk_fields = self.table_info.get_physical_primary_keys().to_vec();
+ let key_encoder =
+ <dyn KeyEncoder>::of(self.table_info.row_type(), pk_fields,
data_lake_format)?;
+
+ Ok(Lookuper {
+ conn: self.conn,
+ table_info: self.table_info,
+ metadata: self.metadata,
+ bucketing_function,
+ key_encoder,
+ num_buckets,
+ })
+ }
+}
+
+/// Performs key-based lookups against a primary key table.
+///
+/// The `Lookuper` automatically encodes the lookup key, computes the target
+/// bucket, finds the appropriate tablet server, and retrieves the value.
+///
+/// # Example
+/// ```ignore
+/// let mut lookuper = table.new_lookup()?.create_lookuper()?;
+/// let row = GenericRow::new(vec![Datum::Int32(42)]); // lookup key
+/// let result = lookuper.lookup(&row).await?;
+/// ```
+// TODO: Support partitioned tables (extract partition from key)
+pub struct Lookuper<'a> {
+ conn: &'a FlussConnection,
+ table_info: TableInfo,
+ metadata: Arc<Metadata>,
+ bucketing_function: Box<dyn BucketingFunction>,
+ key_encoder: Box<dyn KeyEncoder>,
+ num_buckets: i32,
+}
+
+impl<'a> Lookuper<'a> {
+ /// Looks up a value by its primary key.
+ ///
+ /// The key is encoded and the bucket is automatically computed using
+ /// the table's bucketing function.
+ ///
+ /// # Arguments
+ /// * `row` - The row containing the primary key field values
+ ///
+ /// # Returns
+ /// * `Ok(LookupResult)` - The lookup result (may be empty if key not
found)
+ /// * `Err(Error)` - If the lookup fails
+ pub async fn lookup(&mut self, row: &dyn InternalRow) ->
Result<LookupResult<'_>> {
+ // todo: support batch lookup
+ // Encode the key from the row
+ let encoded_key = self.key_encoder.encode_key(row)?;
+ let key_bytes = encoded_key.to_vec();
+
+ // Compute bucket from encoded key
+ let bucket_id = self
+ .bucketing_function
+ .bucketing(&key_bytes, self.num_buckets)?;
+
+ let table_id = self.table_info.get_table_id();
+ let table_bucket = TableBucket::new(table_id, bucket_id);
+
+ // Find the leader for this bucket
+ let cluster = self.metadata.get_cluster();
+ let leader =
+ cluster
+ .leader_for(&table_bucket)
+ .ok_or_else(|| Error::LeaderNotAvailable {
+ message: format!("No leader found for table bucket:
{table_bucket}"),
+ })?;
+
+ // Get connection to the tablet server
+ let tablet_server =
+ cluster
+ .get_tablet_server(leader.id())
+ .ok_or_else(|| Error::LeaderNotAvailable {
+ message: format!(
+ "Tablet server {} is not found in metadata cache",
+ leader.id()
+ ),
+ })?;
+
+ let connections = self.conn.get_connections();
+ let connection = connections.get_connection(tablet_server).await?;
+
+ // Send lookup request
+ let request = LookupRequest::new(table_id, None, bucket_id,
vec![key_bytes]);
+ let response = connection.request(request).await?;
+
+ // Extract the values from response
+ if let Some(bucket_resp) = response.buckets_resp.into_iter().next() {
+ // Check for errors
+ if let Some(error_code) = bucket_resp.error_code {
+ if error_code != 0 {
+ return Err(Error::FlussAPIError {
+ api_error: ApiError {
+ code: error_code,
+ message:
bucket_resp.error_message.unwrap_or_default(),
+ },
+ });
+ }
+ }
+
+ // Collect all values
+ let rows: Vec<Vec<u8>> = bucket_resp
+ .values
+ .into_iter()
+ .filter_map(|pb_value| pb_value.values)
+ .collect();
+
+ return Ok(LookupResult::new(rows, self.table_info.row_type()));
+ }
+
+ Ok(LookupResult::empty(self.table_info.row_type()))
+ }
+
+ /// Returns a reference to the table info.
+ pub fn table_info(&self) -> &TableInfo {
+ &self.table_info
+ }
+}
diff --git a/crates/fluss/src/client/table/mod.rs
b/crates/fluss/src/client/table/mod.rs
index 26341d7..7356be2 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -17,14 +17,14 @@
use crate::client::connection::FlussConnection;
use crate::client::metadata::Metadata;
+use crate::error::{Error, Result};
use crate::metadata::{TableInfo, TablePath};
use std::sync::Arc;
-use crate::error::Result;
-
pub const EARLIEST_OFFSET: i64 = -2;
mod append;
+mod lookup;
mod log_fetch_buffer;
mod remote_log;
@@ -32,6 +32,7 @@ mod scanner;
mod writer;
pub use append::{AppendWriter, TableAppend};
+pub use lookup::{LookupResult, Lookuper, TableLookup};
pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
#[allow(dead_code)]
@@ -85,6 +86,39 @@ impl<'a> FlussTable<'a> {
pub fn has_primary_key(&self) -> bool {
self.has_primary_key
}
+
+ /// Creates a new `TableLookup` for configuring lookup operations.
+ ///
+ /// This follows the same pattern as `new_scan()` and `new_append()`,
+ /// returning a configuration object that can be used to create a
`Lookuper`.
+ ///
+ /// The table must have a primary key (be a primary key table).
+ ///
+ /// # Returns
+ /// * `Ok(TableLookup)` - A lookup configuration object
+ /// * `Err(Error)` - If the table doesn't have a primary key
+ ///
+ /// # Example
+ /// ```ignore
+ /// let table = conn.get_table(&table_path).await?;
+    /// let mut lookuper = table.new_lookup()?.create_lookuper()?;
+    /// let row = GenericRow::new(vec![Datum::Int32(42)]); // primary key values
+    /// let result = lookuper.lookup(&row).await?;
+    /// if let Some(value) = result.get_single_row()? {
+    ///     println!("Found value: {:?}", value);
+    /// }
+ /// ```
+ pub fn new_lookup(&self) -> Result<TableLookup<'_>> {
+ if !self.has_primary_key {
+ return Err(Error::UnsupportedOperation {
+ message: "Lookup is only supported for primary key
tables".to_string(),
+ });
+ }
+ Ok(TableLookup::new(
+ self.conn,
+ self.table_info.clone(),
+ self.metadata.clone(),
+ ))
+ }
}
impl<'a> Drop for FlussTable<'a> {
diff --git a/crates/fluss/src/client/write/write_format.rs
b/crates/fluss/src/client/write/write_format.rs
index d65e42d..4a0c0d8 100644
--- a/crates/fluss/src/client/write/write_format.rs
+++ b/crates/fluss/src/client/write/write_format.rs
@@ -39,7 +39,7 @@ impl WriteFormat {
match self {
WriteFormat::CompactedKv => Ok(KvFormat::COMPACTED),
other => Err(IllegalArgument {
- message: format!("WriteFormat `{}` is not a KvFormat", other),
+ message: format!("WriteFormat `{other}` is not a KvFormat"),
}),
}
}
@@ -48,7 +48,7 @@ impl WriteFormat {
match kv_format {
KvFormat::COMPACTED => Ok(WriteFormat::CompactedKv),
other => Err(IllegalArgument {
- message: format!("Unknown KvFormat: `{}`", other),
+ message: format!("Unknown KvFormat: `{other}`"),
}),
}
}
diff --git a/crates/fluss/src/metadata/table.rs
b/crates/fluss/src/metadata/table.rs
index b1e8a90..da85b0c 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -729,6 +729,7 @@ impl TableConfig {
ArrowCompressionInfo::from_conf(&self.properties)
}
+ /// Returns the data lake format if configured, or None if not set.
pub fn get_datalake_format(&self) -> Result<Option<DataLakeFormat>> {
self.properties
.get("table.datalake.format")
diff --git a/crates/fluss/src/proto/fluss_api.proto
b/crates/fluss/src/proto/fluss_api.proto
index dbbb45d..b4ae840 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -317,4 +317,32 @@ message GetFileSystemSecurityTokenResponse {
required bytes token = 2;
optional int64 expiration_time = 3;
repeated PbKeyValue addition_info = 4;
+}
+
+// lookup request and response
+message LookupRequest {
+ required int64 table_id = 1;
+ repeated PbLookupReqForBucket buckets_req = 2;
+}
+
+message LookupResponse {
+ repeated PbLookupRespForBucket buckets_resp = 1;
+}
+
+message PbLookupReqForBucket {
+ optional int64 partition_id = 1;
+ required int32 bucket_id = 2;
+ repeated bytes key = 3;
+}
+
+message PbLookupRespForBucket {
+ optional int64 partition_id = 1;
+ required int32 bucket_id = 2;
+ optional int32 error_code = 3;
+ optional string error_message = 4;
+ repeated PbValue values = 5;
+}
+
+message PbValue {
+ optional bytes values = 1;
}
\ No newline at end of file
diff --git a/crates/fluss/src/record/kv/kv_record.rs
b/crates/fluss/src/record/kv/kv_record.rs
index 8c30713..ab8c2ac 100644
--- a/crates/fluss/src/record/kv/kv_record.rs
+++ b/crates/fluss/src/record/kv/kv_record.rs
@@ -101,7 +101,7 @@ impl KvRecord {
let size_i32 = i32::try_from(size_in_bytes).map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidInput,
- format!("Record size {} exceeds i32::MAX", size_in_bytes),
+ format!("Record size {size_in_bytes} exceeds i32::MAX"),
)
})?;
buf.put_i32_le(size_i32);
@@ -141,7 +141,7 @@ impl KvRecord {
if size_in_bytes_i32 < 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
- format!("Invalid record length: {}", size_in_bytes_i32),
+ format!("Invalid record length: {size_in_bytes_i32}"),
));
}
@@ -150,10 +150,7 @@ impl KvRecord {
let total_size =
size_in_bytes.checked_add(LENGTH_LENGTH).ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidData,
- format!(
- "Record size overflow: {} + {}",
- size_in_bytes, LENGTH_LENGTH
- ),
+ format!("Record size overflow: {size_in_bytes} +
{LENGTH_LENGTH}"),
)
})?;
@@ -162,8 +159,7 @@ impl KvRecord {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!(
- "Not enough bytes to read record: expected {}, available
{}",
- total_size, available
+ "Not enough bytes to read record: expected {total_size},
available {available}"
),
));
}
diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs
b/crates/fluss/src/record/kv/kv_record_batch.rs
index 6ead642..eb3c09a 100644
--- a/crates/fluss/src/record/kv/kv_record_batch.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch.rs
@@ -96,7 +96,7 @@ impl KvRecordBatch {
if length_i32 < 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
- format!("Invalid batch length: {}", length_i32),
+ format!("Invalid batch length: {length_i32}"),
));
}
@@ -150,10 +150,7 @@ impl KvRecordBatch {
if size < RECORD_BATCH_HEADER_SIZE {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
- format!(
- "Batch size {} is less than header size {}",
- size, RECORD_BATCH_HEADER_SIZE
- ),
+ format!("Batch size {size} is less than header size
{RECORD_BATCH_HEADER_SIZE}"),
));
}
@@ -276,7 +273,7 @@ impl KvRecordBatch {
if count < 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
- format!("Invalid record count: {}", count),
+ format!("Invalid record count: {count}"),
));
}
Ok(KvRecordIterator {
@@ -321,7 +318,7 @@ impl Iterator for KvRecordIterator {
#[cfg(test)]
mod tests {
use super::*;
- use crate::metadata::{DataTypes, KvFormat};
+ use crate::metadata::{DataTypes, KvFormat, RowType};
use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder};
use crate::row::binary::BinaryWriter;
use crate::row::compacted::CompactedRow;
@@ -366,8 +363,8 @@ mod tests {
let mut value1_writer = CompactedRowWriter::new(1);
value1_writer.write_bytes(&[1, 2, 3, 4, 5]);
- let data_types = &[DataTypes::bytes()];
- let row = &CompactedRow::from_bytes(data_types,
value1_writer.buffer());
+ let row_type = RowType::with_data_types([DataTypes::bytes()].to_vec());
+ let row = &CompactedRow::from_bytes(&row_type, value1_writer.buffer());
builder.append_row(key1, Some(row)).unwrap();
let key2 = b"key2";
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
index 7d1a797..c36a861 100644
--- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -248,7 +248,7 @@ impl KvRecordBatchBuilder {
let total_size = i32::try_from(size_without_length).map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidInput,
- format!("Batch size {} exceeds i32::MAX", size_without_length),
+ format!("Batch size {size_without_length} exceeds i32::MAX"),
)
})?;
@@ -317,14 +317,16 @@ impl Drop for KvRecordBatchBuilder {
#[cfg(test)]
mod tests {
use super::*;
- use crate::metadata::{DataType, DataTypes};
+ use crate::metadata::{DataTypes, RowType};
use crate::row::binary::BinaryWriter;
use crate::row::compacted::{CompactedRow, CompactedRowWriter};
+ use std::sync::LazyLock;
+ static TEST_ROW_TYPE: LazyLock<RowType> =
+ LazyLock::new(|| RowType::with_data_types(vec![DataTypes::bytes()]));
// Helper function to create a CompactedRowWriter with a single bytes
field for testing
fn create_test_row(data: &[u8]) -> CompactedRow<'_> {
- const DATA_TYPE: &[DataType] = &[DataTypes::bytes()];
- CompactedRow::from_bytes(DATA_TYPE, data)
+ CompactedRow::from_bytes(&TEST_ROW_TYPE, data)
}
#[test]
@@ -483,7 +485,6 @@ mod tests {
#[test]
fn test_builder_with_compacted_row_writer() {
- use crate::metadata::{DataType, IntType, StringType};
use crate::record::kv::KvRecordBatch;
use crate::row::InternalRow;
use crate::row::compacted::CompactedRow;
@@ -491,18 +492,13 @@ mod tests {
let mut builder = KvRecordBatchBuilder::new(1, 100000,
KvFormat::COMPACTED);
builder.set_writer_state(100, 5);
- let types = vec![
- DataType::Int(IntType::new()),
- DataType::String(StringType::new()),
- ];
-
// Create and append first record with CompactedRowWriter
let mut row_writer1 = CompactedRowWriter::new(2);
row_writer1.write_int(42);
row_writer1.write_string("hello");
- let data_types = &[DataTypes::int(), DataTypes::string()];
- let row1 = &CompactedRow::from_bytes(data_types, row_writer1.buffer());
+ let row_type = RowType::with_data_types([DataTypes::int(),
DataTypes::string()].to_vec());
+ let row1 = &CompactedRow::from_bytes(&row_type, row_writer1.buffer());
let key1 = b"key1";
assert!(builder.has_room_for_row(key1, Some(row1)));
@@ -513,7 +509,7 @@ mod tests {
row_writer2.write_int(100);
row_writer2.write_string("world");
- let row2 = &CompactedRow::from_bytes(data_types, row_writer2.buffer());
+ let row2 = &CompactedRow::from_bytes(&row_type, row_writer2.buffer());
let key2 = b"key2";
builder.append_row(key2, Some(row2)).unwrap();
@@ -539,14 +535,14 @@ mod tests {
// Verify first record
let record1 = records[0].as_ref().unwrap();
assert_eq!(record1.key().as_ref(), key1);
- let row1 = CompactedRow::from_bytes(&types, record1.value().unwrap());
+ let row1 = CompactedRow::from_bytes(&row_type,
record1.value().unwrap());
assert_eq!(row1.get_int(0), 42);
assert_eq!(row1.get_string(1), "hello");
// Verify second record
let record2 = records[1].as_ref().unwrap();
assert_eq!(record2.key().as_ref(), key2);
- let row2 = CompactedRow::from_bytes(&types, record2.value().unwrap());
+ let row2 = CompactedRow::from_bytes(&row_type,
record2.value().unwrap());
assert_eq!(row2.get_int(0), 100);
assert_eq!(row2.get_string(1), "world");
@@ -561,8 +557,8 @@ mod tests {
let mut row_writer = CompactedRowWriter::new(1);
row_writer.write_int(42);
- let data_types = &[DataTypes::int()];
- let row = &CompactedRow::from_bytes(data_types, row_writer.buffer());
+ let row_type = RowType::with_data_types([DataTypes::int()].to_vec());
+ let row = &CompactedRow::from_bytes(&row_type, row_writer.buffer());
// INDEXED format should reject append_row
let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096,
KvFormat::INDEXED);
diff --git a/crates/fluss/src/row/compacted/compacted_row.rs
b/crates/fluss/src/row/compacted/compacted_row.rs
index 9ff3b5f..144f898 100644
--- a/crates/fluss/src/row/compacted/compacted_row.rs
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::metadata::DataType;
+use crate::metadata::RowType;
use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer,
CompactedRowReader};
use crate::row::{BinaryRow, GenericRow, InternalRow};
use std::sync::{Arc, OnceLock};
@@ -38,10 +38,10 @@ pub fn calculate_bit_set_width_in_bytes(arity: usize) ->
usize {
#[allow(dead_code)]
impl<'a> CompactedRow<'a> {
- pub fn from_bytes(data_types: &'a [DataType], data: &'a [u8]) -> Self {
+ pub fn from_bytes(row_type: &'a RowType, data: &'a [u8]) -> Self {
Self::deserialize(
- Arc::new(CompactedRowDeserializer::new(data_types)),
- data_types.len(),
+ Arc::new(CompactedRowDeserializer::new(row_type)),
+ row_type.fields().len(),
data,
)
}
@@ -84,7 +84,10 @@ impl<'a> InternalRow for CompactedRow<'a> {
}
fn is_null_at(&self, pos: usize) -> bool {
- self.deserializer.get_data_types()[pos].is_nullable() &&
self.reader.is_null_at(pos)
+ self.deserializer.get_row_type().fields().as_slice()[pos]
+ .data_type
+ .is_nullable()
+ && self.reader.is_null_at(pos)
}
fn get_boolean(&self, pos: usize) -> bool {
@@ -138,7 +141,7 @@ mod tests {
use crate::row::binary::BinaryWriter;
use crate::metadata::{
- BigIntType, BooleanType, BytesType, DoubleType, FloatType, IntType,
SmallIntType,
+ BigIntType, BooleanType, BytesType, DataType, DoubleType, FloatType,
IntType, SmallIntType,
StringType, TinyIntType,
};
use crate::row::compacted::compacted_row_writer::CompactedRowWriter;
@@ -146,7 +149,7 @@ mod tests {
#[test]
fn test_compacted_row() {
// Test all primitive types
- let types = vec![
+ let row_type = RowType::with_data_types(vec![
DataType::Boolean(BooleanType::new()),
DataType::TinyInt(TinyIntType::new()),
DataType::SmallInt(SmallIntType::new()),
@@ -156,9 +159,9 @@ mod tests {
DataType::Double(DoubleType::new()),
DataType::String(StringType::new()),
DataType::Bytes(BytesType::new()),
- ];
+ ]);
- let mut writer = CompactedRowWriter::new(types.len());
+ let mut writer = CompactedRowWriter::new(row_type.fields().len());
writer.write_boolean(true);
writer.write_byte(1);
@@ -171,7 +174,7 @@ mod tests {
writer.write_bytes(&[1, 2, 3, 4, 5]);
let bytes = writer.to_bytes();
- let mut row = CompactedRow::from_bytes(types.as_slice(),
bytes.as_ref());
+ let mut row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
assert_eq!(row.get_field_count(), 9);
assert!(row.get_boolean(0));
@@ -185,20 +188,23 @@ mod tests {
assert_eq!(row.get_bytes(8), &[1, 2, 3, 4, 5]);
// Test with nulls
- let types = vec![
- DataType::Int(IntType::new()),
- DataType::String(StringType::new()),
- DataType::Double(DoubleType::new()),
- ];
+ let row_type = RowType::with_data_types(
+ [
+ DataType::Int(IntType::new()),
+ DataType::String(StringType::new()),
+ DataType::Double(DoubleType::new()),
+ ]
+ .to_vec(),
+ );
- let mut writer = CompactedRowWriter::new(types.len());
+ let mut writer = CompactedRowWriter::new(row_type.fields().len());
writer.write_int(100);
writer.set_null_at(1);
writer.write_double(2.71);
let bytes = writer.to_bytes();
- row = CompactedRow::from_bytes(types.as_slice(), bytes.as_ref());
+ row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
assert!(!row.is_null_at(0));
assert!(row.is_null_at(1));
@@ -211,26 +217,28 @@ mod tests {
assert_eq!(row.get_int(0), 100);
// Test from_bytes
- let types = vec![
+ let row_type = RowType::with_data_types(vec![
DataType::Int(IntType::new()),
DataType::String(StringType::new()),
- ];
+ ]);
- let mut writer = CompactedRowWriter::new(types.len());
+ let mut writer = CompactedRowWriter::new(row_type.fields().len());
writer.write_int(-1);
writer.write_string("test");
let bytes = writer.to_bytes();
- let mut row = CompactedRow::from_bytes(types.as_slice(),
bytes.as_ref());
+ let mut row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
assert_eq!(row.get_int(0), -1);
assert_eq!(row.get_string(1), "test");
// Test large row
let num_fields = 100;
- let types: Vec<DataType> = (0..num_fields)
- .map(|_| DataType::Int(IntType::new()))
- .collect();
+ let row_type = RowType::with_data_types(
+ (0..num_fields)
+ .map(|_| DataType::Int(IntType::new()))
+ .collect(),
+ );
let mut writer = CompactedRowWriter::new(num_fields);
@@ -239,7 +247,7 @@ mod tests {
}
let bytes = writer.to_bytes();
- row = CompactedRow::from_bytes(types.as_slice(), bytes.as_ref());
+ row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
for i in 0..num_fields {
assert_eq!(row.get_int(i), (i * 10) as i32);
diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs
b/crates/fluss/src/row/compacted/compacted_row_reader.rs
index 9ce5095..408706c 100644
--- a/crates/fluss/src/row/compacted/compacted_row_reader.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::metadata::RowType;
use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
use crate::{
metadata::DataType,
@@ -27,31 +28,32 @@ use std::str::from_utf8;
#[allow(dead_code)]
#[derive(Clone)]
pub struct CompactedRowDeserializer<'a> {
- schema: Cow<'a, [DataType]>,
+ row_type: Cow<'a, RowType>,
}
#[allow(dead_code)]
impl<'a> CompactedRowDeserializer<'a> {
- pub fn new(schema: &'a [DataType]) -> Self {
+ pub fn new(row_type: &'a RowType) -> Self {
Self {
- schema: Cow::Borrowed(schema),
+ row_type: Cow::Borrowed(row_type),
}
}
- pub fn new_from_owned(schema: Vec<DataType>) -> Self {
+ pub fn new_from_owned(row_type: RowType) -> Self {
Self {
- schema: Cow::Owned(schema),
+ row_type: Cow::Owned(row_type),
}
}
- pub fn get_data_types(&self) -> &[DataType] {
- self.schema.as_ref()
+ pub fn get_row_type(&self) -> &RowType {
+ self.row_type.as_ref()
}
pub fn deserialize(&self, reader: &CompactedRowReader<'a>) ->
GenericRow<'a> {
let mut row = GenericRow::new();
let mut cursor = reader.initial_position();
- for (col_pos, dtype) in self.schema.iter().enumerate() {
+ for (col_pos, data_field) in self.row_type.fields().iter().enumerate()
{
+ let dtype = &data_field.data_type;
if dtype.is_nullable() && reader.is_null_at(col_pos) {
row.set_field(col_pos, Datum::Null);
continue;
diff --git a/crates/fluss/src/row/encode/compacted_row_encoder.rs
b/crates/fluss/src/row/encode/compacted_row_encoder.rs
index fc39bb7..48b9f3f 100644
--- a/crates/fluss/src/row/encode/compacted_row_encoder.rs
+++ b/crates/fluss/src/row/encode/compacted_row_encoder.rs
@@ -17,7 +17,7 @@
use crate::error::Error::IllegalArgument;
use crate::error::Result;
-use crate::metadata::DataType;
+use crate::metadata::RowType;
use crate::row::Datum;
use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter};
use crate::row::compacted::{CompactedRow, CompactedRowDeserializer,
CompactedRowWriter};
@@ -33,18 +33,18 @@ pub struct CompactedRowEncoder<'a> {
}
impl<'a> CompactedRowEncoder<'a> {
- pub fn new(field_data_types: Vec<DataType>) -> Result<Self> {
- let field_writers = field_data_types
- .iter()
+ pub fn new(row_type: RowType) -> Result<Self> {
+ let field_writers = row_type
+ .field_types()
.map(|d| ValueWriter::create_value_writer(d,
Some(&BinaryRowFormat::Compacted)))
.collect::<Result<Vec<_>>>()?;
Ok(Self {
- arity: field_data_types.len(),
- writer: CompactedRowWriter::new(field_data_types.len()),
+ arity: field_writers.len(),
+ writer: CompactedRowWriter::new(field_writers.len()),
field_writers,
compacted_row_deserializer:
Arc::new(CompactedRowDeserializer::new_from_owned(
- field_data_types,
+ row_type,
)),
})
}
@@ -60,10 +60,7 @@ impl RowEncoder for CompactedRowEncoder<'_> {
self.field_writers
.get(pos)
.ok_or_else(|| IllegalArgument {
- message: format!(
- "invalid position {} when attempting to encode value {}",
- pos, value
- ),
+ message: format!("invalid position {pos} when attempting to
encode value {value}"),
})?
.write_value(&mut self.writer, pos, &value)
}
diff --git a/crates/fluss/src/row/encode/mod.rs
b/crates/fluss/src/row/encode/mod.rs
index 34863ab..c294ecf 100644
--- a/crates/fluss/src/row/encode/mod.rs
+++ b/crates/fluss/src/row/encode/mod.rs
@@ -19,7 +19,7 @@ mod compacted_key_encoder;
mod compacted_row_encoder;
use crate::error::Result;
-use crate::metadata::{DataLakeFormat, DataType, KvFormat, RowType};
+use crate::metadata::{DataLakeFormat, KvFormat, RowType};
use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder;
use crate::row::encode::compacted_row_encoder::CompactedRowEncoder;
use crate::row::{BinaryRow, Datum, InternalRow};
@@ -111,18 +111,18 @@ pub struct RowEncoderFactory {}
#[allow(dead_code)]
impl RowEncoderFactory {
pub fn create(kv_format: KvFormat, row_type: &RowType) -> Result<impl
RowEncoder> {
- Self::create_for_field_types(kv_format,
row_type.field_types().cloned().collect())
+ Self::create_for_field_types(kv_format, row_type.clone())
}
pub fn create_for_field_types(
kv_format: KvFormat,
- field_data_types: Vec<DataType>,
+ row_type: RowType,
) -> Result<impl RowEncoder> {
match kv_format {
KvFormat::INDEXED => {
todo!()
}
- KvFormat::COMPACTED => CompactedRowEncoder::new(field_data_types),
+ KvFormat::COMPACTED => CompactedRowEncoder::new(row_type),
}
}
}
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 4996063..3477f1d 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -21,11 +21,13 @@ mod datum;
pub mod binary;
pub mod compacted;
-mod encode;
+pub mod encode;
mod field_getter;
pub use column::*;
+pub use compacted::CompactedRow;
pub use datum::*;
+pub use encode::KeyEncoder;
pub trait BinaryRow: InternalRow {
/// Returns the binary representation of this row as a byte slice.
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index c515396..9f9268e 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -31,6 +31,7 @@ pub enum ApiKey {
MetaData,
ProduceLog,
FetchLog,
+ Lookup,
ListOffsets,
GetFileSystemSecurityToken,
GetDatabaseInfo,
@@ -53,6 +54,7 @@ impl From<i16> for ApiKey {
1012 => ApiKey::MetaData,
1014 => ApiKey::ProduceLog,
1015 => ApiKey::FetchLog,
+ 1017 => ApiKey::Lookup,
1021 => ApiKey::ListOffsets,
1025 => ApiKey::GetFileSystemSecurityToken,
1032 => ApiKey::GetLatestLakeSnapshot,
@@ -77,6 +79,7 @@ impl From<ApiKey> for i16 {
ApiKey::MetaData => 1012,
ApiKey::ProduceLog => 1014,
ApiKey::FetchLog => 1015,
+ ApiKey::Lookup => 1017,
ApiKey::ListOffsets => 1021,
ApiKey::GetFileSystemSecurityToken => 1025,
ApiKey::GetLatestLakeSnapshot => 1032,
@@ -105,6 +108,7 @@ mod tests {
(1012, ApiKey::MetaData),
(1014, ApiKey::ProduceLog),
(1015, ApiKey::FetchLog),
+ (1017, ApiKey::Lookup),
(1021, ApiKey::ListOffsets),
(1025, ApiKey::GetFileSystemSecurityToken),
(1032, ApiKey::GetLatestLakeSnapshot),
diff --git a/crates/fluss/src/rpc/message/lookup.rs
b/crates/fluss/src/rpc/message/lookup.rs
new file mode 100644
index 0000000..3de47d6
--- /dev/null
+++ b/crates/fluss/src/rpc/message/lookup.rs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto::LookupResponse;
+use crate::rpc::frame::ReadError;
+
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::WriteError;
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use prost::Message;
+
+use bytes::{Buf, BufMut};
+
+pub struct LookupRequest {
+ pub inner_request: proto::LookupRequest,
+}
+
+impl LookupRequest {
+ pub fn new(
+ table_id: i64,
+ partition_id: Option<i64>,
+ bucket_id: i32,
+ keys: Vec<Vec<u8>>,
+ ) -> Self {
+ let bucket_req = proto::PbLookupReqForBucket {
+ partition_id,
+ bucket_id,
+ key: keys,
+ };
+
+ let request = proto::LookupRequest {
+ table_id,
+ buckets_req: vec![bucket_req],
+ };
+
+ Self {
+ inner_request: request,
+ }
+ }
+}
+
+impl RequestBody for LookupRequest {
+ type ResponseBody = LookupResponse;
+
+ const API_KEY: ApiKey = ApiKey::Lookup;
+
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(LookupRequest);
+impl_read_version_type!(LookupResponse);
diff --git a/crates/fluss/src/rpc/message/mod.rs
b/crates/fluss/src/rpc/message/mod.rs
index b619ee4..2fe506b 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -34,6 +34,7 @@ mod header;
mod list_databases;
mod list_offsets;
mod list_tables;
+mod lookup;
mod produce_log;
mod table_exists;
mod update_metadata;
@@ -53,6 +54,7 @@ pub use header::*;
pub use list_databases::*;
pub use list_offsets::*;
pub use list_tables::*;
+pub use lookup::*;
pub use produce_log::*;
pub use table_exists::*;
pub use update_metadata::*;
diff --git a/crates/fluss/src/util/varint.rs b/crates/fluss/src/util/varint.rs
index 96fd1f5..83a75f6 100644
--- a/crates/fluss/src/util/varint.rs
+++ b/crates/fluss/src/util/varint.rs
@@ -364,12 +364,11 @@ mod tests {
let mut reader = Cursor::new(&buffer);
let read_value = read_unsigned_varint(&mut reader).unwrap();
- assert_eq!(value, read_value, "Round trip failed for value {}",
value);
+ assert_eq!(value, read_value, "Round trip failed for value
{value}");
assert_eq!(
written,
buffer.len(),
- "Bytes written mismatch for value {}",
- value
+ "Bytes written mismatch for value {value}"
);
// Test with BufMut
@@ -382,22 +381,19 @@ mod tests {
assert_eq!(
calculated_size,
buffer.len(),
- "Size calculation failed for value {}",
- value
+ "Size calculation failed for value {value}"
);
// Test reading from bytes
let (read_value_bytes, bytes_read) =
read_unsigned_varint_bytes(&buffer).unwrap();
assert_eq!(
value, read_value_bytes,
- "Bytes read failed for value {}",
- value
+ "Bytes read failed for value {value}"
);
assert_eq!(
bytes_read,
buffer.len(),
- "Bytes read count mismatch for value {}",
- value
+ "Bytes read count mismatch for value {value}"
);
}
}