blackmwk commented on code in PR #2307:
URL: https://github.com/apache/iceberg-rust/pull/2307#discussion_r3026135556


##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -4667,4 +4691,268 @@ message schema {
         assert_eq!(result[1], expected_1);
         assert_eq!(result[2], expected_2);
     }
+
+    /// Helper: write a Parquet file without field IDs containing a nested 
type column
+    /// followed by an `id` column, then read it with a predicate on `id`.
+    /// This exercises the fallback field ID mapping path that Comet uses.
+    async fn read_migrated_file_with_nested_type_and_predicate(
+        iceberg_schema: SchemaRef,
+        arrow_schema: Arc<ArrowSchema>,
+        batch: RecordBatch,
+    ) {
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_path = format!("{table_location}/1.parquet");
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+        let file = File::create(&file_path).unwrap();
+        let mut writer = ArrowWriter::try_new(file, arrow_schema, 
Some(props)).unwrap();
+        writer.write(&batch).expect("Writing batch");
+        writer.close().unwrap();
+
+        // Fallback field IDs: nested_col=1, id=2
+        let predicate = Reference::new("id").greater_than(Datum::int(1));
+
+        let reader = ArrowReaderBuilder::new(FileIO::new_with_fs())
+            .with_row_group_filtering_enabled(true)
+            .with_row_selection_enabled(true)
+            .build();
+
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                file_size_in_bytes: 
std::fs::metadata(&file_path).unwrap().len(),
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: file_path,
+                data_file_format: DataFileFormat::Parquet,
+                schema: iceberg_schema.clone(),
+                project_field_ids: vec![2],
+                predicate: Some(predicate.bind(iceberg_schema, true).unwrap()),
+                deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
+                case_sensitive: false,
+            })]
+            .into_iter(),
+        )) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        let ids: Vec<i32> = result
+            .iter()
+            .flat_map(|b| {
+                b.column(0)
+                    .as_primitive::<arrow_array::types::Int32Type>()
+                    .values()
+                    .iter()
+                    .copied()
+            })
+            .collect();
+        assert_eq!(ids, vec![2, 3]);
+    }
+
+    /// Regression for <https://github.com/apache/iceberg-rust/issues/2306>:
+    /// predicate on a column after a struct in a migrated file (no field IDs).
+    #[tokio::test]
+    async fn test_predicate_on_migrated_file_with_struct() {

Review Comment:
   How about merging these three into one test case? The motivation is to 
test the correctness in a more complex schema.



##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -4667,4 +4691,268 @@ message schema {
         assert_eq!(result[1], expected_1);
         assert_eq!(result[2], expected_2);
     }
+
+    /// Helper: write a Parquet file without field IDs containing a nested 
type column
+    /// followed by an `id` column, then read it with a predicate on `id`.
+    /// This exercises the fallback field ID mapping path that Comet uses.
+    async fn read_migrated_file_with_nested_type_and_predicate(
+        iceberg_schema: SchemaRef,
+        arrow_schema: Arc<ArrowSchema>,
+        batch: RecordBatch,
+    ) {
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_path = format!("{table_location}/1.parquet");
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+        let file = File::create(&file_path).unwrap();
+        let mut writer = ArrowWriter::try_new(file, arrow_schema, 
Some(props)).unwrap();
+        writer.write(&batch).expect("Writing batch");
+        writer.close().unwrap();
+
+        // Fallback field IDs: nested_col=1, id=2
+        let predicate = Reference::new("id").greater_than(Datum::int(1));
+
+        let reader = ArrowReaderBuilder::new(FileIO::new_with_fs())
+            .with_row_group_filtering_enabled(true)
+            .with_row_selection_enabled(true)
+            .build();
+
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                file_size_in_bytes: 
std::fs::metadata(&file_path).unwrap().len(),
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: file_path,
+                data_file_format: DataFileFormat::Parquet,
+                schema: iceberg_schema.clone(),
+                project_field_ids: vec![2],
+                predicate: Some(predicate.bind(iceberg_schema, true).unwrap()),
+                deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
+                case_sensitive: false,
+            })]
+            .into_iter(),
+        )) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        let ids: Vec<i32> = result
+            .iter()
+            .flat_map(|b| {
+                b.column(0)
+                    .as_primitive::<arrow_array::types::Int32Type>()
+                    .values()
+                    .iter()
+                    .copied()
+            })
+            .collect();
+        assert_eq!(ids, vec![2, 3]);
+    }
+
+    /// Regression for <https://github.com/apache/iceberg-rust/issues/2306>:
+    /// predicate on a column after a struct in a migrated file (no field IDs).
+    #[tokio::test]
+    async fn test_predicate_on_migrated_file_with_struct() {

Review Comment:
   Also, it would be nice if we could use something like 
https://docs.rs/serde_arrow/latest/serde_arrow/index.html to make the tests 
more readable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to