jcsherin commented on code in PR #16395:
URL: https://github.com/apache/datafusion/pull/16395#discussion_r2179977087


##########
datafusion-examples/examples/parquet_embedded_index.rs:
##########
@@ -0,0 +1,472 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Embedding and using a custom index in Parquet files
+//!
+//! # Background
+//!
+//! This example shows how to add an application‑specific index to an Apache
+//! Parquet file without modifying the Parquet format itself. The resulting
+//! files can be read by any standard Parquet reader, which will simply
+//! ignore the extra index data.
+//!
+//! A “distinct value” index, similar to a  ["set" Skip Index in ClickHouse],
+//! is stored in a custom binary format within the parquet file. Only the
+//! location of index is stored in Parquet footer key/value metadata.
+//! This approach is more efficient than storing the index itself in the footer
+//! metadata because the footer must be read and parsed by all readers,
+//! even those that do not use the index.
+//!
+//! The resulting Parquet file layout is as follows:
+//!
+//! ```text
+//!                   ┌──────────────────────┐                           
+//!                   │┌───────────────────┐ │                           
+//!                   ││     DataPage      │ │                           
+//!                   │└───────────────────┘ │                           
+//!  Standard Parquet │┌───────────────────┐ │                           
+//!  Data Pages       ││     DataPage      │ │                           
+//!                   │└───────────────────┘ │                           
+//!                   │        ...           │                           
+//!                   │┌───────────────────┐ │                           
+//!                   ││     DataPage      │ │                           
+//!                   │└───────────────────┘ │                           
+//!                   │┏━━━━━━━━━━━━━━━━━━━┓ │                           
+//! Non standard      │┃                   ┃ │                           
+//! index (ignored by │┃Custom Binary Index┃ │                           
+//! other Parquet     │┃ (Distinct Values) ┃◀│─ ─ ─                      
+//! readers)          │┃                   ┃ │     │                     
+//!                   │┗━━━━━━━━━━━━━━━━━━━┛ │                           
+//! Standard Parquet  │┏━━━━━━━━━━━━━━━━━━━┓ │     │  key/value metadata
+//! Page Index        │┃    Page Index     ┃ │        contains location  
+//!                   │┗━━━━━━━━━━━━━━━━━━━┛ │     │  of special index   
+//!                   │╔═══════════════════╗ │                           
+//!                   │║ Parquet Footer w/ ║ │     │                     
+//!                   │║     Metadata      ║ ┼ ─ ─                       
+//!                   │║ (Thrift Encoded)  ║ │                           
+//!                   │╚═══════════════════╝ │                           
+//!                   └──────────────────────┘                           
+//!                                                                      
+//!                         Parquet File                                 
+//!
+//! # High Level Flow
+//!
+//! To create a custom Parquet index:
+//!
+//! 1. Compute the index and serialize it to a binary format.
+//!
+//! 2. Write the Parquet file with:
+//!    - regular data pages
+//!    - the serialized index inline
+//!    - footer key/value metadata entry to locate the index
+//!
+//! To read and use the index are:
+//!
+//! 1. Read and deserialize the file’s footer to locate the index.
+//!
+//! 2. Read and deserialize the index.
+//!
+//! 3. Create a `TableProvider` that knows how to use the index to quickly find
+//!   the relevant files, row groups, data pages or rows based on on pushed 
down
+//!   filters.
+//!
+//! # FAQ: Why do other Parquet readers skip over the custom index?
+//!
+//! The flow for reading a parquet file is:
+//!
+//! 1. Seek to the end of the file and read the last 8 bytes (a 4‑byte
+//!    little‑endian footer length followed by the `PAR1` magic bytes).
+//!
+//! 2. Seek backwards by that length to parse the Thrift‑encoded footer
+//!    metadata (including key/value pairs).
+//!
+//! 3. Read data required for decoding such as data pages based on the offsets
+//!    encoded in the metadata.
+//!
+//! Since parquet readers do not scan from the start of the file they will read
+//! data in the file unless it is explicitly referenced in the footer metadata.
+//!
+//! Thus other readers will encounter and ignore an unknown key
+//! (`distinct_index_offset`) in the footer key/value metadata. Unless they
+//! know how to use that information, they will not attempt to read or
+//! the bytes that make up the index.
+//!
+//! ["set" Skip Index in ClickHouse]: 
https://clickhouse.com/docs/optimize/skipping-indexes#set
+
+use arrow::array::{ArrayRef, StringArray};
+use arrow::record_batch::RecordBatch;
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::{exec_err, HashMap, HashSet, Result};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::memory::DataSourceExec;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, 
ParquetSource};
+use datafusion::datasource::TableType;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::{Operator, TableProviderFilterPushDown};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::errors::ParquetError;
+use datafusion::parquet::file::metadata::{FileMetaData, KeyValue};
+use datafusion::parquet::file::reader::{FileReader, SerializedFileReader};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::*;
+use datafusion::scalar::ScalarValue;
+use std::fs::{create_dir_all, read_dir, File};
+use std::io::{Read, Seek, SeekFrom, Write};
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use tempfile::TempDir;
+
+/// An index of distinct values for a single column
+///
+/// In this example the index is a simple set of strings, but in a real
+/// application it could be any arbitrary data structure.
+///
+/// Also, this example indexes the distinct values for an entire file
+/// but a real application could create multiple indexes for multiple
+/// row groups and/or columns, depending on the use case.
+#[derive(Debug, Clone)]
+struct DistinctIndex {
+    inner: HashSet<String>,
+}
+
+impl DistinctIndex {
+    /// Create a DistinctIndex from an iterator of strings
+    pub fn new<I: IntoIterator<Item = String>>(iter: I) -> Self {
+        Self {
+            inner: iter.into_iter().collect(),
+        }
+    }
+
+    /// Returns true if the index contains the given value
+    pub fn contains(&self, value: &str) -> bool {
+        self.inner.contains(value)
+    }
+
+    /// Serialize the distinct index to a writer as bytes
+    ///
+    /// In this example, we use a simple newline-separated format,
+    /// but a real application can use any arbitrary binary format.
+    ///
+    /// Note that we must use the ArrowWriter to write the index so that its
+    /// internal accounting of offsets can correctly track the actual size of
+    /// the file. If we wrote directly to the underlying writer, the PageIndex
+    /// written right before the would be incorrect as they would not account
+    /// for the extra bytes written.
+    fn serialize<W: Write + Send>(
+        &self,
+        arrow_writer: &mut ArrowWriter<W>,
+    ) -> Result<()> {
+        let serialized = self
+            .inner
+            .iter()
+            .map(|s| s.as_str())
+            .collect::<Vec<_>>()
+            .join("\n");
+        let index_bytes = serialized.into_bytes();
+
+        // Set the offset for the index
+        let offset = arrow_writer.bytes_written();
+        let index_len = index_bytes.len() as u64;
+
+        println!("Writing custom index at offset: {offset}, length: 
{index_len}");
+        // Write the index magic and length to the file
+        arrow_writer.write_all(b"IDX1")?;

Review Comment:
   Minor: uses constant
   
   ```suggestion
           arrow_writer.write_all(INDEX_MAGIC)?;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to