etseidl commented on code in PR #6081: URL: https://github.com/apache/arrow-rs/pull/6081#discussion_r1792540699
########## parquet/examples/external_metadata.rs: ########## @@ -0,0 +1,237 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray}; +use arrow_cast::pretty::pretty_format_batches; +use futures::TryStreamExt; +use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; +use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder}; +use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter}; +use parquet::file::properties::{EnabledStatistics, WriterProperties}; +use std::fs::File; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tempfile::TempDir; + +/// This example demonstrates advanced usage of the Parquet metadata APIs. +/// +/// It is sometimes desired to copy the metadata for parquet files stored on +/// remote object storage (e.g. S3) to a local file or an in-memory cache, use a +/// query engine like DataFusion to analyze the metadata to determine which file +/// to read, and then read any matching files with a single subsequent object +/// store request. +/// +/// # Example Overview +/// +/// 1. Reads the metadata of a Parquet file using [`ParquetMetaDataReader`] +/// +/// 2. Removes column statistics from the metadata (to make it smaller) +/// +/// 3. Stores the metadata in a separate file using [`ParquetMetaDataWriter`] +/// +/// 4. Reads the metadata from the separate file and uses that to read the +/// Parquet file, thus avoiding a second IO to read metadata or reparsing +/// the footer. +/// +#[tokio::main(flavor = "current_thread")] +async fn main() -> parquet::errors::Result<()> { + let tempdir = TempDir::new().unwrap(); + let parquet_path = create_parquet_file(&tempdir); + let metadata_path = tempdir.path().join("thrift_metadata.dat"); + + // In this example, we use a tokio file to mimic an async remote data source + let mut remote_parquet_file = tokio::fs::File::open(&parquet_path).await?; + + let metadata = get_metadata_from_remote_parquet_file(&mut remote_parquet_file).await; + println!( + "Metadata from 'remote' Parquet file into memory: {} bytes", + metadata.memory_size() + ); + + // now slim down the metadata and write it to a "local" file + let metadata = prepare_metadata(metadata); + write_metadata_to_local_file(metadata, &metadata_path); + + // now read the metadata from the local file and use it to read the "remote" Parquet file + let metadata = read_metadata_from_local_file(&metadata_path); + println!("Read metadata from file"); + + let batches = read_remote_parquet_file_with_metadata(remote_parquet_file, metadata).await; + + // display the results + let batches_string = pretty_format_batches(&batches).unwrap().to_string(); + let batches_lines: Vec<_> = batches_string.split('\n').collect(); + + assert_eq!( + batches_lines, + [ + "+-----+-------------+", + "| id | description |", + "+-----+-------------+", + "| 100 | oranges |", + "| 200 | apples |", + "| 201 | grapefruit |", + "| 300 | bannanas |", + "| 102 | grapes |", + "| 33 | pears |", + "+-----+-------------+", + ], + "actual output:\n\n{batches_lines:#?}" + ); + + Ok(()) +} + +/// Reads the metadata from a "remote" parquet file +/// +/// Note that this function models reading from a remote file source using a +/// tokio file. In a real application, you would implement [`MetadataFetch`] for +/// your own remote source. +/// +/// [`MetadataFetch`]: parquet::arrow::async_reader::MetadataFetch +async fn get_metadata_from_remote_parquet_file( + remote_file: &mut tokio::fs::File, +) -> ParquetMetaData { + // the remote source must know the total file size (e.g. from an object store LIST operation) + let file_size = remote_file.metadata().await.unwrap().len(); + + // tell the reader to read the page index + ParquetMetaDataReader::new() + .with_page_indexes(true) + .load_and_finish(remote_file, file_size as usize) + .await + .unwrap() +} + +/// modifies the metadata to reduce its size +fn prepare_metadata(metadata: ParquetMetaData) -> ParquetMetaData { + let orig_size = metadata.memory_size(); + + let mut builder = metadata.into_builder(); + + // remove column statistics to reduce the size of the metadata by converting + // the various structures into their respective builders and modifying them + // as needed. + for row_group in builder.take_row_groups() { + let mut row_group_builder = row_group.into_builder(); + for column in row_group_builder.take_columns() { + let column = column.into_builder().clear_statistics().build().unwrap(); + row_group_builder = row_group_builder.add_column_metadata(column); + } + let row_group = row_group_builder.build().unwrap(); + builder = builder.add_row_group(row_group); + } + let metadata = builder.build(); + + // verifiy that the size has indeed been reduced + let new_size = metadata.memory_size(); + assert!(new_size < orig_size, "metadata size did not decrease"); + println!("Reduced metadata size from {} to {}", orig_size, new_size); + metadata +} + +/// writes the metadata to a file +/// +/// The data is stored using the same thrift format as the Parquet file metadata +fn write_metadata_to_local_file(metadata: ParquetMetaData, file: impl AsRef<Path>) { + let file = File::create(file).unwrap(); + ParquetMetaDataWriter::new(file, &metadata) + .finish() + .unwrap() +} + +/// Reads the metadata from a file +/// +/// This function reads the format written by `write_metadata_to_file` +fn read_metadata_from_local_file(file: impl AsRef<Path>) -> ParquetMetaData { + let file = File::open(file).unwrap(); + ParquetMetaDataReader::new() + .with_column_indexes(true) + .with_offset_indexes(true) Review Comment: ```suggestion .with_page_indexes(true) ``` Optional. I can see wanting to show that column and offset indexes can be controlled separately. ########## parquet/src/file/metadata/mod.rs: ########## @@ -36,30 +36,27 @@ //! //! # APIs for working with Parquet Metadata //! -//! The Parquet readers and writers in this crate read and write +//! The Parquet readers and writers in this crate will handle reading and writing Review Comment: ```suggestion //! The Parquet readers and writers in this crate handle reading and writing ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
