etseidl commented on code in PR #6000:
URL: https://github.com/apache/arrow-rs/pull/6000#discussion_r1698720501


##########
parquet/src/file/writer.rs:
##########
@@ -1854,6 +2053,167 @@ mod tests {
         assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
     }
 
+    struct TestMetadata {
+        file_size: usize,
+        metadata: ParquetMetaData,
+    }
+
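+    /// Writes a single one-column batch to an in-memory Parquet file and reads
+    /// its metadata back, optionally writing and reading the page indexes.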
+    fn get_test_metadata(write_page_index: bool, read_page_index: bool) -> TestMetadata {
+        let mut buf = BytesMut::new().writer();
+        let schema: Arc<Schema> = Arc::new(Schema::new(vec![Field::new(
+            "a",
+            ArrowDataType::Int32,
+            true,
+        )]));
+
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
+
+        let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
+
+        let writer_props = match write_page_index {
+            true => WriterProperties::builder()
+                .set_statistics_enabled(EnabledStatistics::Page)
+                .build(),
+            false => WriterProperties::builder()
+                .set_statistics_enabled(EnabledStatistics::Chunk)
+                .build(),
+        };
+
+        let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_props)).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let data = buf.into_inner().freeze();
+
+        let reader_opts = match read_page_index {
+            true => ReadOptionsBuilder::new().with_page_index().build(),
+            false => ReadOptionsBuilder::new().build(),
+        };
+        let reader = SerializedFileReader::new_with_options(data.clone(), reader_opts).unwrap();
+        let metadata = reader.metadata().clone();
+        TestMetadata {
+            file_size: data.len(),
+            metadata,
+        }
+    }
+
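+    /// Returns true if some row group in the metadata has a column index
+    /// present for every one of its columns.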
+    fn has_page_index(metadata: &ParquetMetaData) -> bool {
+        match metadata.column_index() {
+            Some(column_index) => column_index
+                .iter()
+                .any(|rg_idx| rg_idx.iter().all(|col_idx| !matches!(col_idx, Index::NONE))),
+            None => false,
+        }
+    }
+
+    #[test]
+    fn test_roundtrip_parquet_metadata_without_page_index() {
+        // We currently don't have an ad-hoc ParquetMetaData loader that can load page
+        // indexes, so we at least test the round trip without them
+        let metadata = get_test_metadata(false, false);
+        assert!(!has_page_index(&metadata.metadata));
+
+        let mut buf = BytesMut::new().writer();
+        {
+            let mut writer = ParquetMetadataWriter::new(&mut buf, &metadata.metadata);
+            writer.finish().unwrap();
+        }
+
+        let data = buf.into_inner().freeze();
+
+        let decoded_metadata = parse_metadata(&data).unwrap();
+        assert!(!has_page_index(&decoded_metadata));
+
+        assert_eq!(metadata.metadata, decoded_metadata);
+    }
+
+    /// Temporary function so we can test loading metadata with page indexes
+    /// while we haven't fully figured out how to load them cleanly
+    #[cfg(feature = "async")]
+    async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData {
+        use crate::arrow::async_reader::{MetadataFetch, MetadataLoader};
+        use crate::errors::Result as ParquetResult;
+        use bytes::Bytes;
+        use futures::future::BoxFuture;
+        use futures::FutureExt;
+        use std::ops::Range;
+
+        /// Adapt a `Bytes` to a `MetadataFetch` implementation.
+        struct AsyncBytes {
+            data: Bytes,
+        }
+
+        impl AsyncBytes {
+            fn new(data: Bytes) -> Self {
+                Self { data }
+            }
+        }
+
+        impl MetadataFetch for AsyncBytes {
+            fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
+                async move { Ok(self.data.slice(range.start..range.end)) }.boxed()
+            }
+        }
+
+        /// A `MetadataFetch` implementation that reads from a subset of the full data
+        /// while accepting ranges that address the full data.
+        struct MaskedBytes {
+            inner: Box<dyn MetadataFetch + Send>,
+            inner_range: Range<usize>,
+        }
+
+        impl MaskedBytes {
+            fn new(inner: Box<dyn MetadataFetch + Send>, inner_range: Range<usize>) -> Self {
+                Self { inner, inner_range }
+            }
+        }
+
+        impl MetadataFetch for &mut MaskedBytes {
+            fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
+                let inner_range = self.inner_range.clone();
+                println!("inner_range: {:?}", inner_range);
+                println!("range: {:?}", range);
+                assert!(inner_range.start <= range.start && inner_range.end >= range.end);
+                let range =
+                    range.start - self.inner_range.start..range.end - self.inner_range.start;
+                self.inner.fetch(range)
+            }
+        }
+
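+        // Treat the metadata bytes as the tail of a hypothetical file of
+        // `file_size` bytes, so absolute fetch ranges get remapped into the
+        // buffer we actually hold.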
+        let metadata_length = data.len();
+        let mut reader = MaskedBytes::new(
+            Box::new(AsyncBytes::new(data)),
+            file_size - metadata_length..file_size,
+        );
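+        // First load the footer metadata, then make a second pass over the
+        // reader to fetch and attach the page indexes.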
+        let metadata = MetadataLoader::load(&mut reader, file_size, None)
+            .await
+            .unwrap();
+        let loaded_metadata = metadata.finish();
+        let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
+        metadata.load_page_index(true, true).await.unwrap();
+        metadata.finish()
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "async")]
+    async fn test_encode_parquet_metadata_with_page_index() {
+        // Create a ParquetMetadata with page index information
+        let metadata = get_test_metadata(true, true);
+        assert!(has_page_index(&metadata.metadata));
+
+        let mut buf = BytesMut::new().writer();
+        {
+            let mut writer = ParquetMetadataWriter::new(&mut buf, &metadata.metadata);
+            writer.finish().unwrap();
+        }
+
+        let data = buf.into_inner().freeze();
+
+        let decoded_metadata = load_metadata_from_bytes(metadata.file_size, data).await;

Review Comment:
   > I could write a custom eq but... that is going to be a pain.
   
   True.  Perhaps just compare parts of the metadata rather than the whole thing. Let me see if I can whip something up quickly and submit a PR to your branch.
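   
   Something along these lines, perhaps (just a sketch; the helper name and the exact set of fields compared are illustrative):
   
   ```rust
   // Hypothetical helper: compare only the pieces that should survive the
   // round trip, ignoring fields that legitimately change when the page
   // indexes are rewritten (their offsets/lengths).
   fn assert_metadata_eq(expected: &ParquetMetaData, actual: &ParquetMetaData) {
       let (e, a) = (expected.file_metadata(), actual.file_metadata());
       assert_eq!(e.schema_descr().root_schema(), a.schema_descr().root_schema());
       assert_eq!(e.num_rows(), a.num_rows());
       assert_eq!(expected.row_groups().len(), actual.row_groups().len());
       for (erg, arg) in expected.row_groups().iter().zip(actual.row_groups()) {
           assert_eq!(erg.num_rows(), arg.num_rows());
           assert_eq!(erg.total_byte_size(), arg.total_byte_size());
       }
   }
   ```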
   
   > I also need to think about the implications of these things not matching up. If I understand correctly those are the offsets to the page index data from the start of the file (or from where?) and because we loaded the metadata from only a portion of the file they got re-calculated differently?
   
   Well, you've written the page indexes to a new file (well, buffer), so those offsets point to their location in the new file as opposed to the old one.  Looking at the test output, the column index is at offset 0 with length 23, and the offset index is at offset 23 with length 10.  Assuming there's no 'PAR1' header on the new file, those offsets seem correct.
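   
   For reference, those locations come straight from the column chunk metadata; something like this will dump them (a sketch, assuming the `ColumnChunkMetaData` index-location accessors):
   
   ```rust
   // Print where each column's page index structures landed in the new buffer.
   for rg in decoded_metadata.row_groups() {
       for col in rg.columns() {
           println!(
               "column index at {:?} (len {:?}), offset index at {:?} (len {:?})",
               col.column_index_offset(),
               col.column_index_length(),
               col.offset_index_offset(),
               col.offset_index_length(),
           );
       }
   }
   ```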


