This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new b1b56a6  feat: introduce lookup support for primary key tables (#159)
b1b56a6 is described below

commit b1b56a6e1778a92f0845eb1f31013fb39861cb3f
Author: Andrea Bozzo <[email protected]>
AuthorDate: Sat Jan 17 02:45:00 2026 +0100

    feat: introduce lookup support for primary key tables (#159)
---
 crates/fluss/src/client/table/lookup.rs            | 252 +++++++++++++++++++++
 crates/fluss/src/client/table/mod.rs               |  38 +++-
 crates/fluss/src/client/write/write_format.rs      |   4 +-
 crates/fluss/src/metadata/table.rs                 |   1 +
 crates/fluss/src/proto/fluss_api.proto             |  28 +++
 crates/fluss/src/record/kv/kv_record.rs            |  12 +-
 crates/fluss/src/record/kv/kv_record_batch.rs      |  15 +-
 .../fluss/src/record/kv/kv_record_batch_builder.rs |  30 ++-
 crates/fluss/src/row/compacted/compacted_row.rs    |  58 +++--
 .../src/row/compacted/compacted_row_reader.rs      |  18 +-
 .../fluss/src/row/encode/compacted_row_encoder.rs  |  19 +-
 crates/fluss/src/row/encode/mod.rs                 |   8 +-
 crates/fluss/src/row/mod.rs                        |   4 +-
 crates/fluss/src/rpc/api_key.rs                    |   4 +
 crates/fluss/src/rpc/message/lookup.rs             |  67 ++++++
 crates/fluss/src/rpc/message/mod.rs                |   2 +
 crates/fluss/src/util/varint.rs                    |  14 +-
 17 files changed, 478 insertions(+), 96 deletions(-)

diff --git a/crates/fluss/src/client/table/lookup.rs 
b/crates/fluss/src/client/table/lookup.rs
new file mode 100644
index 0000000..1d32ebd
--- /dev/null
+++ b/crates/fluss/src/client/table/lookup.rs
@@ -0,0 +1,252 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::bucketing::BucketingFunction;
+use crate::client::connection::FlussConnection;
+use crate::client::metadata::Metadata;
+use crate::error::{Error, Result};
+use crate::metadata::{RowType, TableBucket, TableInfo};
+use crate::row::InternalRow;
+use crate::row::compacted::CompactedRow;
+use crate::row::encode::KeyEncoder;
+use crate::rpc::ApiError;
+use crate::rpc::message::LookupRequest;
+use std::sync::Arc;
+
+/// The result of a lookup operation.
+///
+/// Contains the rows returned from a lookup. For primary key lookups,
+/// this will contain at most one row. For prefix key lookups (future),
+/// this may contain multiple rows.
+pub struct LookupResult<'a> {
+    rows: Vec<Vec<u8>>,
+    row_type: &'a RowType,
+}
+
+impl<'a> LookupResult<'a> {
+    /// Creates a new LookupResult from a list of row bytes.
+    fn new(rows: Vec<Vec<u8>>, row_type: &'a RowType) -> Self {
+        Self { rows, row_type }
+    }
+
+    /// Creates an empty LookupResult.
+    fn empty(row_type: &'a RowType) -> Self {
+        Self {
+            rows: Vec::new(),
+            row_type,
+        }
+    }
+
+    /// Returns the only row in the result set as a [`CompactedRow`].
+    ///
+    /// This method provides a zero-copy view of the row data, which means the 
returned
+    /// `CompactedRow` borrows from this result set and cannot outlive it.
+    ///
+    /// # Returns
+    /// - `Ok(Some(row))`: If exactly one row exists.
+    /// - `Ok(None)`: If the result set is empty.
+    /// - `Err(Error::UnexpectedError)`: If the result set contains more than 
one row.
+    ///
+    pub fn get_single_row(&self) -> Result<Option<CompactedRow<'_>>> {
+        match self.rows.len() {
+            0 => Ok(None),
+            1 => Ok(Some(CompactedRow::from_bytes(self.row_type, 
&self.rows[0]))),
+            _ => Err(Error::UnexpectedError {
+                message: "LookupResult contains multiple rows, use get_rows() 
instead".to_string(),
+                source: None,
+            }),
+        }
+    }
+
+    /// Returns all rows as CompactedRows.
+    pub fn get_rows(&self) -> Vec<CompactedRow<'_>> {
+        self.rows
+            .iter()
+            .map(|bytes| CompactedRow::from_bytes(self.row_type, bytes))
+            .collect()
+    }
+}
+
+/// Configuration and factory struct for creating lookup operations.
+///
+/// `TableLookup` follows the same pattern as `TableScan` and `TableAppend`,
+/// providing a builder-style API for configuring lookup operations before
+/// creating the actual `Lookuper`.
+///
+/// # Example
+/// ```ignore
+/// let table = conn.get_table(&table_path).await?;
+/// let mut lookuper = table.new_lookup()?.create_lookuper()?;
+/// let result = lookuper.lookup(&row).await?;
+/// if let Some(value) = result.get_single_row()? {
+///     println!("Found: {:?}", value);
+/// }
+/// ```
+// TODO: Add lookup_by(column_names) for prefix key lookups (PrefixKeyLookuper)
+// TODO: Add create_typed_lookuper<T>() for typed lookups with POJO mapping
+pub struct TableLookup<'a> {
+    conn: &'a FlussConnection,
+    table_info: TableInfo,
+    metadata: Arc<Metadata>,
+}
+
+impl<'a> TableLookup<'a> {
+    pub(super) fn new(
+        conn: &'a FlussConnection,
+        table_info: TableInfo,
+        metadata: Arc<Metadata>,
+    ) -> Self {
+        Self {
+            conn,
+            table_info,
+            metadata,
+        }
+    }
+
+    /// Creates a `Lookuper` for performing key-based lookups.
+    ///
+    /// The lookuper will automatically encode the key and compute the bucket
+    /// for each lookup using the appropriate bucketing function.
+    pub fn create_lookuper(self) -> Result<Lookuper<'a>> {
+        let num_buckets = self.table_info.get_num_buckets();
+
+        // Get data lake format from table config for bucketing function
+        let data_lake_format = 
self.table_info.get_table_config().get_datalake_format()?;
+        let bucketing_function = <dyn 
BucketingFunction>::of(data_lake_format.as_ref());
+
+        // Create key encoder for the primary key fields
+        let pk_fields = self.table_info.get_physical_primary_keys().to_vec();
+        let key_encoder =
+            <dyn KeyEncoder>::of(self.table_info.row_type(), pk_fields, 
data_lake_format)?;
+
+        Ok(Lookuper {
+            conn: self.conn,
+            table_info: self.table_info,
+            metadata: self.metadata,
+            bucketing_function,
+            key_encoder,
+            num_buckets,
+        })
+    }
+}
+
+/// Performs key-based lookups against a primary key table.
+///
+/// The `Lookuper` automatically encodes the lookup key, computes the target
+/// bucket, finds the appropriate tablet server, and retrieves the value.
+///
+/// # Example
+/// ```ignore
+/// let mut lookuper = table.new_lookup()?.create_lookuper()?;
+/// let row = GenericRow::new(vec![Datum::Int32(42)]); // lookup key
+/// let result = lookuper.lookup(&row).await?;
+/// ```
+// TODO: Support partitioned tables (extract partition from key)
+pub struct Lookuper<'a> {
+    conn: &'a FlussConnection,
+    table_info: TableInfo,
+    metadata: Arc<Metadata>,
+    bucketing_function: Box<dyn BucketingFunction>,
+    key_encoder: Box<dyn KeyEncoder>,
+    num_buckets: i32,
+}
+
+impl<'a> Lookuper<'a> {
+    /// Looks up a value by its primary key.
+    ///
+    /// The key is encoded and the bucket is automatically computed using
+    /// the table's bucketing function.
+    ///
+    /// # Arguments
+    /// * `row` - The row containing the primary key field values
+    ///
+    /// # Returns
+    /// * `Ok(LookupResult)` - The lookup result (may be empty if key not 
found)
+    /// * `Err(Error)` - If the lookup fails
+    pub async fn lookup(&mut self, row: &dyn InternalRow) -> 
Result<LookupResult<'_>> {
+        // todo: support batch lookup
+        // Encode the key from the row
+        let encoded_key = self.key_encoder.encode_key(row)?;
+        let key_bytes = encoded_key.to_vec();
+
+        // Compute bucket from encoded key
+        let bucket_id = self
+            .bucketing_function
+            .bucketing(&key_bytes, self.num_buckets)?;
+
+        let table_id = self.table_info.get_table_id();
+        let table_bucket = TableBucket::new(table_id, bucket_id);
+
+        // Find the leader for this bucket
+        let cluster = self.metadata.get_cluster();
+        let leader =
+            cluster
+                .leader_for(&table_bucket)
+                .ok_or_else(|| Error::LeaderNotAvailable {
+                    message: format!("No leader found for table bucket: 
{table_bucket}"),
+                })?;
+
+        // Get connection to the tablet server
+        let tablet_server =
+            cluster
+                .get_tablet_server(leader.id())
+                .ok_or_else(|| Error::LeaderNotAvailable {
+                    message: format!(
+                        "Tablet server {} is not found in metadata cache",
+                        leader.id()
+                    ),
+                })?;
+
+        let connections = self.conn.get_connections();
+        let connection = connections.get_connection(tablet_server).await?;
+
+        // Send lookup request
+        let request = LookupRequest::new(table_id, None, bucket_id, 
vec![key_bytes]);
+        let response = connection.request(request).await?;
+
+        // Extract the values from response
+        if let Some(bucket_resp) = response.buckets_resp.into_iter().next() {
+            // Check for errors
+            if let Some(error_code) = bucket_resp.error_code {
+                if error_code != 0 {
+                    return Err(Error::FlussAPIError {
+                        api_error: ApiError {
+                            code: error_code,
+                            message: 
bucket_resp.error_message.unwrap_or_default(),
+                        },
+                    });
+                }
+            }
+
+            // Collect all values
+            let rows: Vec<Vec<u8>> = bucket_resp
+                .values
+                .into_iter()
+                .filter_map(|pb_value| pb_value.values)
+                .collect();
+
+            return Ok(LookupResult::new(rows, self.table_info.row_type()));
+        }
+
+        Ok(LookupResult::empty(self.table_info.row_type()))
+    }
+
+    /// Returns a reference to the table info.
+    pub fn table_info(&self) -> &TableInfo {
+        &self.table_info
+    }
+}
diff --git a/crates/fluss/src/client/table/mod.rs 
b/crates/fluss/src/client/table/mod.rs
index 26341d7..7356be2 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -17,14 +17,14 @@
 
 use crate::client::connection::FlussConnection;
 use crate::client::metadata::Metadata;
