This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 6be82eb chore: improve datum, use Cow to reduce verbosity and avoid Box::leak gymnastics (#139)
6be82eb is described below
commit 6be82eb694c4a49d2d540a5852614d2b1184eb20
Author: Keith Lee <[email protected]>
AuthorDate: Sun Jan 11 03:16:49 2026 +0000
chore: improve datum, use Cow to reduce verbosity and avoid Box::leak gymnastics (#139)
---
bindings/cpp/src/types.rs | 6 +-
crates/fluss/src/row/binary/binary_writer.rs | 9 +-
.../src/row/compacted/compacted_row_reader.rs | 7 +-
crates/fluss/src/row/datum.rs | 95 +++++-----------------
crates/fluss/src/row/field_getter.rs | 2 +-
5 files changed, 32 insertions(+), 87 deletions(-)
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index d95da14..fef73ce 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -25,6 +25,7 @@ use arrow::array::{
use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
use fcore::row::InternalRow;
use fluss as fcore;
+use std::borrow::Cow;
pub const DATA_TYPE_BOOLEAN: i32 = 1;
pub const DATA_TYPE_TINYINT: i32 = 2;
@@ -218,9 +219,8 @@ pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> {
DATUM_TYPE_INT64 => Datum::Int64(field.i64_val),
DATUM_TYPE_FLOAT32 => Datum::Float32(field.f32_val.into()),
DATUM_TYPE_FLOAT64 => Datum::Float64(field.f64_val.into()),
- DATUM_TYPE_STRING => Datum::String(field.string_val.as_str()),
- // todo: avoid copy bytes for blob
- DATUM_TYPE_BYTES => Datum::Blob(field.bytes_val.clone().into()),
+ DATUM_TYPE_STRING => Datum::String(Cow::Borrowed(field.string_val.as_str())),
+ DATUM_TYPE_BYTES => Datum::Blob(Cow::Borrowed(field.bytes_val.as_slice())),
_ => Datum::Null,
};
generic_row.set_field(idx, datum);
diff --git a/crates/fluss/src/row/binary/binary_writer.rs b/crates/fluss/src/row/binary/binary_writer.rs
index 44f10b6..9917c7b 100644
--- a/crates/fluss/src/row/binary/binary_writer.rs
+++ b/crates/fluss/src/row/binary/binary_writer.rs
@@ -170,17 +170,12 @@ impl InnerValueWriter {
writer.write_boolean(*v);
}
(InnerValueWriter::Binary, Datum::Blob(v)) => {
- writer.write_binary(v.as_ref(), v.len());
- }
- (InnerValueWriter::Binary, Datum::BorrowedBlob(v)) => {
- writer.write_binary(v.as_ref(), v.len());
+ let b = v.as_ref();
+ writer.write_binary(b, b.len());
}
(InnerValueWriter::Bytes, Datum::Blob(v)) => {
writer.write_bytes(v.as_ref());
}
- (InnerValueWriter::Bytes, Datum::BorrowedBlob(v)) => {
- writer.write_bytes(v.as_ref());
- }
(InnerValueWriter::TinyInt, Datum::Int8(v)) => {
writer.write_byte(*v as u8);
}
diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs b/crates/fluss/src/row/compacted/compacted_row_reader.rs
index 19afe88..00d94ad 100644
--- a/crates/fluss/src/row/compacted/compacted_row_reader.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs
@@ -16,6 +16,7 @@
// under the License.
use bytes::Bytes;
+use std::borrow::Cow;
use crate::{
metadata::DataType,
@@ -52,10 +53,12 @@ impl CompactedRowDeserializer {
DataType::Float(_) => Datum::Float32(reader.read_float().into()),
DataType::Double(_) => Datum::Float64(reader.read_double().into()),
// TODO: use read_char(length) in the future, but need to keep compatibility
- DataType::Char(_) | DataType::String(_) => Datum::OwnedString(reader.read_string()),
+ DataType::Char(_) | DataType::String(_) => {
+ Datum::String(Cow::Owned(reader.read_string()))
+ }
// TODO: use read_binary(length) in the future, but need to keep compatibility
DataType::Bytes(_) | DataType::Binary(_) => {
- Datum::Blob(reader.read_bytes().into_vec().into())
+ Datum::Blob(Cow::Owned(reader.read_bytes().into_vec()))
}
_ => panic!("unsupported DataType in CompactedRowDeserializer"),
};
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index c054e08..fa85ded 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -24,11 +24,9 @@ use arrow::array::{
use jiff::ToSpan;
use ordered_float::OrderedFloat;
use parse_display::Display;
-use ref_cast::RefCast;
use rust_decimal::Decimal;
-use serde::{Deserialize, Serialize};
-use std::fmt;
-use std::ops::Deref;
+use serde::Serialize;
+use std::borrow::Cow;
#[allow(dead_code)]
const THIRTY_YEARS_MICROSECONDS: i64 = 946_684_800_000_000;
@@ -52,14 +50,9 @@ pub enum Datum<'a> {
#[display("{0}")]
Float64(F64),
#[display("'{0}'")]
- String(&'a str),
- /// Owned string
- #[display("'{0}'")]
- OwnedString(String),
- #[display("{0}")]
- Blob(Blob),
+ String(Str<'a>),
#[display("{:?}")]
- BorrowedBlob(&'a [u8]),
+ Blob(Blob<'a>),
#[display("{0}")]
Decimal(Decimal),
#[display("{0}")]
@@ -78,7 +71,6 @@ impl Datum<'_> {
pub fn as_str(&self) -> &str {
match self {
Self::String(s) => s,
- Self::OwnedString(s) => s.as_str(),
_ => panic!("not a string: {self:?}"),
}
}
@@ -86,7 +78,6 @@ impl Datum<'_> {
pub fn as_blob(&self) -> &[u8] {
match self {
Self::Blob(blob) => blob.as_ref(),
- Self::BorrowedBlob(blob) => blob,
_ => panic!("not a blob: {self:?}"),
}
}
@@ -121,10 +112,19 @@ impl<'a> From<i16> for Datum<'a> {
}
}
+pub type Str<'a> = Cow<'a, str>;
+
+impl<'a> From<String> for Datum<'a> {
+ #[inline]
+ fn from(s: String) -> Self {
+ Datum::String(Cow::Owned(s))
+ }
+}
+
impl<'a> From<&'a str> for Datum<'a> {
#[inline]
fn from(s: &'a str) -> Datum<'a> {
- Datum::String(s)
+ Datum::String(Cow::Borrowed(s))
}
}
@@ -226,8 +226,7 @@ impl<'b, 'a: 'b> TryFrom<&'b Datum<'a>> for &'b str {
#[inline]
fn try_from(from: &'b Datum<'a>) -> std::result::Result<Self, Self::Error> {
match from {
- Datum::String(i) => Ok(*i),
- Datum::OwnedString(s) => Ok(s.as_str()),
+ Datum::String(s) => Ok(s.as_ref()),
_ => Err(()),
}
}
@@ -295,10 +294,8 @@ impl Datum<'_> {
Datum::Int64(v) => append_value_to_arrow!(Int64Builder, *v),
Datum::Float32(v) => append_value_to_arrow!(Float32Builder, v.into_inner()),
Datum::Float64(v) => append_value_to_arrow!(Float64Builder, v.into_inner()),
- Datum::String(v) => append_value_to_arrow!(StringBuilder, *v),
- Datum::OwnedString(v) => append_value_to_arrow!(StringBuilder, v.as_str()),
+ Datum::String(v) => append_value_to_arrow!(StringBuilder, v.as_ref()),
Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, v.as_ref()),
- Datum::BorrowedBlob(v) => append_value_to_arrow!(BinaryBuilder, *v),
Datum::Decimal(_) | Datum::Date(_) | Datum::Timestamp(_) | Datum::TimestampTz(_) => {
return Err(RowConvertError {
message: format!(
@@ -349,58 +346,6 @@ impl_to_arrow!(&str, StringBuilder);
pub type F32 = OrderedFloat<f32>;
pub type F64 = OrderedFloat<f64>;
-#[allow(dead_code)]
-pub type Str = Box<str>;
-
-#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize, Default)]
-pub struct Blob(Box<[u8]>);
-
-impl Deref for Blob {
- type Target = BlobRef;
-
- fn deref(&self) -> &Self::Target {
- BlobRef::new(&self.0)
- }
-}
-
-impl BlobRef {
- pub fn new(bytes: &[u8]) -> &Self {
- // SAFETY: `&BlobRef` and `&[u8]` have the same layout.
- BlobRef::ref_cast(bytes)
- }
-}
-
-/// A slice of a blob.
-#[repr(transparent)]
-#[derive(PartialEq, Eq, PartialOrd, Ord, RefCast, Hash)]
-pub struct BlobRef([u8]);
-
-impl fmt::Debug for Blob {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "{:?}", self.as_ref())
- }
-}
-
-impl fmt::Display for Blob {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "{:?}", self.as_ref())
- }
-}
-
-impl AsRef<[u8]> for BlobRef {
- fn as_ref(&self) -> &[u8] {
- &self.0
- }
-}
-
-impl Deref for BlobRef {
- type Target = [u8];
-
- fn deref(&self) -> &Self::Target {
- &self.0
- }
-}
-
#[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, Hash, Serialize)]
pub struct Date(i32);
@@ -410,15 +355,17 @@ pub struct Timestamp(i64);
#[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, Hash, Serialize)]
pub struct TimestampLtz(i64);
-impl From<Vec<u8>> for Blob {
+pub type Blob<'a> = Cow<'a, [u8]>;
+
+impl<'a> From<Vec<u8>> for Datum<'a> {
fn from(vec: Vec<u8>) -> Self {
- Blob(vec.into())
+ Datum::Blob(Blob::from(vec))
}
}
impl<'a> From<&'a [u8]> for Datum<'a> {
fn from(bytes: &'a [u8]) -> Datum<'a> {
- Datum::BorrowedBlob(bytes)
+ Datum::Blob(Blob::from(bytes))
}
}
diff --git a/crates/fluss/src/row/field_getter.rs b/crates/fluss/src/row/field_getter.rs
index 3a9cf0f..8e529e5 100644
--- a/crates/fluss/src/row/field_getter.rs
+++ b/crates/fluss/src/row/field_getter.rs
@@ -83,7 +83,7 @@ pub enum InnerFieldGetter {
impl InnerFieldGetter {
pub fn get_field<'a>(&self, row: &'a dyn InternalRow) -> Datum<'a> {
match self {
- InnerFieldGetter::Char { pos, len } => Datum::String(row.get_char(*pos, *len)),
+ InnerFieldGetter::Char { pos, len } => Datum::from(row.get_char(*pos, *len)),
InnerFieldGetter::String { pos } => Datum::from(row.get_string(*pos)),
InnerFieldGetter::Bool { pos } => Datum::from(row.get_boolean(*pos)),
InnerFieldGetter::Binary { pos, len } => Datum::from(row.get_binary(*pos, *len)),