XiaoHongbo-Hope commented on code in PR #205:
URL: https://github.com/apache/paimon-rust/pull/205#discussion_r3036374679


##########
crates/paimon/src/spec/binary_row.rs:
##########
@@ -0,0 +1,752 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! BinaryRow: an implementation of InternalRow backed by raw binary bytes,
+//! and BinaryRowBuilder for constructing BinaryRow instances.
+
+use crate::spec::murmur_hash::hash_by_words;
+use serde::{Deserialize, Serialize};
+
+pub const EMPTY_BINARY_ROW: BinaryRow = BinaryRow::new(0);
+
+/// Highest bit mask for detecting inline vs variable-length encoding.
+const HIGHEST_FIRST_BIT: u64 = 0x80 << 56;
+
+/// Mask to extract the 7-bit length from an inline-encoded value.
+const HIGHEST_SECOND_TO_EIGHTH_BIT: u64 = 0x7F << 56;
+
+/// An implementation of InternalRow backed by raw binary bytes.
+///
+/// Binary layout (little-endian):
+/// ```text
+/// | header (8 bytes) | null bit set (8-byte aligned) | fixed-length (8B per 
field) | variable-length |
+/// ```
+///
+/// Impl Reference: 
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java>
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct BinaryRow {
+    arity: i32,
+    null_bits_size_in_bytes: i32,
+
+    #[serde(with = "serde_bytes")]
+    data: Vec<u8>,
+}
+
+impl BinaryRow {
+    pub const HEADER_SIZE_IN_BYTES: i32 = 8;
+
+    pub const fn cal_bit_set_width_in_bytes(arity: i32) -> i32 {
+        ((arity + 63 + Self::HEADER_SIZE_IN_BYTES) / 64) * 8
+    }
+
+    pub const fn cal_fix_part_size_in_bytes(arity: i32) -> i32 {
+        Self::cal_bit_set_width_in_bytes(arity) + 8 * arity
+    }
+
+    pub const fn new(arity: i32) -> Self {
+        Self {
+            arity,
+            null_bits_size_in_bytes: Self::cal_bit_set_width_in_bytes(arity),
+            data: Vec::new(),
+        }
+    }
+
+    pub fn from_bytes(arity: i32, data: Vec<u8>) -> Self {
+        let null_bits_size_in_bytes = Self::cal_bit_set_width_in_bytes(arity);
+        Self {
+            arity,
+            null_bits_size_in_bytes,
+            data,
+        }
+    }
+
+    pub fn from_serialized_bytes(data: &[u8]) -> crate::Result<Self> {
+        if data.len() < 4 {
+            return Err(crate::Error::UnexpectedError {
+                message: format!(
+                    "BinaryRow: serialized data too short for arity prefix: {} 
bytes",
+                    data.len()
+                ),
+                source: None,
+            });
+        }
+        let arity = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
+        Ok(Self::from_bytes(arity, data[4..].to_vec()))
+    }
+
+    pub fn arity(&self) -> i32 {
+        self.arity
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.data.is_empty()
+    }
+
+    pub fn data(&self) -> &[u8] {
+        &self.data
+    }
+
+    pub fn is_null_at(&self, pos: usize) -> bool {
+        let bit_index = pos + Self::HEADER_SIZE_IN_BYTES as usize;
+        let byte_index = bit_index / 8;
+        let bit_offset = bit_index % 8;
+        (self.data[byte_index] & (1 << bit_offset)) != 0
+    }
+
+    fn field_offset(&self, pos: usize) -> usize {
+        self.null_bits_size_in_bytes as usize + pos * 8
+    }
+
+    fn read_slice<const N: usize>(&self, offset: usize) -> crate::Result<[u8; 
N]> {
+        self.data
+            .get(offset..offset + N)
+            .and_then(|s| s.try_into().ok())
+            .ok_or_else(|| crate::Error::UnexpectedError {
+                message: format!(
+                    "BinaryRow: read {N} bytes at offset {offset} exceeds data 
length {}",
+                    self.data.len()
+                ),
+                source: None,
+            })
+    }
+
+    fn read_byte_at(&self, offset: usize) -> crate::Result<u8> {
+        self.data
+            .get(offset)
+            .copied()
+            .ok_or_else(|| crate::Error::UnexpectedError {
+                message: format!(
+                    "BinaryRow: read 1 byte at offset {offset} exceeds data 
length {}",
+                    self.data.len()
+                ),
+                source: None,
+            })
+    }
+
+    fn read_i64_at(&self, offset: usize) -> crate::Result<i64> {
+        self.read_slice::<8>(offset).map(i64::from_le_bytes)
+    }
+
+    fn read_i32_at(&self, offset: usize) -> crate::Result<i32> {
+        self.read_slice::<4>(offset).map(i32::from_le_bytes)
+    }
+
+    pub fn get_boolean(&self, pos: usize) -> crate::Result<bool> {
+        self.read_byte_at(self.field_offset(pos)).map(|b| b != 0)
+    }
+
+    pub fn get_byte(&self, pos: usize) -> crate::Result<i8> {
+        self.read_byte_at(self.field_offset(pos)).map(|b| b as i8)
+    }
+
+    pub fn get_short(&self, pos: usize) -> crate::Result<i16> {
+        self.read_slice::<2>(self.field_offset(pos))
+            .map(i16::from_le_bytes)
+    }
+
+    pub fn get_int(&self, pos: usize) -> crate::Result<i32> {
+        self.read_i32_at(self.field_offset(pos))
+    }
+
+    pub fn get_long(&self, pos: usize) -> crate::Result<i64> {
+        self.read_i64_at(self.field_offset(pos))
+    }
+
+    pub fn get_float(&self, pos: usize) -> crate::Result<f32> {
+        self.read_slice::<4>(self.field_offset(pos))
+            .map(f32::from_le_bytes)
+    }
+
+    pub fn get_double(&self, pos: usize) -> crate::Result<f64> {
+        self.read_slice::<8>(self.field_offset(pos))
+            .map(f64::from_le_bytes)
+    }
+
+    fn resolve_var_length_field(&self, pos: usize) -> crate::Result<(usize, 
usize)> {
+        let field_off = self.field_offset(pos);
+        let raw = self.read_i64_at(field_off)? as u64;
+
+        let (start, len) = if raw & HIGHEST_FIRST_BIT == 0 {
+            let offset = (raw >> 32) as usize;
+            let len = (raw & 0xFFFF_FFFF) as usize;
+            (offset, len)
+        } else {
+            let len = ((raw & HIGHEST_SECOND_TO_EIGHTH_BIT) >> 56) as usize;
+            (field_off, len)
+        };
+
+        let end = start
+            .checked_add(len)
+            .ok_or_else(|| crate::Error::UnexpectedError {
+                message: format!(
+                    "BinaryRow: var-len field at pos {pos}: offset {start} + 
len {len} overflows"
+                ),
+                source: None,
+            })?;
+        if end > self.data.len() {
+            return Err(crate::Error::UnexpectedError {
+                message: format!(
+                    "BinaryRow: var-len field at pos {pos}: range 
[{start}..{end}) exceeds data length {}",
+                    self.data.len()
+                ),
+                source: None,
+            });
+        }
+        Ok((start, len))
+    }
+
+    pub fn get_binary(&self, pos: usize) -> crate::Result<&[u8]> {
+        let (start, len) = self.resolve_var_length_field(pos)?;
+        Ok(&self.data[start..start + len])
+    }
+
+    pub fn get_string(&self, pos: usize) -> crate::Result<&str> {
+        let bytes = self.get_binary(pos)?;
+        std::str::from_utf8(bytes).map_err(|e| crate::Error::UnexpectedError {
+            message: format!("BinaryRow: invalid UTF-8 in string field at pos 
{pos}: {e}"),
+            source: Some(Box::new(e)),
+        })
+    }
+
+    pub(crate) fn get_decimal_unscaled(&self, pos: usize, precision: u32) -> 
crate::Result<i128> {
+        if precision <= 18 {
+            Ok(self.get_long(pos)? as i128)
+        } else {
+            let bytes = self.get_binary(pos)?;
+            if bytes.is_empty() {
+                return Err(crate::Error::UnexpectedError {
+                    message: format!("BinaryRow: empty bytes for non-compact 
Decimal at pos {pos}"),
+                    source: None,
+                });
+            }
+            let negative = bytes[0] & 0x80 != 0;
+            let mut val: i128 = if negative { -1 } else { 0 };
+            for &b in bytes {
+                val = (val << 8) | (b as i128);
+            }
+            Ok(val)
+        }
+    }
+
+    pub(crate) fn get_timestamp_raw(
+        &self,
+        pos: usize,
+        precision: u32,
+    ) -> crate::Result<(i64, i32)> {
+        if precision <= 3 {
+            Ok((self.get_long(pos)?, 0))
+        } else {
+            let field_off = self.field_offset(pos);
+            let offset_and_nano = self.read_i64_at(field_off)? as u64;
+            let offset = (offset_and_nano >> 32) as usize;
+            let nano_of_milli = offset_and_nano as i32;
+
+            if offset + 8 > self.data.len() {
+                return Err(crate::Error::UnexpectedError {
+                    message: format!(
+                        "BinaryRow: non-compact Timestamp at pos {pos}: offset 
{offset} + 8 exceeds data length {}",
+                        self.data.len()
+                    ),
+                    source: None,
+                });
+            }
+            let millis = i64::from_le_bytes(self.read_slice::<8>(offset)?);
+            Ok((millis, nano_of_milli))
+        }
+    }
+
+    pub fn hash_code(&self) -> i32 {
+        hash_by_words(&self.data)
+    }
+
+    /// Build a BinaryRow from typed Datum values using `BinaryRowBuilder`.
+    pub fn from_datums(datums: &[(&crate::spec::Datum, 
&crate::spec::DataType)]) -> Option<Self> {
+        let arity = datums.len() as i32;
+        let mut builder = BinaryRowBuilder::new(arity);
+
+        for (pos, (datum, _data_type)) in datums.iter().enumerate() {

Review Comment:
   `from_datums` currently ignores `_data_type`, but Java Paimon's `BinaryRow` 
encoding depends on the declared logical type and precision.
   
   <img width="2032" height="1662" alt="Image" src="https://github.com/user-attachments/assets/bee6185f-2cfa-42c1-a5de-947ffa713c7d" />
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to