alamb commented on code in PR #10549: URL: https://github.com/apache/datafusion/pull/10549#discussion_r1603675508
########## datafusion-examples/examples/parquet_index.rs: ########## @@ -0,0 +1,608 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ + new_null_array, Array, ArrayRef, AsArray, Int32Array, RecordBatch, StringArray, + UInt64Array, +}; +use arrow::datatypes::{Int32Type, UInt64Type}; +use arrow::util::pretty::pretty_format_batches; +use arrow_schema::{DataType, FieldRef, Schema, SchemaRef}; +use async_trait::async_trait; +use datafusion::datasource::listing::PartitionedFile; +use datafusion::datasource::physical_plan::{ + FileGroupPartitioner, FileScanConfig, ParquetExec, +}; +use datafusion::datasource::TableProvider; +use datafusion::execution::context::SessionState; +use datafusion::execution::object_store::ObjectStoreUrl; +use datafusion::parquet; +use datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use datafusion::parquet::arrow::ArrowWriter; +use datafusion::parquet::file::metadata::ColumnChunkMetaData; +use datafusion::parquet::file::properties::WriterProperties; +use datafusion::parquet::file::statistics::ValueStatistics; +use datafusion::parquet::schema::types::SchemaDescriptor; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::*; +use 
datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_common::config::TableParquetOptions; +use datafusion_common::{ + internal_datafusion_err, DFSchema, DataFusionError, Result, Statistics, +}; +use datafusion_expr::utils::conjunction; +use datafusion_expr::TableType; +use std::any::Any; +use std::fmt::Display; +use std::fs; +use std::fs::{DirEntry, File}; +use std::ops::Range; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tempfile::TempDir; +use url::Url; + +/// This example demonstrates building an index across multiple Parquet files and +/// using that index to skip reading ("Prune") files that do not contain relevant data. +/// +/// Note this is a low level example to demonstrate how to build such a custom index. +/// If you want to read a directory of parquet files as a table, you should probably use a higher level API such as +/// [`SessionContext::read_parquet`] or [`ListingTable`] instead. +/// +/// [`ListingTable`]: datafusion::datasource::listing::ListingTable +#[tokio::main] +async fn main() -> Result<()> { + // Create some dummy data for this example + let data = DemoData::try_new()?; + + // Create the table provider that knows how to read the parquet files and their metadata + let provider = IndexTableProvider::try_new(data.path())?; + + // Create a SessionContext that knows how to read from the table provider as the "index_table" + let mut config = SessionConfig::new(); + config.options_mut().catalog.information_schema = true; + let ctx = SessionContext::new_with_config(config); + ctx.register_table("index_table", Arc::new(provider))?; + // register file:// object store provider + // Get this error if not there: + // Error: Internal("No suitable object store found for file://") + // TODO: should make the error more helpful (and add an example of how to register local file object store) + // todo add example of how to register local file object store + let url = Url::try_from("file://") + .map_err(|e| 
internal_datafusion_err!("can't parse file url: {e}"))?; + let object_store = object_store::local::LocalFileSystem::new(); + ctx.runtime_env() + .register_object_store(&url, Arc::new(object_store)); + + println!("Tables in the information schema:"); + ctx.sql("SELECT * FROM information_schema.tables") + .await? + .show() + .await?; + + println!("Schema"); + ctx.sql("describe index_table").await?.show().await?; + + println!("Data in the index table:"); + ctx.sql("SELECT file_name, value FROM index_table LIMIT 10") + .await? + .show() + .await?; + + Ok(()) +} + +/// DataFusion `TableProvider` that uses an index to decide which Parquet files +/// to read and (eventually) what row groups to project using the predicates +/// and a secondary index +/// +/// It builds data like: +/// ```text +/// data_dir: +/// file1.parquet +/// file2.parquet +/// index: +/// stored index +///``` +pub struct IndexTableProvider { + /// The index of the parquet files in the directory + index: ParquetMetadataIndex, + /// The files (TODO remove) + files: Vec<Vec<PartitionedFile>>, +} + +impl IndexTableProvider { + /// Create a new IndexTableProvider + pub fn try_new(dir: impl Into<PathBuf>) -> Result<Self> { + // Create an index of the parquet files in the directory as we see them. 
+ let mut index_builder = ParquetMetadataIndexBuilder::new(); + + let dir = dir.into(); + + let files = read_dir(&dir)?; + for file in &files { + index_builder.add_file(&file.path())?; + } + + let index = index_builder.build()?; + + println!("Index:\n{index}"); + + // todo make a nicer API for this ("partitioned files" from directory) + let files = files + .iter() + .map(|f| { + let path = fs::canonicalize(f.path())?; + Ok(PartitionedFile::new( + path.display().to_string(), + f.metadata()?.len(), + )) + }) + .collect::<Result<Vec<_>>>()?; + let files = vec![files]; + + Ok(Self { index, files }) + } +} + +#[async_trait] +impl TableProvider for IndexTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.index.schema().clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &SessionState, + projection: Option<&Vec<usize>>, + filters: &[Expr], + limit: Option<usize>, + ) -> Result<Arc<dyn ExecutionPlan>> { + // todo: apply index, etc. 
+ + let df_schema = DFSchema::try_from(self.schema())?; + // convert filters like [`a = 1`, `b = 2`] to a single filter like `a = 1 AND b = 2` + let predicate = conjunction(filters.to_vec()); + let predicate = predicate + .map(|predicate| state.create_physical_expr(predicate, &df_schema)) + .transpose()?; + + let object_store_url = ObjectStoreUrl::parse("file://")?; + + let file_groups = FileGroupPartitioner::new() + .with_target_partitions(state.config().target_partitions()) + .repartition_file_groups(&self.files) + .unwrap_or_else(|| self.files.clone()); + + // for now, simply use ParquetExec + // TODO make a builder for FileScanConfig + let base_config = FileScanConfig { + object_store_url, + file_schema: self.schema(), + file_groups, + statistics: Statistics::new_unknown(self.index.schema()), + projection: projection.cloned(), + limit, + table_partition_cols: vec![], + output_ordering: vec![], + }; + + let metadata_size_hint = None; + + let table_parquet_options = TableParquetOptions::default(); + + // TODO make a builder for parquet exec + let exec = ParquetExec::new( + base_config, + predicate, + metadata_size_hint, + table_parquet_options, + ); + + Ok(Arc::new(exec)) + } +} + +/// Simple in memory index for a set of parquet files +/// +/// The index is represented as an arrow `RecordBatch` that can be passed +/// directly by the DataFusion [`PruningPredicate`] API +/// +/// The index looks like +/// ``` +/// +---------------+-----------+-----------+------------------+------------------+ +/// | file_name | file_size | row_count | value_column_min | value_column_max | +/// +---------------+-----------+-----------+------------------+------------------+ +/// | file1.parquet | 6062 | 100 | 0 | 99 | +/// | file2.parquet | 6062 | 100 | 100 | 199 | +/// | file3.parquet | 163310 | 2800 | 200 | 2999 | +/// +---------------+-----------+-----------+------------------+------------------+ +/// ``` +/// +/// Note a more advanced index would store this information for each 
row group +/// within a file + +#[derive(Debug)] +struct ParquetMetadataIndex { + file_schema: SchemaRef, + index: RecordBatch, +} + +impl Display for ParquetMetadataIndex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, "ParquetMetadataIndex")?; + write!( + f, + "{}", + pretty_format_batches(&[self.index.clone()]).unwrap() + ) + } +} + +impl ParquetMetadataIndex { + fn schema(&self) -> &SchemaRef { + &self.file_schema + } +} + +/// Builds [`ParquetMetadataIndex`] from a set of parquet files +#[derive(Debug, Default)] +struct ParquetMetadataIndexBuilder { + file_schema: Option<SchemaRef>, + filenames: Vec<String>, + file_sizes: Vec<u64>, + row_counts: Vec<u64>, + /// Holds the min/max value of the value column + value_column_mins: Vec<i32>, + value_column_maxs: Vec<i32>, +} + +impl ParquetMetadataIndexBuilder { + fn new() -> Self { + Self::default() + } + + /// Add a file to the index + fn add_file(&mut self, file: &Path) -> Result<()> { + let file_name = file + .file_name() + .ok_or_else(|| internal_datafusion_err!("No filename"))? + .to_str() + .ok_or_else(|| internal_datafusion_err!("Invalid filename"))?; + let file_size = file.metadata()?.len(); + + println!("Adding file {file_name}"); + + let file = File::open(file).map_err(|e| { + DataFusionError::from(e).context(format!("Error opening file {file:?}")) + })?; + + let reader = ParquetRecordBatchReaderBuilder::try_new(file)?; + + // Get the schema of the file. A real system might have to handle the + // case where the schema of the file is not the same as the schema of + // the other files e.g. using SchemaAdapter. 
+ if self.file_schema.is_none() { + self.file_schema = Some(reader.schema().clone()); + } + + // extract the statistics from the file + let metadata = reader.metadata(); + + // TODO: extract the min/max values for each row group + let stats = parquet_stats_to_arrow("value", &reader)?; + // our example has no nulls, so this is a sanity check + assert_eq!(stats.row_count.null_count(), 0); + assert_eq!(stats.min.null_count(), 0); + assert_eq!(stats.max.null_count(), 0); + + // compute the total row count, min of the value column and max of the + // value column in this file + let row_count = stats + .row_count + .as_primitive::<UInt64Type>() + .iter() + .flatten() + .sum::<u64>(); + let value_column_min = stats + .min + .as_primitive::<Int32Type>() + .iter() + .flatten() + .min() + .unwrap_or_default(); + let value_column_max = stats + .max + .as_primitive::<Int32Type>() + .iter() + .flatten() + .max() + .unwrap_or_default(); + + // sanity check the statistics + assert_eq!(row_count, metadata.file_metadata().num_rows() as u64); + self.add_row( + file_name, + file_size, + row_count, + value_column_min, + value_column_max, + ); + Ok(()) + } + + /// Add a single row values to all the in progress rows + fn add_row( + &mut self, + file_name: impl Into<String>, + file_size: u64, + row_count: u64, + value_column_min: i32, + value_column_max: i32, + ) { + self.filenames.push(file_name.into()); + self.file_sizes.push(file_size); + self.row_counts.push(row_count); + self.value_column_mins.push(value_column_min); + self.value_column_maxs.push(value_column_max); + } + + /// Build the index from the files added + fn build(self) -> Result<ParquetMetadataIndex> { + let Some(file_schema) = self.file_schema else { + return Err(internal_datafusion_err!("No files added to index")); + }; + + let index = RecordBatch::try_from_iter(vec![ + ( + "file_name", + Arc::new(StringArray::from(self.filenames)) as ArrayRef, + ), + ( + "file_size", + Arc::new(UInt64Array::from(self.file_sizes)) as 
ArrayRef, + ), + ( + "row_count", + Arc::new(UInt64Array::from(self.row_counts)) as ArrayRef, + ), + ( + "value_column_min", + Arc::new(Int32Array::from(self.value_column_mins)) as ArrayRef, + ), + ( + "value_column_max", + Arc::new(Int32Array::from(self.value_column_maxs)) as ArrayRef, + ), + ])?; + + Ok(ParquetMetadataIndex { file_schema, index }) + } +} + +/// TODO use the new +/// API from https://github.com/apache/datafusion/issues/10453 +pub struct ArrowStatistics { + /// min values + min: ArrayRef, + /// max values + max: ArrayRef, + /// Row counts (UInt64Array) + row_count: ArrayRef, + /// Null Counts (UInt64Array) + #[allow(dead_code)] + null_count: ArrayRef, +} + +/// extract the minimum value in the statistics for the given column, if any +pub fn parquet_stats_to_arrow( Review Comment: Here is an early prototype API of what @NGA-TRAN is working on in https://github.com/apache/datafusion/issues/10453 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
