blackmwk commented on code in PR #2301:
URL: https://github.com/apache/iceberg-rust/pull/2301#discussion_r3026246474
##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -1232,6 +1255,203 @@ fn add_fallback_field_ids_to_arrow_schema(arrow_schema:
&ArrowSchemaRef) -> Arc<
))
}
+/// Coerce Arrow schema types for INT96 columns to match the Iceberg table
schema.
+///
+/// arrow-rs defaults INT96 to `Timestamp(Nanosecond)`, which overflows i64
for dates outside
+/// ~1677-2262. We use arrow-rs's schema hint mechanism to read INT96 at the
resolution
+/// specified by the Iceberg schema (`timestamp` → microsecond, `timestamp_ns`
→ nanosecond).
+///
+/// Iceberg Java handles this differently: it bypasses parquet-mr with a
custom column reader
+/// (`GenericParquetReaders.TimestampInt96Reader`). We achieve the same result
via schema hints.
+///
+/// References:
+/// - Iceberg spec primitive types:
<https://iceberg.apache.org/spec/#primitive-types>
+/// - arrow-rs schema hint support:
<https://github.com/apache/arrow-rs/pull/7285>
+fn coerce_int96_timestamps(
+ arrow_schema: &ArrowSchemaRef,
+ iceberg_schema: &Schema,
+) -> Option<Arc<ArrowSchema>> {
+ let mut visitor = Int96CoercionVisitor::new(iceberg_schema);
+ let coerced = visit_schema(arrow_schema, &mut visitor).ok()?;
+ if visitor.changed {
+ Some(Arc::new(coerced))
+ } else {
+ None
+ }
+}
+
+/// Visitor that coerces `Timestamp(Nanosecond)` Arrow fields to the resolution
+/// indicated by the Iceberg schema. Follows the same pattern as
`MetadataStripVisitor`.
+struct Int96CoercionVisitor<'a> {
+ iceberg_schema: &'a Schema,
+ field_stack: Vec<Field>,
+ changed: bool,
+}
+
+impl<'a> Int96CoercionVisitor<'a> {
+ fn new(iceberg_schema: &'a Schema) -> Self {
+ Self {
+ iceberg_schema,
+ field_stack: Vec::new(),
+ changed: false,
+ }
+ }
+
+ /// Determine the target TimeUnit for a Timestamp(Nanosecond) field based
on the
+ /// Iceberg schema. Falls back to microsecond when field IDs are
unavailable,
+ /// matching Iceberg Java behavior.
+ fn target_unit(&self, field: &Field) -> Option<TimeUnit> {
+ if !matches!(
+ field.data_type(),
+ DataType::Timestamp(TimeUnit::Nanosecond, _)
+ ) {
+ return None;
+ }
+
+ let target = field
+ .metadata()
+ .get(PARQUET_FIELD_ID_META_KEY)
+ .and_then(|id_str| id_str.parse::<i32>().ok())
+ .and_then(|field_id| self.iceberg_schema.field_by_id(field_id))
+ .and_then(|f| match &*f.field_type {
+ Type::Primitive(PrimitiveType::Timestamp |
PrimitiveType::Timestamptz) => {
+ Some(TimeUnit::Microsecond)
+ }
+ Type::Primitive(PrimitiveType::TimestampNs |
PrimitiveType::TimestamptzNs) => {
+ Some(TimeUnit::Nanosecond)
+ }
+ _ => None,
+ })
+ // Iceberg Java reads INT96 as microseconds by default
+ .unwrap_or(TimeUnit::Microsecond);
+
+ if target == TimeUnit::Nanosecond {
+ None
+ } else {
+ Some(target)
+ }
+ }
+}
+
+impl ArrowSchemaVisitor for Int96CoercionVisitor<'_> {
+ type T = Field;
+ type U = ArrowSchema;
+
+ fn before_field(&mut self, field: &Field) -> Result<()> {
+ self.field_stack.push(field.as_ref().clone());
+ Ok(())
+ }
+
+ fn before_list_element(&mut self, field: &Field) -> Result<()> {
+ self.field_stack.push(field.as_ref().clone());
+ Ok(())
+ }
+
+ fn before_map_key(&mut self, field: &Field) -> Result<()> {
+ self.field_stack.push(field.as_ref().clone());
+ Ok(())
+ }
+
+ fn before_map_value(&mut self, field: &Field) -> Result<()> {
+ self.field_stack.push(field.as_ref().clone());
+ Ok(())
+ }
+
+ fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) ->
Result<Self::U> {
Review Comment:
```suggestion
fn schema(&mut self, schema: &ArrowSchema, values: Vec<Field>) ->
Result<ArrowSchema> {
```
Please don't use self type here, it's difficult to read.
##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -1232,6 +1255,203 @@ fn add_fallback_field_ids_to_arrow_schema(arrow_schema:
&ArrowSchemaRef) -> Arc<
))
}
+/// Coerce Arrow schema types for INT96 columns to match the Iceberg table
schema.
+///
+/// arrow-rs defaults INT96 to `Timestamp(Nanosecond)`, which overflows i64
for dates outside
+/// ~1677-2262. We use arrow-rs's schema hint mechanism to read INT96 at the
resolution
+/// specified by the Iceberg schema (`timestamp` → microsecond, `timestamp_ns`
→ nanosecond).
+///
+/// Iceberg Java handles this differently: it bypasses parquet-mr with a
custom column reader
+/// (`GenericParquetReaders.TimestampInt96Reader`). We achieve the same result
via schema hints.
+///
+/// References:
+/// - Iceberg spec primitive types:
<https://iceberg.apache.org/spec/#primitive-types>
+/// - arrow-rs schema hint support:
<https://github.com/apache/arrow-rs/pull/7285>
+fn coerce_int96_timestamps(
+ arrow_schema: &ArrowSchemaRef,
+ iceberg_schema: &Schema,
+) -> Option<Arc<ArrowSchema>> {
+ let mut visitor = Int96CoercionVisitor::new(iceberg_schema);
+ let coerced = visit_schema(arrow_schema, &mut visitor).ok()?;
+ if visitor.changed {
+ Some(Arc::new(coerced))
+ } else {
+ None
+ }
+}
+
+/// Visitor that coerces `Timestamp(Nanosecond)` Arrow fields to the resolution
+/// indicated by the Iceberg schema. Follows the same pattern as
`MetadataStripVisitor`.
+struct Int96CoercionVisitor<'a> {
+ iceberg_schema: &'a Schema,
+ field_stack: Vec<Field>,
Review Comment:
This is too heavy, we should store `FieldRef`. We may need to change
ArrowSchemaVisitor's method signature, please create an issue to track it.
##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -1232,6 +1255,203 @@ fn add_fallback_field_ids_to_arrow_schema(arrow_schema:
&ArrowSchemaRef) -> Arc<
))
}
+/// Coerce Arrow schema types for INT96 columns to match the Iceberg table
schema.
+///
+/// arrow-rs defaults INT96 to `Timestamp(Nanosecond)`, which overflows i64
for dates outside
+/// ~1677-2262. We use arrow-rs's schema hint mechanism to read INT96 at the
resolution
+/// specified by the Iceberg schema (`timestamp` → microsecond, `timestamp_ns`
→ nanosecond).
+///
+/// Iceberg Java handles this differently: it bypasses parquet-mr with a
custom column reader
+/// (`GenericParquetReaders.TimestampInt96Reader`). We achieve the same result
via schema hints.
+///
+/// References:
+/// - Iceberg spec primitive types:
<https://iceberg.apache.org/spec/#primitive-types>
+/// - arrow-rs schema hint support:
<https://github.com/apache/arrow-rs/pull/7285>
+fn coerce_int96_timestamps(
+ arrow_schema: &ArrowSchemaRef,
+ iceberg_schema: &Schema,
+) -> Option<Arc<ArrowSchema>> {
+ let mut visitor = Int96CoercionVisitor::new(iceberg_schema);
+ let coerced = visit_schema(arrow_schema, &mut visitor).ok()?;
+ if visitor.changed {
+ Some(Arc::new(coerced))
+ } else {
+ None
+ }
+}
+
+/// Visitor that coerces `Timestamp(Nanosecond)` Arrow fields to the resolution
+/// indicated by the Iceberg schema. Follows the same pattern as
`MetadataStripVisitor`.
+struct Int96CoercionVisitor<'a> {
+ iceberg_schema: &'a Schema,
+ field_stack: Vec<Field>,
+ changed: bool,
+}
+
+impl<'a> Int96CoercionVisitor<'a> {
+ fn new(iceberg_schema: &'a Schema) -> Self {
+ Self {
+ iceberg_schema,
+ field_stack: Vec::new(),
+ changed: false,
+ }
+ }
+
+ /// Determine the target TimeUnit for a Timestamp(Nanosecond) field based
on the
+ /// Iceberg schema. Falls back to microsecond when field IDs are
unavailable,
+ /// matching Iceberg Java behavior.
+ fn target_unit(&self, field: &Field) -> Option<TimeUnit> {
+ if !matches!(
+ field.data_type(),
+ DataType::Timestamp(TimeUnit::Nanosecond, _)
+ ) {
+ return None;
+ }
+
+ let target = field
+ .metadata()
+ .get(PARQUET_FIELD_ID_META_KEY)
+ .and_then(|id_str| id_str.parse::<i32>().ok())
+ .and_then(|field_id| self.iceberg_schema.field_by_id(field_id))
+ .and_then(|f| match &*f.field_type {
+ Type::Primitive(PrimitiveType::Timestamp |
PrimitiveType::Timestamptz) => {
+ Some(TimeUnit::Microsecond)
+ }
+ Type::Primitive(PrimitiveType::TimestampNs |
PrimitiveType::TimestamptzNs) => {
+ Some(TimeUnit::Nanosecond)
+ }
+ _ => None,
+ })
+ // Iceberg Java reads INT96 as microseconds by default
+ .unwrap_or(TimeUnit::Microsecond);
+
+ if target == TimeUnit::Nanosecond {
+ None
+ } else {
+ Some(target)
+ }
+ }
+}
+
+impl ArrowSchemaVisitor for Int96CoercionVisitor<'_> {
+ type T = Field;
Review Comment:
Should this be `FieldRef`?
##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -1232,6 +1255,203 @@ fn add_fallback_field_ids_to_arrow_schema(arrow_schema:
&ArrowSchemaRef) -> Arc<
))
}
+/// Coerce Arrow schema types for INT96 columns to match the Iceberg table
schema.
+///
+/// arrow-rs defaults INT96 to `Timestamp(Nanosecond)`, which overflows i64
for dates outside
+/// ~1677-2262. We use arrow-rs's schema hint mechanism to read INT96 at the
resolution
+/// specified by the Iceberg schema (`timestamp` → microsecond, `timestamp_ns`
→ nanosecond).
+///
+/// Iceberg Java handles this differently: it bypasses parquet-mr with a
custom column reader
+/// (`GenericParquetReaders.TimestampInt96Reader`). We achieve the same result
via schema hints.
+///
+/// References:
+/// - Iceberg spec primitive types:
<https://iceberg.apache.org/spec/#primitive-types>
+/// - arrow-rs schema hint support:
<https://github.com/apache/arrow-rs/pull/7285>
+fn coerce_int96_timestamps(
+ arrow_schema: &ArrowSchemaRef,
+ iceberg_schema: &Schema,
+) -> Option<Arc<ArrowSchema>> {
+ let mut visitor = Int96CoercionVisitor::new(iceberg_schema);
+ let coerced = visit_schema(arrow_schema, &mut visitor).ok()?;
+ if visitor.changed {
+ Some(Arc::new(coerced))
+ } else {
+ None
+ }
+}
+
+/// Visitor that coerces `Timestamp(Nanosecond)` Arrow fields to the resolution
+/// indicated by the Iceberg schema. Follows the same pattern as
`MetadataStripVisitor`.
+struct Int96CoercionVisitor<'a> {
+ iceberg_schema: &'a Schema,
+ field_stack: Vec<Field>,
+ changed: bool,
+}
+
+impl<'a> Int96CoercionVisitor<'a> {
+ fn new(iceberg_schema: &'a Schema) -> Self {
+ Self {
+ iceberg_schema,
+ field_stack: Vec::new(),
+ changed: false,
+ }
+ }
+
+ /// Determine the target TimeUnit for a Timestamp(Nanosecond) field based
on the
+ /// Iceberg schema. Falls back to microsecond when field IDs are
unavailable,
+ /// matching Iceberg Java behavior.
+ fn target_unit(&self, field: &Field) -> Option<TimeUnit> {
+ if !matches!(
+ field.data_type(),
+ DataType::Timestamp(TimeUnit::Nanosecond, _)
+ ) {
+ return None;
+ }
+
+ let target = field
+ .metadata()
+ .get(PARQUET_FIELD_ID_META_KEY)
+ .and_then(|id_str| id_str.parse::<i32>().ok())
+ .and_then(|field_id| self.iceberg_schema.field_by_id(field_id))
+ .and_then(|f| match &*f.field_type {
+ Type::Primitive(PrimitiveType::Timestamp |
PrimitiveType::Timestamptz) => {
+ Some(TimeUnit::Microsecond)
+ }
+ Type::Primitive(PrimitiveType::TimestampNs |
PrimitiveType::TimestamptzNs) => {
+ Some(TimeUnit::Nanosecond)
+ }
+ _ => None,
+ })
+ // Iceberg Java reads INT96 as microseconds by default
+ .unwrap_or(TimeUnit::Microsecond);
+
+ if target == TimeUnit::Nanosecond {
+ None
+ } else {
+ Some(target)
+ }
+ }
+}
+
+impl ArrowSchemaVisitor for Int96CoercionVisitor<'_> {
+ type T = Field;
+ type U = ArrowSchema;
+
+ fn before_field(&mut self, field: &Field) -> Result<()> {
+ self.field_stack.push(field.as_ref().clone());
+ Ok(())
+ }
+
+ fn before_list_element(&mut self, field: &Field) -> Result<()> {
+ self.field_stack.push(field.as_ref().clone());
+ Ok(())
+ }
+
+ fn before_map_key(&mut self, field: &Field) -> Result<()> {
+ self.field_stack.push(field.as_ref().clone());
+ Ok(())
+ }
+
+ fn before_map_value(&mut self, field: &Field) -> Result<()> {
+ self.field_stack.push(field.as_ref().clone());
+ Ok(())
+ }
+
+ fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) ->
Result<Self::U> {
Review Comment:
Please change all methods.
##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -1232,6 +1255,203 @@ fn add_fallback_field_ids_to_arrow_schema(arrow_schema:
&ArrowSchemaRef) -> Arc<
))
}
+/// Coerce Arrow schema types for INT96 columns to match the Iceberg table
schema.
+///
+/// arrow-rs defaults INT96 to `Timestamp(Nanosecond)`, which overflows i64
for dates outside
+/// ~1677-2262. We use arrow-rs's schema hint mechanism to read INT96 at the
resolution
+/// specified by the Iceberg schema (`timestamp` → microsecond, `timestamp_ns`
→ nanosecond).
+///
+/// Iceberg Java handles this differently: it bypasses parquet-mr with a
custom column reader
+/// (`GenericParquetReaders.TimestampInt96Reader`). We achieve the same result
via schema hints.
+///
+/// References:
+/// - Iceberg spec primitive types:
<https://iceberg.apache.org/spec/#primitive-types>
+/// - arrow-rs schema hint support:
<https://github.com/apache/arrow-rs/pull/7285>
+fn coerce_int96_timestamps(
+ arrow_schema: &ArrowSchemaRef,
+ iceberg_schema: &Schema,
+) -> Option<Arc<ArrowSchema>> {
+ let mut visitor = Int96CoercionVisitor::new(iceberg_schema);
+ let coerced = visit_schema(arrow_schema, &mut visitor).ok()?;
+ if visitor.changed {
+ Some(Arc::new(coerced))
+ } else {
+ None
+ }
+}
+
+/// Visitor that coerces `Timestamp(Nanosecond)` Arrow fields to the resolution
+/// indicated by the Iceberg schema. Follows the same pattern as
`MetadataStripVisitor`.
+struct Int96CoercionVisitor<'a> {
Review Comment:
Please move this struct and its ut to a standalone module. This file is
already too large.
##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -1232,6 +1255,203 @@ fn add_fallback_field_ids_to_arrow_schema(arrow_schema:
&ArrowSchemaRef) -> Arc<
))
}
+/// Coerce Arrow schema types for INT96 columns to match the Iceberg table
schema.
+///
+/// arrow-rs defaults INT96 to `Timestamp(Nanosecond)`, which overflows i64
for dates outside
+/// ~1677-2262. We use arrow-rs's schema hint mechanism to read INT96 at the
resolution
+/// specified by the Iceberg schema (`timestamp` → microsecond, `timestamp_ns`
→ nanosecond).
+///
+/// Iceberg Java handles this differently: it bypasses parquet-mr with a
custom column reader
+/// (`GenericParquetReaders.TimestampInt96Reader`). We achieve the same result
via schema hints.
+///
+/// References:
+/// - Iceberg spec primitive types:
<https://iceberg.apache.org/spec/#primitive-types>
+/// - arrow-rs schema hint support:
<https://github.com/apache/arrow-rs/pull/7285>
+fn coerce_int96_timestamps(
+ arrow_schema: &ArrowSchemaRef,
+ iceberg_schema: &Schema,
+) -> Option<Arc<ArrowSchema>> {
+ let mut visitor = Int96CoercionVisitor::new(iceberg_schema);
+ let coerced = visit_schema(arrow_schema, &mut visitor).ok()?;
+ if visitor.changed {
+ Some(Arc::new(coerced))
+ } else {
+ None
+ }
+}
+
+/// Visitor that coerces `Timestamp(Nanosecond)` Arrow fields to the resolution
+/// indicated by the Iceberg schema. Follows the same pattern as
`MetadataStripVisitor`.
+struct Int96CoercionVisitor<'a> {
+ iceberg_schema: &'a Schema,
+ field_stack: Vec<Field>,
+ changed: bool,
+}
+
+impl<'a> Int96CoercionVisitor<'a> {
+ fn new(iceberg_schema: &'a Schema) -> Self {
+ Self {
+ iceberg_schema,
+ field_stack: Vec::new(),
+ changed: false,
+ }
+ }
+
+ /// Determine the target TimeUnit for a Timestamp(Nanosecond) field based
on the
+ /// Iceberg schema. Falls back to microsecond when field IDs are
unavailable,
+ /// matching Iceberg Java behavior.
+ fn target_unit(&self, field: &Field) -> Option<TimeUnit> {
+ if !matches!(
+ field.data_type(),
+ DataType::Timestamp(TimeUnit::Nanosecond, _)
+ ) {
+ return None;
+ }
+
+ let target = field
+ .metadata()
+ .get(PARQUET_FIELD_ID_META_KEY)
+ .and_then(|id_str| id_str.parse::<i32>().ok())
+ .and_then(|field_id| self.iceberg_schema.field_by_id(field_id))
+ .and_then(|f| match &*f.field_type {
+ Type::Primitive(PrimitiveType::Timestamp |
PrimitiveType::Timestamptz) => {
+ Some(TimeUnit::Microsecond)
+ }
+ Type::Primitive(PrimitiveType::TimestampNs |
PrimitiveType::TimestamptzNs) => {
+ Some(TimeUnit::Nanosecond)
+ }
+ _ => None,
+ })
+ // Iceberg Java reads INT96 as microseconds by default
+ .unwrap_or(TimeUnit::Microsecond);
+
+ if target == TimeUnit::Nanosecond {
+ None
+ } else {
+ Some(target)
+ }
+ }
+}
+
+impl ArrowSchemaVisitor for Int96CoercionVisitor<'_> {
+ type T = Field;
+ type U = ArrowSchema;
+
+ fn before_field(&mut self, field: &Field) -> Result<()> {
+ self.field_stack.push(field.as_ref().clone());
+ Ok(())
+ }
+
+ fn before_list_element(&mut self, field: &Field) -> Result<()> {
+ self.field_stack.push(field.as_ref().clone());
+ Ok(())
+ }
+
+ fn before_map_key(&mut self, field: &Field) -> Result<()> {
+ self.field_stack.push(field.as_ref().clone());
+ Ok(())
+ }
+
+ fn before_map_value(&mut self, field: &Field) -> Result<()> {
+ self.field_stack.push(field.as_ref().clone());
+ Ok(())
+ }
+
+ fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) ->
Result<Self::U> {
+ Ok(ArrowSchema::new_with_metadata(
+ values,
+ schema.metadata().clone(),
+ ))
+ }
+
+ fn r#struct(&mut self, _fields: &Fields, results: Vec<Self::T>) ->
Result<Self::T> {
+ let field_info = self
+ .field_stack
+ .pop()
Review Comment:
We could do pop in after_field method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]