luoyuxia commented on code in PR #174: URL: https://github.com/apache/fluss-rust/pull/174#discussion_r2701028891
########## crates/fluss/src/row/row_decoder.rs: ########## @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Row decoder for deserializing binary row formats. +//! +//! Mirrors the Java org.apache.fluss.row.decode package. + +use crate::metadata::{DataField, DataType, KvFormat, RowType}; +use crate::row::compacted::{CompactedRow, CompactedRowDeserializer}; +use std::sync::Arc; + +/// Decoder for creating BinaryRow from bytes. +/// +/// This trait provides an abstraction for decoding different row formats +/// (COMPACTED, INDEXED, etc.) from binary data. +/// +/// Reference: org.apache.fluss.row.decode.RowDecoder +pub trait RowDecoder: Send + Sync { + /// Decode bytes into a CompactedRow. + /// + /// The lifetime 'a ties the returned row to the input data, ensuring + /// the data remains valid as long as the row is used. + fn decode<'a>(&self, data: &'a [u8]) -> CompactedRow<'a>; +} + +/// Decoder for CompactedRow format. +/// +/// Uses the existing CompactedRow infrastructure for decoding. +/// This is a thin wrapper that implements the RowDecoder trait. 
+/// +/// Reference: org.apache.fluss.row.decode.CompactedRowDecoder +pub struct CompactedRowDecoder { + field_count: usize, + deserializer: Arc<CompactedRowDeserializer<'static>>, +} + +impl CompactedRowDecoder { + /// Create a new CompactedRowDecoder with the given field data types. + pub fn new(data_types: Vec<DataType>) -> Self { + let field_count = data_types.len(); + // Convert Vec<DataType> to RowType by wrapping each in DataField + let fields: Vec<DataField> = data_types + .into_iter() + .enumerate() + .map(|(idx, data_type)| DataField::new(format!("f{idx}"), data_type, None)) + .collect(); + let row_type = RowType::new(fields); + let deserializer = Arc::new(CompactedRowDeserializer::new_from_owned(row_type)); + + Self { + field_count, + deserializer, + } + } +} + +impl RowDecoder for CompactedRowDecoder { + fn decode<'a>(&self, data: &'a [u8]) -> CompactedRow<'a> { + // Use existing CompactedRow::deserialize() infrastructure + CompactedRow::deserialize(Arc::clone(&self.deserializer), self.field_count, data) + } +} + +/// Factory for creating RowDecoders based on KvFormat. +/// +/// Reference: org.apache.fluss.row.decode.RowDecoder.create() +pub struct RowDecoderFactory; + +impl RowDecoderFactory { + /// Create a RowDecoder for the given format and field types. + pub fn create( + kv_format: KvFormat, + data_types: Vec<DataType>, + ) -> std::io::Result<Arc<dyn RowDecoder>> { + match kv_format { + KvFormat::COMPACTED => Ok(Arc::new(CompactedRowDecoder::new(data_types))), + KvFormat::INDEXED => Err(std::io::Error::new( Review Comment: we already have `UnsupportedOperation` in our error definitions. Should we use our `UnsupportedOperation` here instead? ########## crates/fluss/src/row/row_decoder.rs: ########## @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Row decoder for deserializing binary row formats. +//! +//! Mirrors the Java org.apache.fluss.row.decode package. + +use crate::metadata::{DataField, DataType, KvFormat, RowType}; +use crate::row::compacted::{CompactedRow, CompactedRowDeserializer}; +use std::sync::Arc; + +/// Decoder for creating BinaryRow from bytes. +/// +/// This trait provides an abstraction for decoding different row formats +/// (COMPACTED, INDEXED, etc.) from binary data. +/// +/// Reference: org.apache.fluss.row.decode.RowDecoder +pub trait RowDecoder: Send + Sync { + /// Decode bytes into a CompactedRow. + /// + /// The lifetime 'a ties the returned row to the input data, ensuring + /// the data remains valid as long as the row is used. + fn decode<'a>(&self, data: &'a [u8]) -> CompactedRow<'a>; +} + +/// Decoder for CompactedRow format. +/// +/// Uses the existing CompactedRow infrastructure for decoding. +/// This is a thin wrapper that implements the RowDecoder trait. +/// +/// Reference: org.apache.fluss.row.decode.CompactedRowDecoder +pub struct CompactedRowDecoder { + field_count: usize, + deserializer: Arc<CompactedRowDeserializer<'static>>, +} + +impl CompactedRowDecoder { + /// Create a new CompactedRowDecoder with the given field data types. + pub fn new(data_types: Vec<DataType>) -> Self { Review Comment: nit: it seems it would be simpler to pass `RowType` instead of `Vec<DataType>`?
########## crates/fluss/src/record/kv/mod.rs: ########## @@ -33,3 +37,43 @@ pub const NO_WRITER_ID: i64 = -1; /// No batch sequence constant pub const NO_BATCH_SEQUENCE: i32 = -1; + +/// Test utilities for KV record reading. +#[cfg(test)] +pub mod test_utils { + use super::*; + use crate::metadata::{DataType, KvFormat}; + use crate::row::{RowDecoder, RowDecoderFactory}; + use std::io; + use std::sync::Arc; + + /// Simple test-only ReadContext that creates decoders directly from data types. + /// + /// This bypasses the production Schema/SchemaGetter machinery for simpler tests. + pub struct TestReadContext { Review Comment: If it's not shared by other modules, can we just define it in the module that needs it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
