mbutrovich commented on code in PR #2301:
URL: https://github.com/apache/iceberg-rust/pull/2301#discussion_r3018253572
##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -1232,6 +1257,202 @@ fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc<
))
}
+/// Coerce Arrow schema types for INT96 columns to match the Iceberg table schema.
+///
+/// arrow-rs defaults INT96 to `Timestamp(Nanosecond)`, which overflows i64 for dates outside
+/// ~1677-2262. We use arrow-rs's schema hint mechanism to read INT96 at the resolution
+/// specified by the Iceberg schema (`timestamp` → microsecond, `timestamp_ns` → nanosecond).
+///
+/// Iceberg Java handles this differently: it bypasses parquet-mr with a custom column reader
+/// (`GenericParquetReaders.TimestampInt96Reader`). We achieve the same result via schema hints.
+///
+/// References:
+/// - Iceberg spec primitive types: <https://iceberg.apache.org/spec/#primitive-types>
+/// - arrow-rs schema hint support: <https://github.com/apache/arrow-rs/pull/7285>
+fn coerce_int96_timestamps(
+ parquet_schema: &SchemaDescriptor,
+ arrow_schema: &ArrowSchemaRef,
+ iceberg_schema: &Schema,
+) -> Option<Arc<ArrowSchema>> {
+ let int96_paths: Vec<ColumnPath> = parquet_schema
+ .columns()
+ .iter()
+ .filter(|col| col.physical_type() == PhysicalType::INT96)
+ .map(|col| col.path().clone())
+ .collect();
+
+ if int96_paths.is_empty() {
+ return None;
+ }
+
+ let mut fields: Vec<FieldRef> =
arrow_schema.fields().iter().cloned().collect();
+ let mut any_changed = false;
+
+ for path in &int96_paths {
+ let parts = path.parts();
+ if let Some(idx) = fields.iter().position(|f| f.name() == &parts[0]) {
+ let (new_field, changed) = coerce_field_at_path(&fields[idx],
parts, 0, iceberg_schema);
+ if changed {
+ fields[idx] = new_field;
+ any_changed = true;
+ }
+ }
+ }
+
+ if any_changed {
+ Some(Arc::new(ArrowSchema::new_with_metadata(
+ fields,
+ arrow_schema.metadata().clone(),
+ )))
+ } else {
+ None
+ }
+}
+
+/// Navigate an Arrow field tree using a Parquet column path and coerce the leaf
+/// INT96 `Timestamp(Nanosecond)` to the resolution specified by the Iceberg schema.
+///
+/// Parquet column paths include intermediate group names for nested types (e.g. the
+/// repeated group in LIST encoding). This function accounts for those extra levels
+/// when descending through List, LargeList, and Map Arrow types.
+fn coerce_field_at_path(
+ field: &FieldRef,
+ parquet_path: &[String],
+ depth: usize,
+ iceberg_schema: &Schema,
+) -> (FieldRef, bool) {
+ if depth == parquet_path.len() - 1 {
+ return coerce_timestamp_field(field, iceberg_schema);
+ }
+
+ match field.data_type() {
+ DataType::Struct(fields) => {
+ let child_name = &parquet_path[depth + 1];
+ let mut new_fields: Vec<FieldRef> =
fields.iter().cloned().collect();
+ let mut changed = false;
+
+ if let Some(idx) = new_fields.iter().position(|f| f.name() ==
child_name) {
+ let (new_child, child_changed) =
+ coerce_field_at_path(&new_fields[idx], parquet_path, depth
+ 1, iceberg_schema);
+ if child_changed {
+ new_fields[idx] = new_child;
+ changed = true;
+ }
+ }
+
+ if changed {
+ let new_field = Arc::new(
+ Field::new(
+ field.name(),
+ DataType::Struct(Fields::from(new_fields)),
+ field.is_nullable(),
+ )
+ .with_metadata(field.metadata().clone()),
+ );
+ (new_field, true)
+ } else {
+ (Arc::clone(field), false)
+ }
+ }
+ DataType::List(element_field) | DataType::LargeList(element_field) => {
+ // Parquet 3-level LIST encoding inserts a repeated group between the list
+ // and its element, e.g. `my_list.list.element` where `list` is the
+ // intermediate group. The group name varies across writers — the spec says
+ // "list" but legacy data uses "element", "array", `<parent>_tuple`, etc.
+ // We skip it (depth + 1) since we navigate by Parquet ColumnPath parts
+ // which contain the actual name from the file. See the Parquet LogicalTypes
+ // spec for the backward-compatibility rules:
+ // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
+ if depth + 2 < parquet_path.len() {
+ debug_assert_eq!(
+ element_field.name(),
+ &parquet_path[depth + 2],
+ "Arrow list element name '{}' does not match Parquet path
segment '{}'",
+ element_field.name(),
+ &parquet_path[depth + 2]
+ );
+ let (new_element, changed) =
+ coerce_field_at_path(element_field, parquet_path, depth +
2, iceberg_schema);
+ if changed {
+ let new_type = match field.data_type() {
+ DataType::List(_) => DataType::List(new_element),
+ DataType::LargeList(_) =>
DataType::LargeList(new_element),
+ _ => unreachable!(),
+ };
+ let new_field = Arc::new(
+ Field::new(field.name(), new_type, field.is_nullable())
+ .with_metadata(field.metadata().clone()),
+ );
+ return (new_field, true);
+ }
+ }
Review Comment:
I don't see a good form of logging in the codebase yet. I'll bring it up at
the next sync.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]