+use crate::error::{Error, Result};
 use crate::metadata::{TableInfo, TablePath};
 use std::sync::Arc;
 
-use crate::error::Result;
-
 pub const EARLIEST_OFFSET: i64 = -2;
 
 mod append;
+mod lookup;
 
 mod log_fetch_buffer;
 mod remote_log;
@@ -32,6 +32,7 @@ mod scanner;
 mod writer;
 
 pub use append::{AppendWriter, TableAppend};
+pub use lookup::{LookupResult, Lookuper, TableLookup};
 pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
 
 #[allow(dead_code)]
@@ -85,6 +86,39 @@ impl<'a> FlussTable<'a> {
     pub fn has_primary_key(&self) -> bool {
         self.has_primary_key
     }
+
+    /// Creates a new `TableLookup` for configuring lookup operations.
+    ///
+    /// This follows the same pattern as `new_scan()` and `new_append()`,
+    /// returning a configuration object that can be used to create a 
`Lookuper`.
+    ///
+    /// The table must have a primary key (be a primary key table).
+    ///
+    /// # Returns
+    /// * `Ok(TableLookup)` - A lookup configuration object
+    /// * `Err(Error)` - If the table doesn't have a primary key
+    ///
+    /// # Example
+    /// ```ignore
+    /// let table = conn.get_table(&table_path).await?;
+    /// let mut lookuper = table.new_lookup()?.create_lookuper()?;
+    /// let row = GenericRow::new(vec![Datum::Int32(42)]); // primary key values
+    /// let result = lookuper.lookup(&row).await?;
+    /// if let Some(value) = result.get_single_row()? {
+    ///     println!("Found value: {:?}", value);
+    /// }
+    /// ```
+    pub fn new_lookup(&self) -> Result<TableLookup<'_>> {
+        if !self.has_primary_key {
+            return Err(Error::UnsupportedOperation {
+                message: "Lookup is only supported for primary key 
tables".to_string(),
+            });
+        }
+        Ok(TableLookup::new(
+            self.conn,
+            self.table_info.clone(),
+            self.metadata.clone(),
+        ))
+    }
 }
 
 impl<'a> Drop for FlussTable<'a> {
diff --git a/crates/fluss/src/client/write/write_format.rs 
b/crates/fluss/src/client/write/write_format.rs
index d65e42d..4a0c0d8 100644
--- a/crates/fluss/src/client/write/write_format.rs
+++ b/crates/fluss/src/client/write/write_format.rs
@@ -39,7 +39,7 @@ impl WriteFormat {
         match self {
             WriteFormat::CompactedKv => Ok(KvFormat::COMPACTED),
             other => Err(IllegalArgument {
-                message: format!("WriteFormat `{}` is not a KvFormat", other),
+                message: format!("WriteFormat `{other}` is not a KvFormat"),
             }),
         }
     }
@@ -48,7 +48,7 @@ impl WriteFormat {
         match kv_format {
             KvFormat::COMPACTED => Ok(WriteFormat::CompactedKv),
             other => Err(IllegalArgument {
-                message: format!("Unknown KvFormat: `{}`", other),
+                message: format!("Unknown KvFormat: `{other}`"),
             }),
         }
     }
diff --git a/crates/fluss/src/metadata/table.rs 
b/crates/fluss/src/metadata/table.rs
index b1e8a90..da85b0c 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -729,6 +729,7 @@ impl TableConfig {
         ArrowCompressionInfo::from_conf(&self.properties)
     }
 
+    /// Returns the data lake format if configured, or None if not set.
     pub fn get_datalake_format(&self) -> Result<Option<DataLakeFormat>> {
         self.properties
             .get("table.datalake.format")
diff --git a/crates/fluss/src/proto/fluss_api.proto 
b/crates/fluss/src/proto/fluss_api.proto
index dbbb45d..b4ae840 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -317,4 +317,32 @@ message GetFileSystemSecurityTokenResponse {
   required bytes token = 2;
   optional int64 expiration_time = 3;
   repeated PbKeyValue addition_info = 4;
+}
+
+// lookup request and response
+message LookupRequest {
+  required int64 table_id = 1;
+  repeated PbLookupReqForBucket buckets_req = 2;
+}
+
+message LookupResponse {
+  repeated PbLookupRespForBucket buckets_resp = 1;
+}
+
+message PbLookupReqForBucket {
+  optional int64 partition_id = 1;
+  required int32 bucket_id = 2;
+  repeated bytes key = 3;
+}
+
+message PbLookupRespForBucket {
+  optional int64 partition_id = 1;
+  required int32 bucket_id = 2;
+  optional int32 error_code = 3;
+  optional string error_message = 4;
+  repeated PbValue values = 5;
+}
+
+message PbValue {
+  optional bytes values = 1;
 }
\ No newline at end of file
diff --git a/crates/fluss/src/record/kv/kv_record.rs 
b/crates/fluss/src/record/kv/kv_record.rs
index 8c30713..ab8c2ac 100644
--- a/crates/fluss/src/record/kv/kv_record.rs
+++ b/crates/fluss/src/record/kv/kv_record.rs
@@ -101,7 +101,7 @@ impl KvRecord {
         let size_i32 = i32::try_from(size_in_bytes).map_err(|_| {
             io::Error::new(
                 io::ErrorKind::InvalidInput,
-                format!("Record size {} exceeds i32::MAX", size_in_bytes),
+                format!("Record size {size_in_bytes} exceeds i32::MAX"),
             )
         })?;
         buf.put_i32_le(size_i32);
@@ -141,7 +141,7 @@ impl KvRecord {
         if size_in_bytes_i32 < 0 {
             return Err(io::Error::new(
                 io::ErrorKind::InvalidData,
-                format!("Invalid record length: {}", size_in_bytes_i32),
+                format!("Invalid record length: {size_in_bytes_i32}"),
             ));
         }
 
@@ -150,10 +150,7 @@ impl KvRecord {
         let total_size = 
size_in_bytes.checked_add(LENGTH_LENGTH).ok_or_else(|| {
             io::Error::new(
                 io::ErrorKind::InvalidData,
-                format!(
-                    "Record size overflow: {} + {}",
-                    size_in_bytes, LENGTH_LENGTH
-                ),
+                format!("Record size overflow: {size_in_bytes} + 
{LENGTH_LENGTH}"),
             )
         })?;
 
@@ -162,8 +159,7 @@ impl KvRecord {
             return Err(io::Error::new(
                 io::ErrorKind::UnexpectedEof,
                 format!(
-                    "Not enough bytes to read record: expected {}, available 
{}",
-                    total_size, available
+                    "Not enough bytes to read record: expected {total_size}, 
available {available}"
                 ),
             ));
         }
diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs 
b/crates/fluss/src/record/kv/kv_record_batch.rs
index 6ead642..eb3c09a 100644
--- a/crates/fluss/src/record/kv/kv_record_batch.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch.rs
@@ -96,7 +96,7 @@ impl KvRecordBatch {
         if length_i32 < 0 {
             return Err(io::Error::new(
                 io::ErrorKind::InvalidData,
-                format!("Invalid batch length: {}", length_i32),
+                format!("Invalid batch length: {length_i32}"),
             ));
         }
 
@@ -150,10 +150,7 @@ impl KvRecordBatch {
         if size < RECORD_BATCH_HEADER_SIZE {
             return Err(io::Error::new(
                 io::ErrorKind::InvalidData,
-                format!(
-                    "Batch size {} is less than header size {}",
-                    size, RECORD_BATCH_HEADER_SIZE
-                ),
+                format!("Batch size {size} is less than header size 
{RECORD_BATCH_HEADER_SIZE}"),
             ));
         }
 
@@ -276,7 +273,7 @@ impl KvRecordBatch {
         if count < 0 {
             return Err(io::Error::new(
                 io::ErrorKind::InvalidData,
-                format!("Invalid record count: {}", count),
+                format!("Invalid record count: {count}"),
             ));
         }
         Ok(KvRecordIterator {
@@ -321,7 +318,7 @@ impl Iterator for KvRecordIterator {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::metadata::{DataTypes, KvFormat};
+    use crate::metadata::{DataTypes, KvFormat, RowType};
     use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder};
     use crate::row::binary::BinaryWriter;
     use crate::row::compacted::CompactedRow;
@@ -366,8 +363,8 @@ mod tests {
         let mut value1_writer = CompactedRowWriter::new(1);
         value1_writer.write_bytes(&[1, 2, 3, 4, 5]);
 
-        let data_types = &[DataTypes::bytes()];
-        let row = &CompactedRow::from_bytes(data_types, 
value1_writer.buffer());
+        let row_type = RowType::with_data_types([DataTypes::bytes()].to_vec());
+        let row = &CompactedRow::from_bytes(&row_type, value1_writer.buffer());
         builder.append_row(key1, Some(row)).unwrap();
 
         let key2 = b"key2";
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs 
b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
index 7d1a797..c36a861 100644
--- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -248,7 +248,7 @@ impl KvRecordBatchBuilder {
         let total_size = i32::try_from(size_without_length).map_err(|_| {
             io::Error::new(
                 io::ErrorKind::InvalidInput,
-                format!("Batch size {} exceeds i32::MAX", size_without_length),
+                format!("Batch size {size_without_length} exceeds i32::MAX"),
             )
         })?;
 
@@ -317,14 +317,16 @@ impl Drop for KvRecordBatchBuilder {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::metadata::{DataType, DataTypes};
+    use crate::metadata::{DataTypes, RowType};
     use crate::row::binary::BinaryWriter;
     use crate::row::compacted::{CompactedRow, CompactedRowWriter};
+    use std::sync::LazyLock;
+    static TEST_ROW_TYPE: LazyLock<RowType> =
+        LazyLock::new(|| RowType::with_data_types(vec![DataTypes::bytes()]));
 
     // Helper function to create a CompactedRowWriter with a single bytes 
field for testing
     fn create_test_row(data: &[u8]) -> CompactedRow<'_> {
-        const DATA_TYPE: &[DataType] = &[DataTypes::bytes()];
-        CompactedRow::from_bytes(DATA_TYPE, data)
+        CompactedRow::from_bytes(&TEST_ROW_TYPE, data)
     }
 
     #[test]
@@ -483,7 +485,6 @@ mod tests {
 
     #[test]
     fn test_builder_with_compacted_row_writer() {
-        use crate::metadata::{DataType, IntType, StringType};
         use crate::record::kv::KvRecordBatch;
         use crate::row::InternalRow;
         use crate::row::compacted::CompactedRow;
@@ -491,18 +492,13 @@ mod tests {
         let mut builder = KvRecordBatchBuilder::new(1, 100000, 
KvFormat::COMPACTED);
         builder.set_writer_state(100, 5);
 
-        let types = vec![
-            DataType::Int(IntType::new()),
-            DataType::String(StringType::new()),
-        ];
-
         // Create and append first record with CompactedRowWriter
         let mut row_writer1 = CompactedRowWriter::new(2);
         row_writer1.write_int(42);
         row_writer1.write_string("hello");
 
-        let data_types = &[DataTypes::int(), DataTypes::string()];
-        let row1 = &CompactedRow::from_bytes(data_types, row_writer1.buffer());
+        let row_type = RowType::with_data_types([DataTypes::int(), 
DataTypes::string()].to_vec());
+        let row1 = &CompactedRow::from_bytes(&row_type, row_writer1.buffer());
 
         let key1 = b"key1";
         assert!(builder.has_room_for_row(key1, Some(row1)));
@@ -513,7 +509,7 @@ mod tests {
         row_writer2.write_int(100);
         row_writer2.write_string("world");
 
-        let row2 = &CompactedRow::from_bytes(data_types, row_writer2.buffer());
+        let row2 = &CompactedRow::from_bytes(&row_type, row_writer2.buffer());
 
         let key2 = b"key2";
         builder.append_row(key2, Some(row2)).unwrap();
@@ -539,14 +535,14 @@ mod tests {
         // Verify first record
         let record1 = records[0].as_ref().unwrap();
         assert_eq!(record1.key().as_ref(), key1);
-        let row1 = CompactedRow::from_bytes(&types, record1.value().unwrap());
+        let row1 = CompactedRow::from_bytes(&row_type, 
record1.value().unwrap());
         assert_eq!(row1.get_int(0), 42);
         assert_eq!(row1.get_string(1), "hello");
 
         // Verify second record
         let record2 = records[1].as_ref().unwrap();
         assert_eq!(record2.key().as_ref(), key2);
-        let row2 = CompactedRow::from_bytes(&types, record2.value().unwrap());
+        let row2 = CompactedRow::from_bytes(&row_type, 
record2.value().unwrap());
         assert_eq!(row2.get_int(0), 100);
         assert_eq!(row2.get_string(1), "world");
 
@@ -561,8 +557,8 @@ mod tests {
         let mut row_writer = CompactedRowWriter::new(1);
         row_writer.write_int(42);
 
-        let data_types = &[DataTypes::int()];
-        let row = &CompactedRow::from_bytes(data_types, row_writer.buffer());
+        let row_type = RowType::with_data_types([DataTypes::int()].to_vec());
+        let row = &CompactedRow::from_bytes(&row_type, row_writer.buffer());
 
         // INDEXED format should reject append_row
         let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096, 
KvFormat::INDEXED);
diff --git a/crates/fluss/src/row/compacted/compacted_row.rs 
b/crates/fluss/src/row/compacted/compacted_row.rs
index 9ff3b5f..144f898 100644
--- a/crates/fluss/src/row/compacted/compacted_row.rs
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::metadata::DataType;
+use crate::metadata::RowType;
 use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, 
CompactedRowReader};
 use crate::row::{BinaryRow, GenericRow, InternalRow};
 use std::sync::{Arc, OnceLock};
@@ -38,10 +38,10 @@ pub fn calculate_bit_set_width_in_bytes(arity: usize) -> 
usize {
 
 #[allow(dead_code)]
 impl<'a> CompactedRow<'a> {
-    pub fn from_bytes(data_types: &'a [DataType], data: &'a [u8]) -> Self {
+    pub fn from_bytes(row_type: &'a RowType, data: &'a [u8]) -> Self {
         Self::deserialize(
-            Arc::new(CompactedRowDeserializer::new(data_types)),
-            data_types.len(),
+            Arc::new(CompactedRowDeserializer::new(row_type)),
+            row_type.fields().len(),
             data,
         )
     }
@@ -84,7 +84,10 @@ impl<'a> InternalRow for CompactedRow<'a> {
     }
 
     fn is_null_at(&self, pos: usize) -> bool {
-        self.deserializer.get_data_types()[pos].is_nullable() && 
self.reader.is_null_at(pos)
+        self.deserializer.get_row_type().fields().as_slice()[pos]
+            .data_type
+            .is_nullable()
+            && self.reader.is_null_at(pos)
     }
 
     fn get_boolean(&self, pos: usize) -> bool {
@@ -138,7 +141,7 @@ mod tests {
     use crate::row::binary::BinaryWriter;
 
     use crate::metadata::{
-        BigIntType, BooleanType, BytesType, DoubleType, FloatType, IntType, 
SmallIntType,
+        BigIntType, BooleanType, BytesType, DataType, DoubleType, FloatType, 
IntType, SmallIntType,
         StringType, TinyIntType,
     };
     use crate::row::compacted::compacted_row_writer::CompactedRowWriter;
@@ -146,7 +149,7 @@ mod tests {
     #[test]
     fn test_compacted_row() {
         // Test all primitive types
-        let types = vec![
+        let row_type = RowType::with_data_types(vec![
             DataType::Boolean(BooleanType::new()),
             DataType::TinyInt(TinyIntType::new()),
             DataType::SmallInt(SmallIntType::new()),
@@ -156,9 +159,9 @@ mod tests {
             DataType::Double(DoubleType::new()),
             DataType::String(StringType::new()),
             DataType::Bytes(BytesType::new()),
-        ];
+        ]);
 
-        let mut writer = CompactedRowWriter::new(types.len());
+        let mut writer = CompactedRowWriter::new(row_type.fields().len());
 
         writer.write_boolean(true);
         writer.write_byte(1);
@@ -171,7 +174,7 @@ mod tests {
         writer.write_bytes(&[1, 2, 3, 4, 5]);
 
         let bytes = writer.to_bytes();
-        let mut row = CompactedRow::from_bytes(types.as_slice(), 
bytes.as_ref());
+        let mut row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
 
         assert_eq!(row.get_field_count(), 9);
         assert!(row.get_boolean(0));
@@ -185,20 +188,23 @@ mod tests {
         assert_eq!(row.get_bytes(8), &[1, 2, 3, 4, 5]);
 
         // Test with nulls
-        let types = vec![
-            DataType::Int(IntType::new()),
-            DataType::String(StringType::new()),
-            DataType::Double(DoubleType::new()),
-        ];
+        let row_type = RowType::with_data_types(
+            [
+                DataType::Int(IntType::new()),
+                DataType::String(StringType::new()),
+                DataType::Double(DoubleType::new()),
+            ]
+            .to_vec(),
+        );
 
-        let mut writer = CompactedRowWriter::new(types.len());
+        let mut writer = CompactedRowWriter::new(row_type.fields().len());
 
         writer.write_int(100);
         writer.set_null_at(1);
         writer.write_double(2.71);
 
         let bytes = writer.to_bytes();
-        row = CompactedRow::from_bytes(types.as_slice(), bytes.as_ref());
+        row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
 
         assert!(!row.is_null_at(0));
         assert!(row.is_null_at(1));
@@ -211,26 +217,28 @@ mod tests {
         assert_eq!(row.get_int(0), 100);
 
         // Test from_bytes
-        let types = vec![
+        let row_type = RowType::with_data_types(vec![
             DataType::Int(IntType::new()),
             DataType::String(StringType::new()),
-        ];
+        ]);
 
-        let mut writer = CompactedRowWriter::new(types.len());
+        let mut writer = CompactedRowWriter::new(row_type.fields().len());
         writer.write_int(-1);
         writer.write_string("test");
 
         let bytes = writer.to_bytes();
-        let mut row = CompactedRow::from_bytes(types.as_slice(), 
bytes.as_ref());
+        let mut row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
 
         assert_eq!(row.get_int(0), -1);
         assert_eq!(row.get_string(1), "test");
 
         // Test large row
         let num_fields = 100;
-        let types: Vec<DataType> = (0..num_fields)
-            .map(|_| DataType::Int(IntType::new()))
-            .collect();
+        let row_type = RowType::with_data_types(
+            (0..num_fields)
+                .map(|_| DataType::Int(IntType::new()))
+                .collect(),
+        );
 
         let mut writer = CompactedRowWriter::new(num_fields);
 
@@ -239,7 +247,7 @@ mod tests {
         }
 
         let bytes = writer.to_bytes();
-        row = CompactedRow::from_bytes(types.as_slice(), bytes.as_ref());
+        row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
 
         for i in 0..num_fields {
             assert_eq!(row.get_int(i), (i * 10) as i32);
diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs 
b/crates/fluss/src/row/compacted/compacted_row_reader.rs
index 9ce5095..408706c 100644
--- a/crates/fluss/src/row/compacted/compacted_row_reader.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::metadata::RowType;
 use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
 use crate::{
     metadata::DataType,
@@ -27,31 +28,32 @@ use std::str::from_utf8;
 #[allow(dead_code)]
 #[derive(Clone)]
 pub struct CompactedRowDeserializer<'a> {
-    schema: Cow<'a, [DataType]>,
+    row_type: Cow<'a, RowType>,
 }
 
 #[allow(dead_code)]
 impl<'a> CompactedRowDeserializer<'a> {
-    pub fn new(schema: &'a [DataType]) -> Self {
+    pub fn new(row_type: &'a RowType) -> Self {
         Self {
-            schema: Cow::Borrowed(schema),
+            row_type: Cow::Borrowed(row_type),
         }
     }
 
-    pub fn new_from_owned(schema: Vec<DataType>) -> Self {
+    pub fn new_from_owned(row_type: RowType) -> Self {
         Self {
-            schema: Cow::Owned(schema),
+            row_type: Cow::Owned(row_type),
         }
     }
 
-    pub fn get_data_types(&self) -> &[DataType] {
-        self.schema.as_ref()
+    pub fn get_row_type(&self) -> &RowType {
+        self.row_type.as_ref()
     }
 
     pub fn deserialize(&self, reader: &CompactedRowReader<'a>) -> 
GenericRow<'a> {
         let mut row = GenericRow::new();
         let mut cursor = reader.initial_position();
-        for (col_pos, dtype) in self.schema.iter().enumerate() {
+        for (col_pos, data_field) in self.row_type.fields().iter().enumerate() 
{
+            let dtype = &data_field.data_type;
             if dtype.is_nullable() && reader.is_null_at(col_pos) {
                 row.set_field(col_pos, Datum::Null);
                 continue;
diff --git a/crates/fluss/src/row/encode/compacted_row_encoder.rs 
b/crates/fluss/src/row/encode/compacted_row_encoder.rs
index fc39bb7..48b9f3f 100644
--- a/crates/fluss/src/row/encode/compacted_row_encoder.rs
+++ b/crates/fluss/src/row/encode/compacted_row_encoder.rs
@@ -17,7 +17,7 @@
 
 use crate::error::Error::IllegalArgument;
 use crate::error::Result;
-use crate::metadata::DataType;
+use crate::metadata::RowType;
 use crate::row::Datum;
 use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter};
 use crate::row::compacted::{CompactedRow, CompactedRowDeserializer, 
CompactedRowWriter};
@@ -33,18 +33,18 @@ pub struct CompactedRowEncoder<'a> {
 }
 
 impl<'a> CompactedRowEncoder<'a> {
-    pub fn new(field_data_types: Vec<DataType>) -> Result<Self> {
-        let field_writers = field_data_types
-            .iter()
+    pub fn new(row_type: RowType) -> Result<Self> {
+        let field_writers = row_type
+            .field_types()
             .map(|d| ValueWriter::create_value_writer(d, 
Some(&BinaryRowFormat::Compacted)))
             .collect::<Result<Vec<_>>>()?;
 
         Ok(Self {
-            arity: field_data_types.len(),
-            writer: CompactedRowWriter::new(field_data_types.len()),
+            arity: field_writers.len(),
+            writer: CompactedRowWriter::new(field_writers.len()),
             field_writers,
             compacted_row_deserializer: 
Arc::new(CompactedRowDeserializer::new_from_owned(
-                field_data_types,
+                row_type,
             )),
         })
     }
@@ -60,10 +60,7 @@ impl RowEncoder for CompactedRowEncoder<'_> {
         self.field_writers
             .get(pos)
             .ok_or_else(|| IllegalArgument {
-                message: format!(
-                    "invalid position {} when attempting to encode value {}",
-                    pos, value
-                ),
+                message: format!("invalid position {pos} when attempting to 
encode value {value}"),
             })?
             .write_value(&mut self.writer, pos, &value)
     }
diff --git a/crates/fluss/src/row/encode/mod.rs 
b/crates/fluss/src/row/encode/mod.rs
index 34863ab..c294ecf 100644
--- a/crates/fluss/src/row/encode/mod.rs
+++ b/crates/fluss/src/row/encode/mod.rs
@@ -19,7 +19,7 @@ mod compacted_key_encoder;
 mod compacted_row_encoder;
 
 use crate::error::Result;
-use crate::metadata::{DataLakeFormat, DataType, KvFormat, RowType};
+use crate::metadata::{DataLakeFormat, KvFormat, RowType};
 use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder;
 use crate::row::encode::compacted_row_encoder::CompactedRowEncoder;
 use crate::row::{BinaryRow, Datum, InternalRow};
@@ -111,18 +111,18 @@ pub struct RowEncoderFactory {}
 #[allow(dead_code)]
 impl RowEncoderFactory {
     pub fn create(kv_format: KvFormat, row_type: &RowType) -> Result<impl 
RowEncoder> {
-        Self::create_for_field_types(kv_format, 
row_type.field_types().cloned().collect())
+        Self::create_for_field_types(kv_format, row_type.clone())
     }
 
     pub fn create_for_field_types(
         kv_format: KvFormat,
-        field_data_types: Vec<DataType>,
+        row_type: RowType,
     ) -> Result<impl RowEncoder> {
         match kv_format {
             KvFormat::INDEXED => {
                 todo!()
             }
-            KvFormat::COMPACTED => CompactedRowEncoder::new(field_data_types),
+            KvFormat::COMPACTED => CompactedRowEncoder::new(row_type),
         }
     }
 }
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 4996063..3477f1d 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -21,11 +21,13 @@ mod datum;
 
 pub mod binary;
 pub mod compacted;
-mod encode;
+pub mod encode;
 mod field_getter;
 
 pub use column::*;
+pub use compacted::CompactedRow;
 pub use datum::*;
+pub use encode::KeyEncoder;
 
 pub trait BinaryRow: InternalRow {
     /// Returns the binary representation of this row as a byte slice.
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index c515396..9f9268e 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -31,6 +31,7 @@ pub enum ApiKey {
     MetaData,
     ProduceLog,
     FetchLog,
+    Lookup,
     ListOffsets,
     GetFileSystemSecurityToken,
     GetDatabaseInfo,
@@ -53,6 +54,7 @@ impl From<i16> for ApiKey {
             1012 => ApiKey::MetaData,
             1014 => ApiKey::ProduceLog,
             1015 => ApiKey::FetchLog,
+            1017 => ApiKey::Lookup,
             1021 => ApiKey::ListOffsets,
             1025 => ApiKey::GetFileSystemSecurityToken,
             1032 => ApiKey::GetLatestLakeSnapshot,
@@ -77,6 +79,7 @@ impl From<ApiKey> for i16 {
             ApiKey::MetaData => 1012,
             ApiKey::ProduceLog => 1014,
             ApiKey::FetchLog => 1015,
+            ApiKey::Lookup => 1017,
             ApiKey::ListOffsets => 1021,
             ApiKey::GetFileSystemSecurityToken => 1025,
             ApiKey::GetLatestLakeSnapshot => 1032,
@@ -105,6 +108,7 @@ mod tests {
             (1012, ApiKey::MetaData),
             (1014, ApiKey::ProduceLog),
             (1015, ApiKey::FetchLog),
+            (1017, ApiKey::Lookup),
             (1021, ApiKey::ListOffsets),
             (1025, ApiKey::GetFileSystemSecurityToken),
             (1032, ApiKey::GetLatestLakeSnapshot),
diff --git a/crates/fluss/src/rpc/message/lookup.rs 
b/crates/fluss/src/rpc/message/lookup.rs
new file mode 100644
index 0000000..3de47d6
--- /dev/null
+++ b/crates/fluss/src/rpc/message/lookup.rs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto::LookupResponse;
+use crate::rpc::frame::ReadError;
+
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::WriteError;
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use prost::Message;
+
+use bytes::{Buf, BufMut};
+
+pub struct LookupRequest {
+    pub inner_request: proto::LookupRequest,
+}
+
+impl LookupRequest {
+    pub fn new(
+        table_id: i64,
+        partition_id: Option<i64>,
+        bucket_id: i32,
+        keys: Vec<Vec<u8>>,
+    ) -> Self {
+        let bucket_req = proto::PbLookupReqForBucket {
+            partition_id,
+            bucket_id,
+            key: keys,
+        };
+
+        let request = proto::LookupRequest {
+            table_id,
+            buckets_req: vec![bucket_req],
+        };
+
+        Self {
+            inner_request: request,
+        }
+    }
+}
+
+impl RequestBody for LookupRequest {
+    type ResponseBody = LookupResponse;
+
+    const API_KEY: ApiKey = ApiKey::Lookup;
+
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(LookupRequest);
+impl_read_version_type!(LookupResponse);
diff --git a/crates/fluss/src/rpc/message/mod.rs 
b/crates/fluss/src/rpc/message/mod.rs
index b619ee4..2fe506b 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -34,6 +34,7 @@ mod header;
 mod list_databases;
 mod list_offsets;
 mod list_tables;
+mod lookup;
 mod produce_log;
 mod table_exists;
 mod update_metadata;
@@ -53,6 +54,7 @@ pub use header::*;
 pub use list_databases::*;
 pub use list_offsets::*;
 pub use list_tables::*;
+pub use lookup::*;
 pub use produce_log::*;
 pub use table_exists::*;
 pub use update_metadata::*;
diff --git a/crates/fluss/src/util/varint.rs b/crates/fluss/src/util/varint.rs
index 96fd1f5..83a75f6 100644
--- a/crates/fluss/src/util/varint.rs
+++ b/crates/fluss/src/util/varint.rs
@@ -364,12 +364,11 @@ mod tests {
             let mut reader = Cursor::new(&buffer);
             let read_value = read_unsigned_varint(&mut reader).unwrap();
 
-            assert_eq!(value, read_value, "Round trip failed for value {}", 
value);
+            assert_eq!(value, read_value, "Round trip failed for value 
{value}");
             assert_eq!(
                 written,
                 buffer.len(),
-                "Bytes written mismatch for value {}",
-                value
+                "Bytes written mismatch for value {value}"
             );
 
             // Test with BufMut
@@ -382,22 +381,19 @@ mod tests {
             assert_eq!(
                 calculated_size,
                 buffer.len(),
-                "Size calculation failed for value {}",
-                value
+                "Size calculation failed for value {value}"
             );
 
             // Test reading from bytes
             let (read_value_bytes, bytes_read) = 
read_unsigned_varint_bytes(&buffer).unwrap();
             assert_eq!(
                 value, read_value_bytes,
-                "Bytes read failed for value {}",
-                value
+                "Bytes read failed for value {value}"
             );
             assert_eq!(
                 bytes_read,
                 buffer.len(),
-                "Bytes read count mismatch for value {}",
-                value
+                "Bytes read count mismatch for value {value}"
             );
         }
     }


Reply via email to