zhuqi-lucas commented on code in PR #16395:
URL: https://github.com/apache/datafusion/pull/16395#discussion_r2159141300


##########
datafusion-examples/examples/embedding_parquet_indexes.rs:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Example: embedding and using a custom “distinct values” index in Parquet files
+//!
+//! This example shows how to build and leverage a file‑level distinct‑values index
+//! for pruning in DataFusion’s Parquet scans.
+//!
+//! Steps:
+//! 1. Compute the distinct values for a target column and serialize them into bytes.
+//! 2. Write each Parquet file with:
+//!    - regular data pages for your column
+//!    - the magic marker `IDX1` and a little‑endian length, to identify our custom index format
+//!    - the serialized distinct‑values bytes
+//!    - footer key/value metadata entries (`distinct_index_offset` and `distinct_index_length`)
+//! 3. Read back each file’s footer metadata to locate and deserialize the index.
+//! 4. Build a `DistinctIndexTable` (a custom `TableProvider`) that scans footers
+//!    into a map of filename → `HashSet<String>` of distinct values.
+//! 5. In `scan()`, prune out any Parquet files whose distinct set doesn’t match the
+//!    `category = 'X'` filter, then only read data from the remaining files.
+//!
+//! This technique embeds a lightweight, application‑specific index directly in Parquet
+//! metadata to achieve efficient file‑level pruning without modifying the Parquet format.
+//!
+//! It is also cheap: the footer gains only two small key/value entries, the index
+//! bytes are written after the data pages, and they are read only when needed,
+//! as sketched below.
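+//!
+//! A minimal sketch of the write side of that layout (the newline‑separated
+//! serialization of the distinct values is an assumption for illustration; the
+//! marker, length, and footer entries are those described above):
+//!
+//! ```text
+//! let payload: Vec<u8> = distinct_values.join("\n").into_bytes();
+//! sink.write_all(b"IDX1")?;                               // magic marker
+//! sink.write_all(&(payload.len() as u64).to_le_bytes())?; // little‑endian length
+//! sink.write_all(&payload)?;                              // serialized distinct values
+//! ```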
+
+use arrow::array::{ArrayRef, StringArray};
+use arrow::record_batch::RecordBatch;
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::{HashMap, HashSet, Result};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::memory::DataSourceExec;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
+use datafusion::datasource::TableType;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::{Operator, TableProviderFilterPushDown};
+use datafusion::parquet::arrow::ArrowSchemaConverter;
+use datafusion::parquet::data_type::{ByteArray, ByteArrayType};
+use datafusion::parquet::errors::ParquetError;
+use datafusion::parquet::file::metadata::KeyValue;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::parquet::file::reader::{FileReader, SerializedFileReader};
+use datafusion::parquet::file::writer::SerializedFileWriter;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::*;
+use datafusion::scalar::ScalarValue;
+use std::fs::{create_dir_all, read_dir, File};
+use std::io::{Read, Seek, SeekFrom, Write};
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use tempfile::TempDir;
+
+///
+/// Example creating the Parquet file that
+/// contains specialized indexes and a page‑index offset
+///
+/// Note: the page index offset will come after the custom index, which is
+/// itself written right after the data pages.
+///
+/// ```text
+///         ┌──────────────────────┐
+///         │┌───────────────────┐ │
+///         ││     DataPage      │ │      Standard Parquet
+///         │└───────────────────┘ │      Data pages
+///         │┌───────────────────┐ │
+///         ││     DataPage      │ │
+///         │└───────────────────┘ │
+///         │        ...           │
+///         │                      │
+///         │┌───────────────────┐ │
+///         ││     DataPage      │ │
+///         │└───────────────────┘ │
+///         │┏━━━━━━━━━━━━━━━━━━━┓ │
+///         │┃                   ┃ │        key/value metadata
+///         │┃   Special Index   ┃◀┼────    that points to the
+///         │┃                   ┃ │     │  custom index blob
+///         │┗━━━━━━━━━━━━━━━━━━━┛ │
+///         │┏━━━━━━━━━━━━━━━━━━━┓ │
+///         │┃ Page Index Offset ┃◀┼────    little‑endian u64
+///         │┗━━━━━━━━━━━━━━━━━━━┛ │     │  sitting after the custom index
+///         │╔═══════════════════╗ │     │
+///         │║                   ║ │
+///         │║  Parquet Footer   ║ │     │  thrift‑encoded
+///         │║                   ║ ┼──────  ParquetMetadata
+///         │║                   ║ │
+///         │╚═══════════════════╝ │
+///         └──────────────────────┘
+///
+///               Parquet File
+/// ```
+/// DistinctIndexTable is a custom TableProvider that reads Parquet files and
+/// their embedded distinct‑value indexes
+
+#[derive(Debug)]
+struct DistinctIndexTable {
+    schema: SchemaRef,
+    index: HashMap<String, HashSet<String>>,
+    dir: PathBuf,
+}
+
+impl DistinctIndexTable {
+    /// Scan a directory, read each file's footer metadata into a map
+    fn try_new(dir: impl Into<PathBuf>, schema: SchemaRef) -> Result<Self> {
+        let dir = dir.into();
+        let mut index = HashMap::new();
+
+        for entry in read_dir(&dir)? {
+            let path = entry?.path();
+            if path.extension().and_then(|s| s.to_str()) != Some("parquet") {
+                continue;
+            }
+            let file_name = path.file_name().unwrap().to_string_lossy().to_string();
+
+            let distinct_set = read_distinct_index(&path)?;
+
+            println!("Read distinct index for {file_name}: {distinct_set:?}");
+            index.insert(file_name, distinct_set);
+        }
+
+        Ok(Self { schema, index, dir })
+    }
+}
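+
+/// A hedged sketch of the footer‑driven lookup used by `try_new` above. The
+/// real `read_distinct_index` lives further down in this file (outside this
+/// hunk), so the function name and the exact blob layout here are illustrative
+/// assumptions based on the module docs.
+fn read_distinct_index_sketch(path: &Path) -> Result<HashSet<String>> {
+    // Locate the index blob via the footer key/value entries.
+    let reader = SerializedFileReader::new(File::open(path)?)?;
+    let offset: u64 = reader
+        .metadata()
+        .file_metadata()
+        .key_value_metadata()
+        .and_then(|kvs| kvs.iter().find(|kv| kv.key == "distinct_index_offset"))
+        .and_then(|kv| kv.value.as_deref())
+        .and_then(|v| v.parse().ok())
+        .ok_or_else(|| ParquetError::General("missing distinct_index_offset".into()))?;
+
+    // Seek to the blob, validate the magic marker, then read the payload.
+    let mut file = File::open(path)?;
+    file.seek(SeekFrom::Start(offset))?;
+    let mut magic = [0u8; 4];
+    file.read_exact(&mut magic)?;
+    if &magic != b"IDX1" {
+        return Err(ParquetError::General("bad index magic".into()).into());
+    }
+    let mut len_bytes = [0u8; 8];
+    file.read_exact(&mut len_bytes)?;
+    let mut buf = vec![0u8; u64::from_le_bytes(len_bytes) as usize];
+    file.read_exact(&mut buf)?;
+
+    // The payload is assumed here to be newline‑separated UTF‑8 strings.
+    Ok(String::from_utf8_lossy(&buf).lines().map(str::to_string).collect())
+}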
+
+pub struct IndexedParquetWriter<W: Write + Seek> {
+    writer: SerializedFileWriter<W>,
+}
+
+impl<W: Write + Seek + Send> IndexedParquetWriter<W> {
+    pub fn try_new(
+        sink: W,
+        schema: Arc<Schema>,
+        props: WriterProperties,
+    ) -> Result<Self> {
+        let schema_desc = ArrowSchemaConverter::new().convert(schema.as_ref())?;
+        let props_ptr = Arc::new(props);
+        let writer =
+            SerializedFileWriter::new(sink, schema_desc.root_schema_ptr(), props_ptr)?;
+        Ok(Self { writer })
+    }
+}
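+
+/// A hedged usage sketch for the wrapper above: write regular data pages,
+/// append the custom index blob after them, then record its location in the
+/// footer. The values, blob layout, and the `inner_mut` access to the
+/// underlying sink are assumptions for illustration, not necessarily the
+/// exact write path this example uses.
+fn write_indexed_file_sketch(path: &Path, schema: Arc<Schema>) -> Result<()> {
+    let props = WriterProperties::builder().build();
+    let mut writer =
+        IndexedParquetWriter::try_new(File::create(path)?, schema, props)?;
+
+    // Regular data pages for a single required Utf8 column.
+    {
+        let mut rg = writer.writer.next_row_group()?;
+        let mut col = rg.next_column()?.expect("expected one column");
+        let values: Vec<ByteArray> =
+            ["a", "b", "a"].iter().map(|v| ByteArray::from(*v)).collect();
+        col.typed::<ByteArrayType>().write_batch(&values, None, None)?;
+        col.close()?;
+        rg.close()?;
+    }
+
+    // Append the index blob after the data pages (assumes `inner_mut` exposes
+    // the underlying Write + Seek sink).
+    let payload = b"a\nb".to_vec(); // serialized distinct values (illustrative)
+    let sink = writer.writer.inner_mut();
+    let offset = sink.stream_position()?;
+    sink.write_all(b"IDX1")?;
+    sink.write_all(&(payload.len() as u64).to_le_bytes())?;
+    sink.write_all(&payload)?;
+
+    // Footer key/value entries let readers find the blob again.
+    writer.writer.append_key_value_metadata(KeyValue::new(
+        "distinct_index_offset".to_string(),
+        offset.to_string(),
+    ));
+    writer.writer.append_key_value_metadata(KeyValue::new(
+        "distinct_index_length".to_string(),
+        payload.len().to_string(),
+    ));
+    writer.writer.close()?;
+    Ok(())
+}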
+
+/// Magic bytes to identify our custom index format

Review Comment:
   Thank you @alamb! This is a much better way, perfect!


