Copilot commented on code in PR #175:
URL: https://github.com/apache/fluss-rust/pull/175#discussion_r2702235782
##########
crates/fluss/src/row/compacted/compacted_row_writer.rs:
##########
@@ -89,64 +108,250 @@ impl BinaryWriter for CompactedRowWriter {
self.buffer[byte_index] |= 1u8 << bit;
}
- fn write_boolean(&mut self, value: bool) {
+ fn write_boolean(&mut self, value: bool) -> Result<()> {
let b = if value { 1u8 } else { 0u8 };
- self.write_raw(&[b]);
+ self.write_raw(&[b])
}
- fn write_byte(&mut self, value: u8) {
- self.write_raw(&[value]);
+ fn write_byte(&mut self, value: u8) -> Result<()> {
+ self.write_raw(&[value])
}
- fn write_bytes(&mut self, value: &[u8]) {
- let len_i32 =
- i32::try_from(value.len()).expect("byte slice too large to encode
length as i32");
- self.write_int(len_i32);
- self.write_raw(value);
+ fn write_bytes(&mut self, value: &[u8]) -> Result<()> {
+ let len_i32 = i32::try_from(value.len()).map_err(|_|
Error::IllegalArgument {
+ message: format!(
+ "Byte slice too large to encode length as i32: {} bytes
exceeds i32::MAX",
+ value.len()
+ ),
+ })?;
+ self.write_int(len_i32)?;
+ self.write_raw(value)
}
- fn write_char(&mut self, value: &str, _length: usize) {
+ fn write_char(&mut self, value: &str, _length: usize) -> Result<()> {
// TODO: currently we encode CHAR(length) the same as STRING; the length
// info can be omitted and the byte length should be enforced in the future.
- self.write_string(value);
+ self.write_string(value)
}
- fn write_string(&mut self, value: &str) {
- self.write_bytes(value.as_ref());
+ fn write_string(&mut self, value: &str) -> Result<()> {
+ self.write_bytes(value.as_ref())
}
- fn write_short(&mut self, value: i16) {
- self.write_raw(&value.to_ne_bytes());
+ fn write_short(&mut self, value: i16) -> Result<()> {
+ // Use native endianness to match Java's UnsafeUtils.putShort behavior
+ // Java uses sun.misc.Unsafe which writes in native byte order
(typically LE on x86/ARM)
+ self.write_raw(&value.to_ne_bytes())
}
- fn write_int(&mut self, value: i32) {
+ fn write_int(&mut self, value: i32) -> Result<()> {
self.ensure_capacity(Self::MAX_INT_SIZE);
let bytes_written =
write_unsigned_varint_to_slice(value as u32, &mut
self.buffer[self.position..]);
self.position += bytes_written;
+ Ok(())
}
- fn write_long(&mut self, value: i64) {
+ fn write_long(&mut self, value: i64) -> Result<()> {
self.ensure_capacity(Self::MAX_LONG_SIZE);
let bytes_written =
write_unsigned_varint_u64_to_slice(value as u64, &mut
self.buffer[self.position..]);
self.position += bytes_written;
+ Ok(())
}
- fn write_float(&mut self, value: f32) {
- self.write_raw(&value.to_ne_bytes());
+
+ fn write_float(&mut self, value: f32) -> Result<()> {
+ // Use native endianness to match Java's UnsafeUtils.putFloat behavior
+ self.write_raw(&value.to_ne_bytes())
}
- fn write_double(&mut self, value: f64) {
- self.write_raw(&value.to_ne_bytes());
+ fn write_double(&mut self, value: f64) -> Result<()> {
+ // Use native endianness to match Java's UnsafeUtils.putDouble behavior
+ self.write_raw(&value.to_ne_bytes())
}
- fn write_binary(&mut self, bytes: &[u8], length: usize) {
+ fn write_binary(&mut self, bytes: &[u8], length: usize) -> Result<()> {
// TODO: currently we encode BINARY(length) the same as BYTES; the length
// info can be omitted and the byte length should be enforced in the future.
- self.write_bytes(&bytes[..length.min(bytes.len())]);
+ self.write_bytes(&bytes[..length.min(bytes.len())])
}
fn complete(&mut self) {
// do nothing
}
+
+ fn write_decimal(
+ &mut self,
+ value: &bigdecimal::BigDecimal,
+ precision: u32,
+ scale: u32,
+ ) -> Result<()> {
+ const MAX_COMPACT_PRECISION: u32 = 18;
+ use bigdecimal::rounding::RoundingMode;
+
+ // Step 1 (Java: Decimal.fromBigDecimal): rescale + validate precision
+ // bd = bd.setScale(scale, RoundingMode.HALF_UP);
+ let scaled = value.with_scale_round(scale as i64,
RoundingMode::HalfUp);
+
+ // In bigdecimal, after scaling, the "unscaled value" is exactly the
bigint mantissa.
+ // That corresponds to Java:
scaled.movePointRight(scale).toBigIntegerExact().
+ let (unscaled, exp) = scaled.as_bigint_and_exponent();
+
+ // Sanity check
+ debug_assert_eq!(
+ exp, scale as i64,
+ "Scaled decimal exponent ({}) != expected scale ({})",
+ exp, scale
+ );
+
+ // Java validates: if (bd.precision() > precision) return null;
+ // Java BigDecimal.precision() == number of base-10 digits in the
unscaled value (sign ignored),
+ // with 0 having precision 1, and trailing zeros stripped.
+ let prec = java_decimal_precision(&unscaled);
+ if prec > precision as usize {
+ return Err(Error::IllegalArgument {
+ message: format!(
+ "Decimal precision overflow after rescale: scaled={} has
{} digits but precision is {} (orig={})",
+ scaled, prec, precision, value
+ ),
+ });
+ }
+
+ // Step 2 (Java: CompactedRowWriter.writeDecimal): serialize
precomputed unscaled representation
+ if precision <= MAX_COMPACT_PRECISION {
+ let unscaled_i64 = i64::try_from(&unscaled).map_err(|_|
Error::IllegalArgument {
+ message: format!(
+ "Decimal mantissa exceeds i64 range for compact precision
{}: unscaled={} (scaled={}, orig={})",
+ precision, unscaled, scaled, value
+ ),
+ })?;
+ self.write_long(unscaled_i64)
+ } else {
+ // Java: BigInteger.toByteArray() (two's complement big-endian,
minimal length)
+ let bytes = unscaled.to_signed_bytes_be();
+ self.write_bytes(&bytes)
+ }
Review Comment:
The `java_decimal_precision` logic is duplicated in
`crates/fluss/src/row/column.rs` (lines 189-198). Consider extracting this into
a shared utility function to avoid code duplication and ensure consistency.
This same logic appears in at least two places in the codebase.
##########
crates/fluss/src/row/column.rs:
##########
@@ -126,6 +126,105 @@ impl InternalRow for ColumnarRow {
.value(self.row_id)
}
+ fn get_decimal(&self, pos: usize, precision: usize, scale: usize) ->
bigdecimal::BigDecimal {
+ use arrow::datatypes::DataType;
+
+ let column = self.record_batch.column(pos);
+ let array = column
+ .as_any()
+ .downcast_ref::<Decimal128Array>()
+ .unwrap_or_else(|| {
+ panic!(
+ "Expected Decimal128Array at column {}, found: {:?}",
+ pos,
+ column.data_type()
+ )
+ });
+
+ // Null check: InternalRow trait doesn't return Result, so reading a null
slot here would yield an unspecified (garbage) value
+ debug_assert!(
+ !array.is_null(self.row_id),
+ "get_decimal called on null value at pos {} row {}",
+ pos,
+ self.row_id
+ );
+
+ // Read precision/scale from schema field metadata (version-safe
approach)
+ let schema = self.record_batch.schema();
+ let field = schema.field(pos);
+ let (arrow_precision, arrow_scale) = match field.data_type() {
+ DataType::Decimal128(p, s) => (*p as usize, *s as i64),
+ dt => panic!(
+ "Expected Decimal128 data type at column {}, found: {:?}",
+ pos, dt
+ ),
+ };
+
+ // Validate Arrow precision matches schema (scale may differ due to
schema evolution)
+ debug_assert_eq!(
+ arrow_precision, precision,
+ "Arrow Decimal128 precision ({}) doesn't match schema precision
({})",
+ arrow_precision, precision
+ );
+
+ let i128_val = array.value(self.row_id);
+
+ // Construct BigDecimal with Arrow's scale
+ let bd = bigdecimal::BigDecimal::new(
+ bigdecimal::num_bigint::BigInt::from(i128_val),
+ arrow_scale,
+ );
+
+ // Rescale if needed (matches Java's Decimal.fromBigDecimal behavior)
+ let result = if arrow_scale != scale as i64 {
+ use bigdecimal::rounding::RoundingMode;
+ bd.with_scale_round(scale as i64, RoundingMode::HalfUp)
+ } else {
+ bd
+ };
+
+ // Validate precision after rescale (matches Java: if (bd.precision()
> precision) throw)
+ // Use Java's precision rules: strip trailing zeros, count significant
digits
+ let (unscaled, _) = result.as_bigint_and_exponent();
+ let actual_precision = {
+ use bigdecimal::num_traits::Zero;
+ if unscaled.is_zero() {
+ 1
+ } else {
+ let s = unscaled.magnitude().to_str_radix(10);
+ let trimmed = s.trim_end_matches('0');
+ trimmed.len()
+ }
+ };
+
+ if actual_precision > precision {
+ panic!(
+ "Decimal precision overflow at column {} row {}: value {} has
{} digits but precision is {}",
+ pos, self.row_id, result, actual_precision, precision
+ );
+ }
+
+ result
+ }
Review Comment:
The precision validation logic (lines 186-198) is duplicated from the
`write_decimal` method in `compacted_row_writer.rs`. Consider extracting the
shared precision calculation logic into a common utility function to avoid
duplication and ensure consistency across the codebase.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]