alamb commented on code in PR #16395:
URL: https://github.com/apache/datafusion/pull/16395#discussion_r2178596921
##########
datafusion-examples/examples/embedding_parquet_indexes.rs:
##########
@@ -0,0 +1,402 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Example: embedding and using a custom “distinct values” index in Parquet files
+//!
+//! This example shows how to build and leverage a file‑level distinct‑values index
+//! for pruning in DataFusion’s Parquet scans.
+//!
+//! Steps:
+//! 1. Compute the distinct values for a target column and serialize them into bytes.
+//! 2. Write each Parquet file with:
+//!    - regular data pages for your column
+//!    - the magic marker `IDX1` and a little‑endian length, to identify our custom index format
+//!    - the serialized distinct‑values bytes
+//!    - footer key/value metadata entries (`distinct_index_offset` and `distinct_index_length`)
+//! 3. Read back each file’s footer metadata to locate and deserialize the index.
+//! 4. Build a `DistinctIndexTable` (a custom `TableProvider`) that scans footers
+//!    into a map of filename → `HashSet<String>` of distinct values.
+//! 5. In `scan()`, prune out any Parquet files whose distinct set doesn’t match the
+//!    `category = 'X'` filter, then only read data from the remaining files.
+//!
+//! This technique embeds a lightweight, application‑specific index directly in a Parquet
+//! file to achieve efficient file‑level pruning without modifying the Parquet format.
+//!
+//! It is also cheap: the index bytes are written after the data pages rather than into the
+//! footer metadata (the footer only gains two small key/value entries pointing to them),
+//! and the index is read only when it is needed.
+//!
+//! **Compatibility note: why other Parquet readers simply skip over our extra index blob**
+//!
+//! Any standard Parquet reader will:
+//! 1. Seek to the end of the file and read the last 8 bytes (a 4‑byte little‑endian footer
+//!    length followed by the `PAR1` magic).
+//! 2. Seek backwards by that length to parse only the Thrift‑encoded footer metadata
+//!    (including the key/value pairs).
+//!
+//! Since our custom index bytes are appended *before* the footer (and we do not alter
+//! Parquet’s metadata schema), readers never scan from the file start or “overflow” into
+//! our blob. They will encounter two unknown keys (`distinct_index_offset` and
+//! `distinct_index_length`) in the footer metadata, ignore them (or expose them as extra
+//! metadata), and will not attempt to read or deserialize the raw index bytes.
+
+use arrow::array::{ArrayRef, StringArray};
+use arrow::record_batch::RecordBatch;
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::{HashMap, HashSet, Result};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::memory::DataSourceExec;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
+use datafusion::datasource::TableType;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::{Operator, TableProviderFilterPushDown};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::errors::ParquetError;
+use datafusion::parquet::file::metadata::KeyValue;
+use datafusion::parquet::file::reader::{FileReader, SerializedFileReader};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::*;
+use datafusion::scalar::ScalarValue;
+use std::fs::{create_dir_all, read_dir, File};
+use std::io::{Read, Seek, SeekFrom, Write};
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use tempfile::TempDir;
+
+///
+/// Example creating the Parquet file that
+/// contains specialized indexes and a page‑index offset
+///
+/// Note: the page index offset will be located after the custom index,
+/// which itself is written after the data pages.
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │┌───────────────────┐ │
+/// ││     DataPage      │ │      Standard Parquet
+/// │└───────────────────┘ │      Data pages
+/// │┌───────────────────┐ │
+/// ││     DataPage      │ │
+/// │└───────────────────┘ │
+/// │         ...          │
+/// │                      │
+/// │┌───────────────────┐ │
+/// ││     DataPage      │ │
+/// │└───────────────────┘ │
+/// │┏━━━━━━━━━━━━━━━━━━━┓ │
+/// │┃                   ┃ │      key/value metadata
+/// │┃   Special Index   ┃◀┼───── that points to the
+/// │┃                   ┃ │      custom index blob
+/// │┗━━━━━━━━━━━━━━━━━━━┛ │
+/// │┏───────────────────┓ │
+/// │┃ Page Index Offset ┃◀┼───── little‑endian u64
+/// │┗───────────────────┛ │      sitting after the custom index
+/// │╔═══════════════════╗ │
+/// │║                   ║ │
+/// │║  Parquet Footer   ║ │      thrift‑encoded
+/// │║                   ║◀┼───── ParquetMetadata
+/// │║                   ║ │
+/// │╚═══════════════════╝ │
+/// └──────────────────────┘
+///
+///        Parquet File
+/// ```
+/// DistinctIndexTable is a custom TableProvider that reads Parquet files
+
+#[derive(Debug, Clone)]
+struct DistinctIndex {
+    inner: HashSet<String>,
+}
+
+impl DistinctIndex {
+    // Init from iterator of distinct values
+    pub fn new<I: IntoIterator<Item = String>>(iter: I) -> Self {
+        Self {
+            inner: iter.into_iter().collect(),
+        }
+    }
+
+    // serialize the distinct index to a writer
+    fn serialize<W: Write + Send>(
+        &self,
+        arrow_writer: &mut ArrowWriter<W>,
+    ) -> Result<()> {
+        let distinct: HashSet<_> = self.inner.iter().collect();
+        let serialized = distinct
+            .into_iter()
+            .map(|s| s.as_str())
+            .collect::<Vec<&str>>()
+            .join("\n");
+        let index_bytes = serialized.into_bytes();
+
+        // Set the offset for the index
+        let offset = arrow_writer.bytes_written();
+        let index_len = index_bytes.len() as u64;
+
+        println!("Writing custom index at offset: {offset}, length: {index_len}");
+        // Write the index magic and length to the file
+        arrow_writer.write_all(b"IDX1")?;
+        arrow_writer.write_all(&index_len.to_le_bytes())?;
+
+        // Write the index bytes
+        arrow_writer.write_all(&index_bytes)?;
+
+        // Append metadata about the index to the Parquet file footer
+        arrow_writer.append_key_value_metadata(KeyValue::new(
+            "distinct_index_offset".to_string(),
+            offset.to_string(),
+        ));
+        arrow_writer.append_key_value_metadata(KeyValue::new(
+            "distinct_index_length".to_string(),
+            index_bytes.len().to_string(),
+        ));
+        Ok(())
+    }
+
+    // create a new distinct index from the specified bytes
+    fn new_from_bytes(serialized: &[u8]) -> Result<Self> {
+        let s = String::from_utf8(serialized.to_vec())
+            .map_err(|e| ParquetError::General(e.to_string()))?;
+
+        Ok(Self {
+            inner: s.lines().map(|s| s.to_string()).collect(),
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DistinctIndexTable {
+    schema: SchemaRef,
+    index: HashMap<String, DistinctIndex>,
+    dir: PathBuf,
+}
+
+impl DistinctIndexTable {
+    /// Scan a directory, read each file's footer metadata into a map
+    fn try_new(dir: impl Into<PathBuf>, schema: SchemaRef) -> Result<Self> {
+        let dir = dir.into();
+        let mut index = HashMap::new();
+
+        for entry in read_dir(&dir)? {
+            let path = entry?.path();
+            if path.extension().and_then(|s| s.to_str()) != Some("parquet") {
+                continue;
+            }
+            let file_name = path.file_name().unwrap().to_string_lossy().to_string();
+
+            let distinct_set = read_distinct_index(&path)?;
+
+            println!("Read distinct index for {file_name}: {distinct_set:?}");
+            index.insert(file_name, distinct_set);
+        }
+
+        Ok(Self { schema, index, dir })
+    }
+}
+
+pub struct IndexedParquetWriter<W: Write + Seek> {
+    writer: ArrowWriter<W>,
+}
+
+impl<W: Write + Seek + Send> IndexedParquetWriter<W> {
+    pub fn try_new(sink: W, schema: Arc<Schema>) -> Result<Self> {
+        let writer = ArrowWriter::try_new(sink, schema, None)?;
+        Ok(Self { writer })
+    }
+}
+
+/// Magic bytes to identify our custom index format
+const INDEX_MAGIC: &[u8] = b"IDX1";
+
+fn write_file_with_index(path: &Path, values: &[&str]) -> Result<()> {
+    let field = Field::new("category", DataType::Utf8, false);
+    let schema = Arc::new(Schema::new(vec![field.clone()]));
+    let arr: ArrayRef = Arc::new(StringArray::from(values.to_vec()));
+    let batch = RecordBatch::try_new(schema.clone(), vec![arr])?;
+
+    let file = File::create(path)?;
+
+    let mut writer = IndexedParquetWriter::try_new(file, schema.clone())?;
+
+    // Write the data pages
+    writer.writer.write(&batch)?;
+    // Close row group
+    writer.writer.flush()?;
+
+    let distinct_index: DistinctIndex =
+        DistinctIndex::new(values.iter().map(|s| s.to_string()));
+
+    distinct_index.serialize(&mut writer.writer)?;
+
+    writer.writer.close()?;
+
+    println!("Finished writing file to {}", path.display());
+    Ok(())
+}
+
+fn read_distinct_index(path: &Path) -> Result<DistinctIndex, ParquetError> {
+    let mut file = File::open(path)?;
+
+    let file_size = file.metadata()?.len();
+    println!(
+        "Reading index from {} (size: {})",
+        path.display(),
+        file_size
+    );
+
+    let reader = SerializedFileReader::new(file.try_clone()?)?;
+    let meta = reader.metadata().file_metadata();
+
+    let offset = meta
+        .key_value_metadata()
+        .and_then(|kvs| kvs.iter().find(|kv| kv.key == "distinct_index_offset"))
+        .and_then(|kv| kv.value.as_ref())
+        .ok_or_else(|| ParquetError::General("Missing index offset".into()))?
+        .parse::<u64>()
+        .map_err(|e| ParquetError::General(e.to_string()))?;
+
+    let length = meta
+        .key_value_metadata()
+        .and_then(|kvs| kvs.iter().find(|kv| kv.key == "distinct_index_length"))
+        .and_then(|kv| kv.value.as_ref())
+        .ok_or_else(|| ParquetError::General("Missing index length".into()))?
+        .parse::<usize>()
+        .map_err(|e| ParquetError::General(e.to_string()))?;
+
+    println!("Reading index at offset: {offset}, length: {length}");
+
+    file.seek(SeekFrom::Start(offset))?;
+
+    let mut magic_buf = [0u8; 4];
+    file.read_exact(&mut magic_buf)?;
+    if magic_buf != INDEX_MAGIC {
+        return Err(ParquetError::General("Invalid index magic".into()));
+    }
+
+    let mut len_buf = [0u8; 8];
+    file.read_exact(&mut len_buf)?;
+    let stored_len = u64::from_le_bytes(len_buf) as usize;
+
+    if stored_len != length {
+        return Err(ParquetError::General("Index length mismatch".into()));
+    }
+
+    let mut index_buf = vec![0u8; length];
+    file.read_exact(&mut index_buf)?;
+
+    let index = DistinctIndex::new_from_bytes(&index_buf)
+        .map_err(|e| ParquetError::General(e.to_string()))?;
+    Ok(index)
+}
+
+/// Implement TableProvider for DistinctIndexTable, using the distinct index to prune files
+#[async_trait]
+impl TableProvider for DistinctIndexTable {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    /// Prune files before reading: only keep files whose distinct set contains the filter value
+    async fn scan(
+        &self,
+        _ctx: &dyn Session,
+        _proj: Option<&Vec<usize>>,
+        filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Look for a single `category = 'X'` filter
+        let mut target: Option<String> = None;
+
+        if filters.len() == 1 {
+            if let Expr::BinaryExpr(expr) = &filters[0] {
+                if expr.op == Operator::Eq {
+                    if let (
+                        Expr::Column(c),
+                        Expr::Literal(ScalarValue::Utf8(Some(v)), _),
+                    ) = (&*expr.left, &*expr.right)
+                    {
+                        if c.name == "category" {
+                            println!("Filtering for category: {v}");
+                            target = Some(v.clone());
+                        }
+                    }
+                }
+            }
+        }
+        // Determine which files to scan
+        let keep: Vec<String> = self
+            .index
+            .iter()
+            .filter(|(_f, set)| target.as_ref().is_none_or(|v| set.inner.contains(v)))
+            .map(|(f, _)| f.clone())
+            .collect();
+
+        println!("Pruned files: {:?}", keep.clone());
+
+        // Build ParquetSource for kept files
+        let url = ObjectStoreUrl::parse("file://")?;
+        let source = Arc::new(ParquetSource::default().with_enable_page_index(true));
+        let mut builder = FileScanConfigBuilder::new(url, self.schema.clone(), source);
+        for file in keep {
+            let path = self.dir.join(&file);
+            let len = std::fs::metadata(&path)?.len();
+            builder = builder.with_file(PartitionedFile::new(
+                path.to_str().unwrap().to_string(),
+                len,
+            ));
+        }
+        Ok(DataSourceExec::from_data_source(builder.build()))
+    }
+
+    fn supports_filters_pushdown(
+        &self,
+        fs: &[&Expr],
+    ) -> Result<Vec<TableProviderFilterPushDown>> {
+        // Mark as inexact since pruning is file‑granular
+        Ok(vec![TableProviderFilterPushDown::Inexact; fs.len()])
+    }
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    // 1. Create temp dir and write 3 Parquet files with different category sets
+    let tmp = TempDir::new()?;
+    let dir = tmp.path();
+    create_dir_all(dir)?;

Review Comment:
   BTW I also double checked that these files can be read by duckdb, and there were no problems:

   ```sql
   D select * from read_parquet('/tmp/parquet_index_data/*');
   ┌──────────┐
   │ category │
   │ varchar  │
   ├──────────┤
   │ foo      │
   │ bar      │
   │ foo      │
   │ baz      │
   │ qux      │
   │ foo      │
   │ quux     │
   │ quux     │
   └──────────┘
   ```
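A similar sanity check can be run with DataFusion itself. The following is a minimal sketch (not part of the PR), assuming the example's output files live in `/tmp/parquet_index_data` as in the duckdb query above; a plain Parquet read never touches the embedded index blob or the two extra footer keys:

```rust
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Register the directory as an ordinary Parquet table; the custom
    // index bytes sit before the footer and are ignored by this path.
    ctx.register_parquet(
        "t",
        "/tmp/parquet_index_data",
        ParquetReadOptions::default(),
    )
    .await?;
    ctx.sql("SELECT category, count(*) AS n FROM t GROUP BY category")
        .await?
        .show()
        .await?;
    Ok(())
}
```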
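To make the compatibility note in the example's doc comment concrete: every standard Parquet reader starts from the 8‑byte trailer (a 4‑byte little‑endian footer length followed by the `PAR1` magic) and only seeks backwards from there. Below is a rough sketch of that probe; `probe_footer` is a hypothetical helper, not part of the PR:

```rust
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};

/// Hypothetical helper: read the Parquet trailer and report where the footer
/// metadata begins. Bytes before that region (data pages and the custom
/// `IDX1` blob) are never visited by a footer-driven reader.
fn probe_footer(path: &str) -> std::io::Result<()> {
    let mut file = File::open(path)?;
    let size = file.metadata()?.len();

    // Last 8 bytes: 4-byte little-endian footer length + b"PAR1"
    file.seek(SeekFrom::Start(size - 8))?;
    let mut trailer = [0u8; 8];
    file.read_exact(&mut trailer)?;
    assert_eq!(&trailer[4..8], b"PAR1", "not a Parquet file");

    let footer_len = u32::from_le_bytes(trailer[..4].try_into().unwrap()) as u64;
    let footer_start = size - 8 - footer_len;
    println!("footer metadata occupies bytes {footer_start}..{}", size - 8);
    Ok(())
}
```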
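And, purely for illustration, the byte layout that `DistinctIndex::serialize` appends after the data pages; the value order here is an assumed example, since `HashSet` iteration order is unspecified:

```rust
fn main() {
    // Payload: the distinct values joined with '\n'
    let payload = b"foo\nbar".to_vec();
    let mut blob = Vec::new();
    blob.extend_from_slice(b"IDX1"); // 4-byte magic
    blob.extend_from_slice(&(payload.len() as u64).to_le_bytes()); // 8-byte LE length
    blob.extend_from_slice(&payload); // newline-joined distinct values
    assert_eq!(blob.len(), 4 + 8 + payload.len());
    // The footer entry `distinct_index_offset` points at the magic, while
    // `distinct_index_length` records only payload.len(), which is what
    // read_distinct_index compares against the embedded length field.
    println!("index blob is {} bytes", blob.len());
}
```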