This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 4a2e657 feat(blob): add BlobType groundwork and preserve DDL
semantics (#250)
4a2e657 is described below
commit 4a2e657c135b4268d21eb5c4acf186dd8e2a790a
Author: Zach <[email protected]>
AuthorDate: Thu Apr 16 11:16:32 2026 +0800
feat(blob): add BlobType groundwork and preserve DDL semantics (#250)
---
crates/integrations/datafusion/src/sql_handler.rs | 494 ++++++++++++---------
.../datafusion/tests/sql_handler_tests.rs | 36 +-
crates/paimon/src/arrow/format/parquet.rs | 1 +
crates/paimon/src/arrow/mod.rs | 15 +-
crates/paimon/src/spec/partition_utils.rs | 1 +
crates/paimon/src/spec/types.rs | 66 ++-
crates/paimon/src/table/data_evolution_writer.rs | 80 +++-
crates/paimon/src/table/table_write.rs | 74 ++-
crates/paimon/tests/fixtures/blob_type.json | 1 +
.../paimon/tests/fixtures/blob_type_nullable.json | 1 +
10 files changed, 555 insertions(+), 214 deletions(-)
diff --git a/crates/integrations/datafusion/src/sql_handler.rs
b/crates/integrations/datafusion/src/sql_handler.rs
index 78f64bb..b32b63f 100644
--- a/crates/integrations/datafusion/src/sql_handler.rs
+++ b/crates/integrations/datafusion/src/sql_handler.rs
@@ -44,10 +44,14 @@ use datafusion::sql::sqlparser::ast::{
use datafusion::sql::sqlparser::dialect::GenericDialect;
use datafusion::sql::sqlparser::parser::Parser;
use paimon::catalog::{Catalog, Identifier};
-use paimon::spec::SchemaChange;
+use paimon::spec::{
+ ArrayType as PaimonArrayType, BigIntType, BlobType, BooleanType, DataField
as PaimonDataField,
+ DataType as PaimonDataType, DateType, DecimalType, DoubleType, FloatType,
IntType,
+ LocalZonedTimestampType, MapType as PaimonMapType, RowType as
PaimonRowType, SchemaChange,
+ SmallIntType, TimestampType, TinyIntType, VarBinaryType, VarCharType,
+};
use crate::error::to_datafusion_error;
-use paimon::arrow::arrow_to_paimon_type;
/// Wraps a [`SessionContext`] and a Paimon [`Catalog`] to handle DDL
statements
/// that DataFusion does not natively support (e.g. ALTER TABLE).
@@ -136,15 +140,7 @@ impl PaimonSqlHandler {
// Columns
for col in &ct.columns {
- let arrow_type = sql_data_type_to_arrow(&col.data_type)?;
- let nullable = !col.options.iter().any(|opt| {
- matches!(
- opt.option,
- datafusion::sql::sqlparser::ast::ColumnOption::NotNull
- )
- });
- let paimon_type =
- arrow_to_paimon_type(&arrow_type,
nullable).map_err(to_datafusion_error)?;
+ let paimon_type = column_def_to_paimon_type(col)?;
builder = builder.column(col.name.value.clone(), paimon_type);
}
@@ -324,110 +320,149 @@ impl PaimonSqlHandler {
/// Convert a sqlparser [`ColumnDef`] to a Paimon [`SchemaChange::AddColumn`].
fn column_def_to_add_column(col: &ColumnDef) -> DFResult<SchemaChange> {
- let arrow_type = sql_data_type_to_arrow(&col.data_type)?;
- let nullable = !col.options.iter().any(|opt| {
- matches!(
- opt.option,
- datafusion::sql::sqlparser::ast::ColumnOption::NotNull
- )
- });
- let paimon_type = arrow_to_paimon_type(&arrow_type,
nullable).map_err(to_datafusion_error)?;
+ let paimon_type = column_def_to_paimon_type(col)?;
Ok(SchemaChange::add_column(
col.name.value.clone(),
paimon_type,
))
}
-/// Convert a sqlparser SQL data type to an Arrow data type.
-fn sql_data_type_to_arrow(
+fn column_def_to_paimon_type(col: &ColumnDef) -> DFResult<PaimonDataType> {
+ sql_data_type_to_paimon_type(&col.data_type, column_def_nullable(col))
+}
+
+fn column_def_nullable(col: &ColumnDef) -> bool {
+ !col.options.iter().any(|opt| {
+ matches!(
+ opt.option,
+ datafusion::sql::sqlparser::ast::ColumnOption::NotNull
+ )
+ })
+}
+
+/// Convert a sqlparser SQL data type to a Paimon data type.
+///
+/// DDL schema translation must use this function instead of going through
Arrow,
+/// because Arrow cannot preserve logical distinctions such as `BLOB` vs
`VARBINARY`.
+fn sql_data_type_to_paimon_type(
sql_type: &datafusion::sql::sqlparser::ast::DataType,
-) -> DFResult<ArrowDataType> {
- use datafusion::sql::sqlparser::ast::{ArrayElemTypeDef, DataType as
SqlType};
+ nullable: bool,
+) -> DFResult<PaimonDataType> {
+ use datafusion::sql::sqlparser::ast::{
+ ArrayElemTypeDef, DataType as SqlType, ExactNumberInfo, TimezoneInfo,
+ };
+
match sql_type {
- SqlType::Boolean => Ok(ArrowDataType::Boolean),
- SqlType::TinyInt(_) => Ok(ArrowDataType::Int8),
- SqlType::SmallInt(_) => Ok(ArrowDataType::Int16),
- SqlType::Int(_) | SqlType::Integer(_) => Ok(ArrowDataType::Int32),
- SqlType::BigInt(_) => Ok(ArrowDataType::Int64),
- SqlType::Float(_) => Ok(ArrowDataType::Float32),
- SqlType::Real => Ok(ArrowDataType::Float32),
- SqlType::Double(_) | SqlType::DoublePrecision =>
Ok(ArrowDataType::Float64),
- SqlType::Varchar(_) | SqlType::CharVarying(_) | SqlType::Text |
SqlType::String(_) => {
- Ok(ArrowDataType::Utf8)
+ SqlType::Boolean =>
Ok(PaimonDataType::Boolean(BooleanType::with_nullable(
+ nullable,
+ ))),
+ SqlType::TinyInt(_) =>
Ok(PaimonDataType::TinyInt(TinyIntType::with_nullable(
+ nullable,
+ ))),
+ SqlType::SmallInt(_) =>
Ok(PaimonDataType::SmallInt(SmallIntType::with_nullable(
+ nullable,
+ ))),
+ SqlType::Int(_) | SqlType::Integer(_) => {
+ Ok(PaimonDataType::Int(IntType::with_nullable(nullable)))
+ }
+ SqlType::BigInt(_) =>
Ok(PaimonDataType::BigInt(BigIntType::with_nullable(nullable))),
+ SqlType::Float(_) | SqlType::Real => {
+ Ok(PaimonDataType::Float(FloatType::with_nullable(nullable)))
+ }
+ SqlType::Double(_) | SqlType::DoublePrecision => {
+ Ok(PaimonDataType::Double(DoubleType::with_nullable(nullable)))
}
- SqlType::Char(_) | SqlType::Character(_) => Ok(ArrowDataType::Utf8),
- SqlType::Binary(_) | SqlType::Varbinary(_) | SqlType::Blob(_) |
SqlType::Bytea => {
- Ok(ArrowDataType::Binary)
+ SqlType::Varchar(_)
+ | SqlType::CharVarying(_)
+ | SqlType::Text
+ | SqlType::String(_)
+ | SqlType::Char(_)
+ | SqlType::Character(_) => Ok(PaimonDataType::VarChar(
+ VarCharType::with_nullable(nullable, VarCharType::MAX_LENGTH)
+ .map_err(to_datafusion_error)?,
+ )),
+ SqlType::Binary(_) | SqlType::Varbinary(_) | SqlType::Bytea => {
+ Ok(PaimonDataType::VarBinary(
+ VarBinaryType::try_new(nullable, VarBinaryType::MAX_LENGTH)
+ .map_err(to_datafusion_error)?,
+ ))
}
- SqlType::Date => Ok(ArrowDataType::Date32),
+ SqlType::Blob(_) =>
Ok(PaimonDataType::Blob(BlobType::with_nullable(nullable))),
+ SqlType::Date =>
Ok(PaimonDataType::Date(DateType::with_nullable(nullable))),
SqlType::Timestamp(precision, tz_info) => {
- use datafusion::sql::sqlparser::ast::TimezoneInfo;
- let unit = match precision {
- Some(0) => datafusion::arrow::datatypes::TimeUnit::Second,
- Some(1..=3) | None =>
datafusion::arrow::datatypes::TimeUnit::Millisecond,
- Some(4..=6) =>
datafusion::arrow::datatypes::TimeUnit::Microsecond,
- _ => datafusion::arrow::datatypes::TimeUnit::Nanosecond,
+ let precision = match precision {
+ Some(0) => 0,
+ Some(1..=3) | None => 3,
+ Some(4..=6) => 6,
+ _ => 9,
};
- let tz = match tz_info {
- TimezoneInfo::None | TimezoneInfo::WithoutTimeZone => None,
- _ => Some("UTC".into()),
- };
- Ok(ArrowDataType::Timestamp(unit, tz))
+ match tz_info {
+ TimezoneInfo::None | TimezoneInfo::WithoutTimeZone => {
+ Ok(PaimonDataType::Timestamp(
+ TimestampType::with_nullable(nullable, precision)
+ .map_err(to_datafusion_error)?,
+ ))
+ }
+ _ => Ok(PaimonDataType::LocalZonedTimestamp(
+ LocalZonedTimestampType::with_nullable(nullable, precision)
+ .map_err(to_datafusion_error)?,
+ )),
+ }
}
SqlType::Decimal(info) => {
- use datafusion::sql::sqlparser::ast::ExactNumberInfo;
- let (p, s) = match info {
- ExactNumberInfo::PrecisionAndScale(p, s) => (*p as u8, *s as
i8),
- ExactNumberInfo::Precision(p) => (*p as u8, 0),
+ let (precision, scale) = match info {
+ ExactNumberInfo::PrecisionAndScale(precision, scale) => {
+ (*precision as u32, *scale as u32)
+ }
+ ExactNumberInfo::Precision(precision) => (*precision as u32,
0),
ExactNumberInfo::None => (10, 0),
};
- Ok(ArrowDataType::Decimal128(p, s))
+ Ok(PaimonDataType::Decimal(
+ DecimalType::with_nullable(nullable, precision, scale)
+ .map_err(to_datafusion_error)?,
+ ))
}
SqlType::Array(elem_def) => {
- let elem_type = match elem_def {
+ let element_type = match elem_def {
ArrayElemTypeDef::AngleBracket(t)
| ArrayElemTypeDef::SquareBracket(t, _)
- | ArrayElemTypeDef::Parenthesis(t) =>
sql_data_type_to_arrow(t)?,
+ | ArrayElemTypeDef::Parenthesis(t) =>
sql_data_type_to_paimon_type(t, true)?,
ArrayElemTypeDef::None => {
return Err(DataFusionError::Plan(
"ARRAY type requires an element type".to_string(),
));
}
};
- Ok(ArrowDataType::List(Arc::new(Field::new(
- "element", elem_type, true,
- ))))
+ Ok(PaimonDataType::Array(PaimonArrayType::with_nullable(
+ nullable,
+ element_type,
+ )))
}
SqlType::Map(key_type, value_type) => {
- let key = sql_data_type_to_arrow(key_type)?;
- let value = sql_data_type_to_arrow(value_type)?;
- let entries = Field::new(
- "entries",
- ArrowDataType::Struct(
- vec![
- Field::new("key", key, false),
- Field::new("value", value, true),
- ]
- .into(),
- ),
- false,
- );
- Ok(ArrowDataType::Map(Arc::new(entries), false))
+ let key = sql_data_type_to_paimon_type(key_type, false)?;
+ let value = sql_data_type_to_paimon_type(value_type, true)?;
+ Ok(PaimonDataType::Map(PaimonMapType::with_nullable(
+ nullable, key, value,
+ )))
}
SqlType::Struct(fields, _) => {
- let arrow_fields: Vec<Field> = fields
+ let paimon_fields = fields
.iter()
- .map(|f| {
- let name = f
+ .enumerate()
+ .map(|(idx, field)| {
+ let name = field
.field_name
.as_ref()
.map(|n| n.value.clone())
.unwrap_or_default();
- let dt = sql_data_type_to_arrow(&f.field_type)?;
- Ok(Field::new(name, dt, true))
+ let data_type =
sql_data_type_to_paimon_type(&field.field_type, true)?;
+ Ok(PaimonDataField::new(idx as i32, name, data_type))
})
- .collect::<DFResult<_>>()?;
- Ok(ArrowDataType::Struct(arrow_fields.into()))
+ .collect::<DFResult<Vec<_>>>()?;
+ Ok(PaimonDataType::Row(PaimonRowType::with_nullable(
+ nullable,
+ paimon_fields,
+ )))
}
_ => Err(DataFusionError::Plan(format!(
"Unsupported SQL data type: {sql_type}"
@@ -494,9 +529,8 @@ mod tests {
use std::sync::Mutex;
use async_trait::async_trait;
- use datafusion::arrow::datatypes::TimeUnit;
use paimon::catalog::Database;
- use paimon::spec::Schema as PaimonSchema;
+ use paimon::spec::{DataType as PaimonDataType, Schema as PaimonSchema};
use paimon::table::Table;
// ==================== Mock Catalog ====================
@@ -619,56 +653,57 @@ mod tests {
PaimonSqlHandler::new(SessionContext::new(), catalog, "paimon")
}
- // ==================== sql_data_type_to_arrow tests ====================
+ fn assert_sql_type_to_paimon(
+ sql_type: datafusion::sql::sqlparser::ast::DataType,
+ expected: PaimonDataType,
+ ) {
+ assert_eq!(
+ sql_data_type_to_paimon_type(&sql_type, true).unwrap(),
+ expected
+ );
+ }
+
+ // ==================== sql_data_type_to_paimon_type tests
====================
#[test]
fn test_sql_type_boolean() {
use datafusion::sql::sqlparser::ast::DataType as SqlType;
- assert_eq!(
- sql_data_type_to_arrow(&SqlType::Boolean).unwrap(),
- ArrowDataType::Boolean
+ assert_sql_type_to_paimon(
+ SqlType::Boolean,
+ PaimonDataType::Boolean(BooleanType::new()),
);
}
#[test]
fn test_sql_type_integers() {
use datafusion::sql::sqlparser::ast::DataType as SqlType;
- assert_eq!(
- sql_data_type_to_arrow(&SqlType::TinyInt(None)).unwrap(),
- ArrowDataType::Int8
- );
- assert_eq!(
- sql_data_type_to_arrow(&SqlType::SmallInt(None)).unwrap(),
- ArrowDataType::Int16
- );
- assert_eq!(
- sql_data_type_to_arrow(&SqlType::Int(None)).unwrap(),
- ArrowDataType::Int32
+ assert_sql_type_to_paimon(
+ SqlType::TinyInt(None),
+ PaimonDataType::TinyInt(TinyIntType::new()),
);
- assert_eq!(
- sql_data_type_to_arrow(&SqlType::Integer(None)).unwrap(),
- ArrowDataType::Int32
+ assert_sql_type_to_paimon(
+ SqlType::SmallInt(None),
+ PaimonDataType::SmallInt(SmallIntType::new()),
);
- assert_eq!(
- sql_data_type_to_arrow(&SqlType::BigInt(None)).unwrap(),
- ArrowDataType::Int64
+ assert_sql_type_to_paimon(SqlType::Int(None),
PaimonDataType::Int(IntType::new()));
+ assert_sql_type_to_paimon(SqlType::Integer(None),
PaimonDataType::Int(IntType::new()));
+ assert_sql_type_to_paimon(
+ SqlType::BigInt(None),
+ PaimonDataType::BigInt(BigIntType::new()),
);
}
#[test]
fn test_sql_type_floats() {
use datafusion::sql::sqlparser::ast::{DataType as SqlType,
ExactNumberInfo};
- assert_eq!(
-
sql_data_type_to_arrow(&SqlType::Float(ExactNumberInfo::None)).unwrap(),
- ArrowDataType::Float32
+ assert_sql_type_to_paimon(
+ SqlType::Float(ExactNumberInfo::None),
+ PaimonDataType::Float(FloatType::new()),
);
- assert_eq!(
- sql_data_type_to_arrow(&SqlType::Real).unwrap(),
- ArrowDataType::Float32
- );
- assert_eq!(
- sql_data_type_to_arrow(&SqlType::DoublePrecision).unwrap(),
- ArrowDataType::Float64
+ assert_sql_type_to_paimon(SqlType::Real,
PaimonDataType::Float(FloatType::new()));
+ assert_sql_type_to_paimon(
+ SqlType::DoublePrecision,
+ PaimonDataType::Double(DoubleType::new()),
);
}
@@ -676,10 +711,11 @@ mod tests {
fn test_sql_type_string_variants() {
use datafusion::sql::sqlparser::ast::DataType as SqlType;
for sql_type in [SqlType::Varchar(None), SqlType::Text,
SqlType::String(None)] {
- assert_eq!(
- sql_data_type_to_arrow(&sql_type).unwrap(),
- ArrowDataType::Utf8,
- "failed for {sql_type:?}"
+ assert_sql_type_to_paimon(
+ sql_type.clone(),
+ PaimonDataType::VarChar(
+ VarCharType::with_nullable(true,
VarCharType::MAX_LENGTH).unwrap(),
+ ),
);
}
}
@@ -687,133 +723,120 @@ mod tests {
#[test]
fn test_sql_type_binary() {
use datafusion::sql::sqlparser::ast::DataType as SqlType;
- assert_eq!(
- sql_data_type_to_arrow(&SqlType::Bytea).unwrap(),
- ArrowDataType::Binary
+ assert_sql_type_to_paimon(
+ SqlType::Bytea,
+ PaimonDataType::VarBinary(
+ VarBinaryType::try_new(true,
VarBinaryType::MAX_LENGTH).unwrap(),
+ ),
);
}
#[test]
fn test_sql_type_date() {
use datafusion::sql::sqlparser::ast::DataType as SqlType;
- assert_eq!(
- sql_data_type_to_arrow(&SqlType::Date).unwrap(),
- ArrowDataType::Date32
- );
+ assert_sql_type_to_paimon(SqlType::Date,
PaimonDataType::Date(DateType::new()));
}
#[test]
fn test_sql_type_timestamp_default() {
use datafusion::sql::sqlparser::ast::{DataType as SqlType,
TimezoneInfo};
- let result = sql_data_type_to_arrow(&SqlType::Timestamp(None,
TimezoneInfo::None)).unwrap();
- assert_eq!(
- result,
- ArrowDataType::Timestamp(TimeUnit::Millisecond, None)
+ assert_sql_type_to_paimon(
+ SqlType::Timestamp(None, TimezoneInfo::None),
+ PaimonDataType::Timestamp(TimestampType::with_nullable(true,
3).unwrap()),
);
}
#[test]
fn test_sql_type_timestamp_with_precision() {
use datafusion::sql::sqlparser::ast::{DataType as SqlType,
TimezoneInfo};
- // precision 0 => Second
- assert_eq!(
- sql_data_type_to_arrow(&SqlType::Timestamp(Some(0),
TimezoneInfo::None)).unwrap(),
- ArrowDataType::Timestamp(TimeUnit::Second, None)
+ assert_sql_type_to_paimon(
+ SqlType::Timestamp(Some(0), TimezoneInfo::None),
+ PaimonDataType::Timestamp(TimestampType::with_nullable(true,
0).unwrap()),
);
- // precision 3 => Millisecond
- assert_eq!(
- sql_data_type_to_arrow(&SqlType::Timestamp(Some(3),
TimezoneInfo::None)).unwrap(),
- ArrowDataType::Timestamp(TimeUnit::Millisecond, None)
+ assert_sql_type_to_paimon(
+ SqlType::Timestamp(Some(3), TimezoneInfo::None),
+ PaimonDataType::Timestamp(TimestampType::with_nullable(true,
3).unwrap()),
);
- // precision 6 => Microsecond
- assert_eq!(
- sql_data_type_to_arrow(&SqlType::Timestamp(Some(6),
TimezoneInfo::None)).unwrap(),
- ArrowDataType::Timestamp(TimeUnit::Microsecond, None)
+ assert_sql_type_to_paimon(
+ SqlType::Timestamp(Some(6), TimezoneInfo::None),
+ PaimonDataType::Timestamp(TimestampType::with_nullable(true,
6).unwrap()),
);
- // precision 9 => Nanosecond
- assert_eq!(
- sql_data_type_to_arrow(&SqlType::Timestamp(Some(9),
TimezoneInfo::None)).unwrap(),
- ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)
+ assert_sql_type_to_paimon(
+ SqlType::Timestamp(Some(9), TimezoneInfo::None),
+ PaimonDataType::Timestamp(TimestampType::with_nullable(true,
9).unwrap()),
);
}
#[test]
fn test_sql_type_timestamp_with_tz() {
use datafusion::sql::sqlparser::ast::{DataType as SqlType,
TimezoneInfo};
- let result =
- sql_data_type_to_arrow(&SqlType::Timestamp(None,
TimezoneInfo::WithTimeZone)).unwrap();
- assert_eq!(
- result,
- ArrowDataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into()))
+ assert_sql_type_to_paimon(
+ SqlType::Timestamp(None, TimezoneInfo::WithTimeZone),
+ PaimonDataType::LocalZonedTimestamp(
+ LocalZonedTimestampType::with_nullable(true, 3).unwrap(),
+ ),
);
}
#[test]
fn test_sql_type_decimal() {
use datafusion::sql::sqlparser::ast::{DataType as SqlType,
ExactNumberInfo};
- assert_eq!(
-
sql_data_type_to_arrow(&SqlType::Decimal(ExactNumberInfo::PrecisionAndScale(18,
2)))
- .unwrap(),
- ArrowDataType::Decimal128(18, 2)
+ assert_sql_type_to_paimon(
+ SqlType::Decimal(ExactNumberInfo::PrecisionAndScale(18, 2)),
+ PaimonDataType::Decimal(DecimalType::with_nullable(true, 18,
2).unwrap()),
);
- assert_eq!(
-
sql_data_type_to_arrow(&SqlType::Decimal(ExactNumberInfo::Precision(10))).unwrap(),
- ArrowDataType::Decimal128(10, 0)
+ assert_sql_type_to_paimon(
+ SqlType::Decimal(ExactNumberInfo::Precision(10)),
+ PaimonDataType::Decimal(DecimalType::with_nullable(true, 10,
0).unwrap()),
);
- assert_eq!(
-
sql_data_type_to_arrow(&SqlType::Decimal(ExactNumberInfo::None)).unwrap(),
- ArrowDataType::Decimal128(10, 0)
+ assert_sql_type_to_paimon(
+ SqlType::Decimal(ExactNumberInfo::None),
+ PaimonDataType::Decimal(DecimalType::with_nullable(true, 10,
0).unwrap()),
);
}
#[test]
fn test_sql_type_unsupported() {
use datafusion::sql::sqlparser::ast::DataType as SqlType;
- assert!(sql_data_type_to_arrow(&SqlType::Regclass).is_err());
+ assert!(sql_data_type_to_paimon_type(&SqlType::Regclass,
true).is_err());
}
#[test]
fn test_sql_type_array() {
use datafusion::sql::sqlparser::ast::{ArrayElemTypeDef, DataType as
SqlType};
- let result =
sql_data_type_to_arrow(&SqlType::Array(ArrayElemTypeDef::AngleBracket(
- Box::new(SqlType::Int(None)),
- )))
- .unwrap();
- assert_eq!(
- result,
- ArrowDataType::List(Arc::new(Field::new("element",
ArrowDataType::Int32, true)))
+ assert_sql_type_to_paimon(
+
SqlType::Array(ArrayElemTypeDef::AngleBracket(Box::new(SqlType::Int(None)))),
+ PaimonDataType::Array(PaimonArrayType::with_nullable(
+ true,
+ PaimonDataType::Int(IntType::new()),
+ )),
);
}
#[test]
fn test_sql_type_array_no_element() {
use datafusion::sql::sqlparser::ast::{ArrayElemTypeDef, DataType as
SqlType};
-
assert!(sql_data_type_to_arrow(&SqlType::Array(ArrayElemTypeDef::None)).is_err());
+ assert!(
+
sql_data_type_to_paimon_type(&SqlType::Array(ArrayElemTypeDef::None),
true).is_err()
+ );
}
#[test]
fn test_sql_type_map() {
use datafusion::sql::sqlparser::ast::DataType as SqlType;
- let result = sql_data_type_to_arrow(&SqlType::Map(
- Box::new(SqlType::Varchar(None)),
- Box::new(SqlType::Int(None)),
- ))
- .unwrap();
- let expected = ArrowDataType::Map(
- Arc::new(Field::new(
- "entries",
- ArrowDataType::Struct(
- vec![
- Field::new("key", ArrowDataType::Utf8, false),
- Field::new("value", ArrowDataType::Int32, true),
- ]
- .into(),
+ assert_sql_type_to_paimon(
+ SqlType::Map(
+ Box::new(SqlType::Varchar(None)),
+ Box::new(SqlType::Int(None)),
+ ),
+ PaimonDataType::Map(PaimonMapType::with_nullable(
+ true,
+ PaimonDataType::VarChar(
+ VarCharType::with_nullable(false,
VarCharType::MAX_LENGTH).unwrap(),
),
- false,
+ PaimonDataType::Int(IntType::new()),
)),
- false,
);
- assert_eq!(result, expected);
}
#[test]
@@ -821,31 +844,35 @@ mod tests {
use datafusion::sql::sqlparser::ast::{
DataType as SqlType, Ident, StructBracketKind, StructField,
};
- let result = sql_data_type_to_arrow(&SqlType::Struct(
- vec![
- StructField {
- field_name: Some(Ident::new("name")),
- field_type: SqlType::Varchar(None),
- options: None,
- },
- StructField {
- field_name: Some(Ident::new("age")),
- field_type: SqlType::Int(None),
- options: None,
- },
- ],
- StructBracketKind::AngleBrackets,
- ))
- .unwrap();
- assert_eq!(
- result,
- ArrowDataType::Struct(
+ assert_sql_type_to_paimon(
+ SqlType::Struct(
vec![
- Field::new("name", ArrowDataType::Utf8, true),
- Field::new("age", ArrowDataType::Int32, true),
- ]
- .into()
- )
+ StructField {
+ field_name: Some(Ident::new("name")),
+ field_type: SqlType::Varchar(None),
+ options: None,
+ },
+ StructField {
+ field_name: Some(Ident::new("age")),
+ field_type: SqlType::Int(None),
+ options: None,
+ },
+ ],
+ StructBracketKind::AngleBrackets,
+ ),
+ PaimonDataType::Row(PaimonRowType::with_nullable(
+ true,
+ vec![
+ PaimonDataField::new(
+ 0,
+ "name".to_string(),
+ PaimonDataType::VarChar(
+ VarCharType::with_nullable(true,
VarCharType::MAX_LENGTH).unwrap(),
+ ),
+ ),
+ PaimonDataField::new(1, "age".to_string(),
PaimonDataType::Int(IntType::new())),
+ ],
+ )),
);
}
@@ -1040,6 +1067,30 @@ mod tests {
}
}
+ #[tokio::test]
+ async fn test_create_table_blob_type_preserved() {
+ let catalog = Arc::new(MockCatalog::new());
+ let handler = make_handler(catalog.clone());
+
+ handler
+ .sql("CREATE TABLE mydb.t1 (payload BLOB NOT NULL)")
+ .await
+ .unwrap();
+
+ let calls = catalog.take_calls();
+ assert_eq!(calls.len(), 1);
+ if let CatalogCall::CreateTable { schema, .. } = &calls[0] {
+ assert_eq!(schema.fields().len(), 1);
+ assert!(matches!(
+ schema.fields()[0].data_type(),
+ PaimonDataType::Blob(_)
+ ));
+ assert!(!schema.fields()[0].data_type().is_nullable());
+ } else {
+ panic!("expected CreateTable call");
+ }
+ }
+
#[tokio::test]
async fn test_alter_table_add_column() {
let catalog = Arc::new(MockCatalog::new());
@@ -1069,6 +1120,33 @@ mod tests {
}
}
+ #[tokio::test]
+ async fn test_alter_table_add_blob_column() {
+ let catalog = Arc::new(MockCatalog::new());
+ let handler = make_handler(catalog.clone());
+
+ handler
+ .sql("ALTER TABLE mydb.t1 ADD COLUMN payload BLOB")
+ .await
+ .unwrap();
+
+ let calls = catalog.take_calls();
+ assert_eq!(calls.len(), 1);
+ if let CatalogCall::AlterTable { changes, .. } = &calls[0] {
+ assert_eq!(changes.len(), 1);
+ assert!(matches!(
+ &changes[0],
+ SchemaChange::AddColumn {
+ field_name,
+ data_type,
+ ..
+ } if field_name == "payload" && matches!(data_type,
PaimonDataType::Blob(_))
+ ));
+ } else {
+ panic!("expected AlterTable call");
+ }
+ }
+
#[tokio::test]
async fn test_alter_table_drop_column() {
let catalog = Arc::new(MockCatalog::new());
diff --git a/crates/integrations/datafusion/tests/sql_handler_tests.rs
b/crates/integrations/datafusion/tests/sql_handler_tests.rs
index c3608b3..5e9105a 100644
--- a/crates/integrations/datafusion/tests/sql_handler_tests.rs
+++ b/crates/integrations/datafusion/tests/sql_handler_tests.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
use datafusion::catalog::CatalogProvider;
use datafusion::prelude::SessionContext;
use paimon::catalog::Identifier;
-use paimon::spec::{ArrayType, DataType, IntType, MapType, VarCharType};
+use paimon::spec::{ArrayType, BlobType, DataType, IntType, MapType,
VarCharType};
use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner,
PaimonSqlHandler};
use tempfile::TempDir;
@@ -147,6 +147,40 @@ async fn test_create_table() {
assert_eq!(schema.primary_keys(), &["id"]);
}
+#[tokio::test]
+async fn test_create_table_with_blob_type() {
+ let (_tmp, catalog) = create_test_env();
+ let handler = create_handler(catalog.clone());
+
+ catalog
+ .create_database("mydb", false, Default::default())
+ .await
+ .unwrap();
+
+ handler
+ .sql(
+ "CREATE TABLE paimon.mydb.assets (
+ id INT NOT NULL,
+ payload BLOB,
+ PRIMARY KEY (id)
+ )",
+ )
+ .await
+ .expect("CREATE TABLE with BLOB should succeed");
+
+ let table = catalog
+ .get_table(&Identifier::new("mydb", "assets"))
+ .await
+ .unwrap();
+ let schema = table.schema();
+ assert_eq!(schema.fields().len(), 2);
+ assert_eq!(schema.primary_keys(), &["id"]);
+ assert_eq!(
+ *schema.fields()[1].data_type(),
+ DataType::Blob(BlobType::new())
+ );
+}
+
#[tokio::test]
async fn test_create_table_with_partition() {
let (_tmp, catalog) = create_test_env();
diff --git a/crates/paimon/src/arrow/format/parquet.rs
b/crates/paimon/src/arrow/format/parquet.rs
index 74de8a2..0ff0a2c 100644
--- a/crates/paimon/src/arrow/format/parquet.rs
+++ b/crates/paimon/src/arrow/format/parquet.rs
@@ -755,6 +755,7 @@ fn literal_scalar_for_parquet_filter(
DataType::Time(_)
| DataType::Timestamp(_)
| DataType::LocalZonedTimestamp(_)
+ | DataType::Blob(_)
| DataType::Array(_)
| DataType::Map(_)
| DataType::Multiset(_)
diff --git a/crates/paimon/src/arrow/mod.rs b/crates/paimon/src/arrow/mod.rs
index 37f907b..50a3a68 100644
--- a/crates/paimon/src/arrow/mod.rs
+++ b/crates/paimon/src/arrow/mod.rs
@@ -39,7 +39,9 @@ pub fn paimon_type_to_arrow(dt: &PaimonDataType) ->
crate::Result<ArrowDataType>
PaimonDataType::Float(_) => ArrowDataType::Float32,
PaimonDataType::Double(_) => ArrowDataType::Float64,
PaimonDataType::VarChar(_) | PaimonDataType::Char(_) =>
ArrowDataType::Utf8,
- PaimonDataType::Binary(_) | PaimonDataType::VarBinary(_) =>
ArrowDataType::Binary,
+ PaimonDataType::Binary(_) | PaimonDataType::VarBinary(_) |
PaimonDataType::Blob(_) => {
+ ArrowDataType::Binary
+ }
PaimonDataType::Date(_) => ArrowDataType::Date32,
PaimonDataType::Time(_) =>
ArrowDataType::Time32(TimeUnit::Millisecond),
PaimonDataType::Timestamp(t) => {
@@ -341,6 +343,17 @@ mod tests {
}
}
+ #[test]
+ fn test_blob_type_maps_one_way_to_arrow_binary() {
+ let blob = PaimonDataType::Blob(BlobType::new());
+ let varbinary = PaimonDataType::VarBinary(
+ VarBinaryType::try_new(true, VarBinaryType::MAX_LENGTH).unwrap(),
+ );
+
+ assert_paimon_to_arrow(&blob, &ArrowDataType::Binary);
+ assert_arrow_to_paimon(&ArrowDataType::Binary, true, &varbinary);
+ }
+
#[test]
fn test_timestamp_roundtrip() {
// millisecond precision
diff --git a/crates/paimon/src/spec/partition_utils.rs
b/crates/paimon/src/spec/partition_utils.rs
index 5d05863..427ea35 100644
--- a/crates/paimon/src/spec/partition_utils.rs
+++ b/crates/paimon/src/spec/partition_utils.rs
@@ -265,6 +265,7 @@ fn format_partition_value(
| DataType::Double(_)
| DataType::Binary(_)
| DataType::VarBinary(_)
+ | DataType::Blob(_)
| DataType::Array(_)
| DataType::Map(_)
| DataType::Multiset(_)
diff --git a/crates/paimon/src/spec/types.rs b/crates/paimon/src/spec/types.rs
index d86c068..7e0f78c 100644
--- a/crates/paimon/src/spec/types.rs
+++ b/crates/paimon/src/spec/types.rs
@@ -76,6 +76,8 @@ pub enum DataType {
Binary(BinaryType),
/// Data type of a variable-length binary string (=a sequence of bytes).
VarBinary(VarBinaryType),
+ /// Data type of binary large object.
+ Blob(BlobType),
/// Data type of a fixed-length character string.
Char(CharType),
/// Data type of a variable-length character string.
@@ -117,6 +119,20 @@ impl DataType {
}
}
+ /// Returns whether this type is or contains (recursively) a [`BlobType`].
+ pub fn contains_blob_type(&self) -> bool {
+ match self {
+ DataType::Blob(_) => true,
+ DataType::Array(v) => v.element_type.contains_blob_type(),
+ DataType::Map(v) => {
+ v.key_type.contains_blob_type() ||
v.value_type.contains_blob_type()
+ }
+ DataType::Multiset(v) => v.element_type.contains_blob_type(),
+ DataType::Row(v) => v.fields.iter().any(|f|
f.data_type().contains_blob_type()),
+ _ => false,
+ }
+ }
+
/// Returns whether this type is nullable.
pub fn is_nullable(&self) -> bool {
match self {
@@ -130,6 +146,7 @@ impl DataType {
DataType::Float(v) => v.nullable,
DataType::Binary(v) => v.nullable,
DataType::VarBinary(v) => v.nullable,
+ DataType::Blob(v) => v.nullable,
DataType::Char(v) => v.nullable,
DataType::VarChar(v) => v.nullable,
DataType::Date(v) => v.nullable,
@@ -165,6 +182,7 @@ impl DataType {
DataType::VarBinary(v) => {
DataType::VarBinary(VarBinaryType::try_new(nullable,
v.length())?)
}
+ DataType::Blob(_) =>
DataType::Blob(BlobType::with_nullable(nullable)),
DataType::Char(v) =>
DataType::Char(CharType::with_nullable(nullable, v.length())?),
DataType::VarChar(v) => {
DataType::VarChar(VarCharType::with_nullable(nullable,
v.length())?)
@@ -386,6 +404,39 @@ impl BooleanType {
}
}
+/// BlobType for paimon.
+///
+/// Data type of binary large object.
+///
+/// Impl Reference:
<https://github.com/apache/paimon/blob/master/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java>.
+#[serde_as]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(transparent)]
+pub struct BlobType {
+ #[serde_as(as = "FromInto<serde_utils::NullableType<serde_utils::BLOB>>")]
+ nullable: bool,
+}
+
+impl Default for BlobType {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl BlobType {
+ pub fn new() -> Self {
+ Self::with_nullable(true)
+ }
+
+ pub fn with_nullable(nullable: bool) -> Self {
+ Self { nullable }
+ }
+
+ pub fn family(&self) -> DataTypeFamily {
+ DataTypeFamily::PREDEFINED
+ }
+}
+
/// CharType for paimon.
///
/// Data type of a fixed-length character string.
@@ -1470,6 +1521,11 @@ mod serde_utils {
const NAME: &'static str = "BOOLEAN";
}
+ pub struct BLOB;
+ impl DataTypeName for BLOB {
+ const NAME: &'static str = "BLOB";
+ }
+
pub struct BINARY;
impl DataTypeName for BINARY {
const NAME: &'static str = "BINARY";
@@ -1655,7 +1711,10 @@ mod tests {
let content = std::fs::read(&path)
.unwrap_or_else(|err| panic!("fixtures {path:?} load failed:
{err}"));
- String::from_utf8(content).expect("fixtures content must be valid
utf8")
+ String::from_utf8(content)
+ .expect("fixtures content must be valid utf8")
+ .trim_end_matches(['\n', '\r'])
+ .to_string()
}
fn test_cases() -> Vec<(&'static str, DataType)> {
@@ -1696,6 +1755,11 @@ mod tests {
length: 22,
}),
),
+ ("blob_type", DataType::Blob(BlobType { nullable: false })),
+ (
+ "blob_type_nullable",
+ DataType::Blob(BlobType { nullable: true }),
+ ),
(
"boolean_type",
DataType::Boolean(BooleanType { nullable: false }),
diff --git a/crates/paimon/src/table/data_evolution_writer.rs
b/crates/paimon/src/table/data_evolution_writer.rs
index 9e2ffbb..9a7d07d 100644
--- a/crates/paimon/src/table/data_evolution_writer.rs
+++ b/crates/paimon/src/table/data_evolution_writer.rs
@@ -61,6 +61,14 @@ pub struct DataEvolutionWriter {
matched_batches: Vec<RecordBatch>,
}
+fn schema_contains_blob_type(table: &Table) -> bool {
+ table
+ .schema()
+ .fields()
+ .iter()
+ .any(|field| field.data_type().contains_blob_type())
+}
+
impl DataEvolutionWriter {
/// Create a new writer for the given table and update columns.
///
@@ -73,6 +81,14 @@ impl DataEvolutionWriter {
let schema = table.schema();
let core_options = CoreOptions::new(schema.options());
+ if schema_contains_blob_type(table) {
+ return Err(crate::Error::Unsupported {
+ message:
+ "MERGE INTO does not support BlobType yet; blob write path
is out of scope"
+ .to_string(),
+ });
+ }
+
if !core_options.data_evolution_enabled() {
return Err(crate::Error::Unsupported {
message:
@@ -470,6 +486,12 @@ impl DataEvolutionPartialWriter {
let schema = table.schema();
let core_options = CoreOptions::new(schema.options());
+ if schema_contains_blob_type(table) {
+ return Err(crate::Error::Unsupported {
+ message: "DataEvolutionPartialWriter does not support BlobType
yet".to_string(),
+ });
+ }
+
if !core_options.data_evolution_enabled() {
return Err(crate::Error::Unsupported {
message: "DataEvolutionPartialWriter requires
data-evolution.enabled = true"
@@ -586,7 +608,9 @@ mod tests {
use super::*;
use crate::catalog::Identifier;
use crate::io::FileIOBuilder;
- use crate::spec::{DataType, IntType, Schema, TableSchema, VarCharType};
+ use crate::spec::{
+ BlobType, DataField, DataType, IntType, RowType, Schema, TableSchema,
VarCharType,
+ };
use arrow_array::StringArray;
use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema
as ArrowSchema};
use std::sync::Arc;
@@ -640,6 +664,24 @@ mod tests {
TableSchema::new(0, &schema)
}
+ fn test_blob_data_evolution_schema() -> TableSchema {
+ let schema = Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column(
+ "payload",
+ DataType::Row(RowType::new(vec![DataField::new(
+ 1,
+ "blob".into(),
+ DataType::Blob(BlobType::new()),
+ )])),
+ )
+ .option("data-evolution.enabled", "true")
+ .option("row-tracking.enabled", "true")
+ .build()
+ .unwrap();
+ TableSchema::new(0, &schema)
+ }
+
fn test_table(file_io: &FileIO, table_path: &str) -> Table {
Table::new(
file_io.clone(),
@@ -650,6 +692,16 @@ mod tests {
)
}
+ fn test_blob_table(file_io: &FileIO, table_path: &str) -> Table {
+ Table::new(
+ file_io.clone(),
+ Identifier::new("default", "test_de_blob_table"),
+ table_path.to_string(),
+ test_blob_data_evolution_schema(),
+ None,
+ )
+ }
+
async fn setup_dirs(file_io: &FileIO, table_path: &str) {
file_io
.mkdirs(&format!("{table_path}/snapshot/"))
@@ -859,4 +911,30 @@ mod tests {
let result = DataEvolutionPartialWriter::new(&table,
vec!["id".to_string()]);
assert!(result.is_err());
}
+
/// `DataEvolutionWriter::new` must refuse a schema that contains a blob type.
#[test]
fn test_rejects_blob_data_evolution_writer() {
    let file_io = test_file_io();
    let table = test_blob_table(&file_io, "memory:/test_blob_de_writer");

    let err = DataEvolutionWriter::new(&table, vec!["id".to_string()])
        .err()
        .unwrap();
    let rejected =
        matches!(&err, crate::Error::Unsupported { message } if message.contains("BlobType"));
    assert!(rejected, "expected Unsupported error mentioning BlobType");
}
+
/// `DataEvolutionPartialWriter::new` must refuse a schema that contains a
/// blob type.
#[test]
fn test_rejects_blob_partial_writer() {
    let file_io = test_file_io();
    let table = test_blob_table(&file_io, "memory:/test_blob_partial_writer");

    let err = DataEvolutionPartialWriter::new(&table, vec!["id".to_string()])
        .err()
        .unwrap();
    let rejected =
        matches!(&err, crate::Error::Unsupported { message } if message.contains("BlobType"));
    assert!(rejected, "expected Unsupported error mentioning BlobType");
}
}
diff --git a/crates/paimon/src/table/table_write.rs
b/crates/paimon/src/table/table_write.rs
index be0bcb5..3d3cbf2 100644
--- a/crates/paimon/src/table/table_write.rs
+++ b/crates/paimon/src/table/table_write.rs
@@ -35,6 +35,12 @@ use std::sync::Arc;
type PartitionBucketKey = (Vec<u8>, i32);
+fn schema_contains_blob_type(fields: &[DataField]) -> bool {
+ fields
+ .iter()
+ .any(|field| field.data_type().contains_blob_type())
+}
+
/// TableWrite writes Arrow RecordBatches to Paimon data files.
///
/// Each (partition, bucket) pair gets its own `DataFileWriter` held in a
HashMap.
@@ -64,6 +70,14 @@ impl TableWrite {
let schema = table.schema();
let core_options = CoreOptions::new(schema.options());
+ if schema_contains_blob_type(schema.fields()) {
+ return Err(crate::Error::Unsupported {
+ message:
+ "TableWrite does not support BlobType yet; blob write path
is out of scope"
+ .to_string(),
+ });
+ }
+
if !schema.primary_keys().is_empty() {
return Err(crate::Error::Unsupported {
message: "TableWrite does not support tables with primary
keys".to_string(),
@@ -308,8 +322,8 @@ mod tests {
use crate::catalog::Identifier;
use crate::io::{FileIO, FileIOBuilder};
use crate::spec::{
- DataType, DecimalType, IntType, LocalZonedTimestampType, Schema,
TableSchema,
- TimestampType, VarCharType,
+ BlobType, DataField, DataType, DecimalType, IntType,
LocalZonedTimestampType, RowType,
+ Schema, TableSchema, TimestampType, VarCharType,
};
use crate::table::{SnapshotManager, TableCommit};
use arrow_array::Int32Array;
@@ -361,6 +375,30 @@ mod tests {
)
}
+ fn test_blob_table_schema() -> TableSchema {
+ let schema = Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("payload", DataType::Blob(BlobType::new()))
+ .build()
+ .unwrap();
+ TableSchema::new(0, &schema)
+ }
+
+ fn test_nested_blob_table_schema() -> TableSchema {
+ let schema = Schema::builder()
+ .column(
+ "payload",
+ DataType::Row(RowType::new(vec![DataField::new(
+ 1,
+ "blob".into(),
+ DataType::Blob(BlobType::new()),
+ )])),
+ )
+ .build()
+ .unwrap();
+ TableSchema::new(0, &schema)
+ }
+
async fn setup_dirs(file_io: &FileIO, table_path: &str) {
file_io
.mkdirs(&format!("{table_path}/snapshot/"))
@@ -430,6 +468,38 @@ mod tests {
assert_eq!(snapshot.total_record_count(), Some(3));
}
/// `TableWrite::new` must reject a table whose schema has a top-level blob
/// column.
#[test]
fn test_rejects_blob_table() {
    let table = Table::new(
        test_file_io(),
        Identifier::new("default", "test_blob_table"),
        String::from("memory:/test_blob_table"),
        test_blob_table_schema(),
        None,
    );

    let err = TableWrite::new(&table).err().unwrap();
    let rejected =
        matches!(&err, crate::Error::Unsupported { message } if message.contains("BlobType"));
    assert!(rejected, "expected Unsupported error mentioning BlobType");
}
+
/// `TableWrite::new` must also reject a blob that is nested inside a row
/// type, not just a top-level blob column.
#[test]
fn test_rejects_nested_blob_table() {
    let table = Table::new(
        test_file_io(),
        Identifier::new("default", "test_nested_blob_table"),
        String::from("memory:/test_nested_blob_table"),
        test_nested_blob_table_schema(),
        None,
    );

    let err = TableWrite::new(&table).err().unwrap();
    let rejected =
        matches!(&err, crate::Error::Unsupported { message } if message.contains("BlobType"));
    assert!(rejected, "expected Unsupported error mentioning BlobType");
}
+
#[tokio::test]
async fn test_write_partitioned() {
let file_io = test_file_io();
diff --git a/crates/paimon/tests/fixtures/blob_type.json
b/crates/paimon/tests/fixtures/blob_type.json
new file mode 100644
index 0000000..ba26b54
--- /dev/null
+++ b/crates/paimon/tests/fixtures/blob_type.json
@@ -0,0 +1 @@
+"BLOB NOT NULL"
diff --git a/crates/paimon/tests/fixtures/blob_type_nullable.json
b/crates/paimon/tests/fixtures/blob_type_nullable.json
new file mode 100644
index 0000000..5b71234
--- /dev/null
+++ b/crates/paimon/tests/fixtures/blob_type_nullable.json
@@ -0,0 +1 @@
+"BLOB"