tustvold commented on code in PR #1762:
URL: https://github.com/apache/arrow-rs/pull/1762#discussion_r885060162


##########
parquet/src/file/page_index/index.rs:
##########
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::private::ParquetValueType;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::util::bit_util::from_ne_slice;
+use parquet_format::{BoundaryOrder, ColumnIndex};
+use std::fmt::Debug;
+
+/// The statistics in one page
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct PageIndex<T> {
+    /// The minimum value, It is None when all values are null
+    pub min: Option<T>,
+    /// The maximum value, It is None when all values are null
+    pub max: Option<T>,
+    /// Null values in the page
+    pub null_count: Option<i64>,
+}
+
+impl<T> PageIndex<T> {
+    pub fn min(&self) -> Option<&T> {
+        self.min.as_ref()
+    }
+    pub fn max(&self) -> Option<&T> {
+        self.max.as_ref()
+    }
+    pub fn null_count(&self) -> Option<i64> {
+        self.null_count
+    }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum Index {
+    BOOLEAN(BooleanIndex),

Review Comment:
   CamelCase is generally preferred to shouty case



##########
parquet/src/file/page_index/index_reader.rs:
##########
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::file::metadata::ColumnChunkMetaData;
+use crate::file::page_index::index::{BooleanIndex, ByteArrayIndex, Index, 
NativeIndex};
+use crate::file::reader::ChunkReader;
+use parquet_format::{ColumnIndex, OffsetIndex, PageLocation};
+use std::io::{Cursor, Read};
+use thrift::protocol::TCompactInputProtocol;
+
+/// Read on row group's all columns indexes and change into  [`Index`]
+/// If not the format not available return an empty vector.
+pub fn read_columns_indexes<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Index>, ParquetError> {
+    let (offset, lengths) = get_index_offset_and_lengths(chunks)?;
+    let length = lengths.iter().sum::<usize>();
+
+    //read all need data into buffer
+    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut data = vec![0; length];
+    reader.read_exact(&mut data)?;
+
+    let mut start = 0;
+    let data = lengths.into_iter().map(|length| {
+        let r = &data[start..start + length];
+        start += length;
+        r
+    });
+
+    chunks
+        .iter()
+        .zip(data)
+        .map(|(chunk, data)| {
+            let column_type = chunk.column_type();
+            deserialize_column_index(data, column_type)
+        })
+        .collect()
+}
+
+/// Read on row group's all indexes and change into  [`Index`]
+/// If not the format not available return an empty vector.
+pub fn read_pages_locations<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
+    let (offset, total_length) = get_location_offset_and_total_length(chunks)?;
+
+    //read all need data into buffer
+    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut data = vec![0; total_length];
+    reader.read_exact(&mut data)?;
+
+    let mut d = Cursor::new(data);
+    let mut result = vec![];
+
+    for _ in 0..chunks.len() {
+        let mut prot = TCompactInputProtocol::new(&mut d);
+        let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
+        result.push(offset.page_locations);
+    }
+    Ok(result)
+}
+
+fn get_index_offset_and_lengths(
+    chunks: &[ColumnChunkMetaData],
+) -> Result<(u64, Vec<usize>), ParquetError> {
+    let first_col_metadata = if let Some(chunk) = chunks.first() {
+        chunk
+    } else {
+        return Ok((0, vec![]));
+    };
+
+    let offset: u64 = if let Some(offset) = 
first_col_metadata.column_index_offset() {
+        offset.try_into().unwrap()
+    } else {
+        return Ok((0, vec![]));
+    };
+
+    let lengths = chunks
+        .iter()
+        .map(|x| x.column_index_length())
+        .map(|maybe_length| {
+            let index_length = maybe_length.ok_or_else(|| {
+                ParquetError::General(
+                    "The column_index_length must exist if offset_index_offset 
exists"
+                        .to_string(),
+                )
+            })?;
+
+            Ok(index_length.try_into().unwrap())
+        })
+        .collect::<Result<Vec<_>, ParquetError>>()?;
+
+    Ok((offset, lengths))
+}
+
+fn get_location_offset_and_total_length(
+    chunks: &[ColumnChunkMetaData],
+) -> Result<(u64, usize), ParquetError> {
+    let metadata = if let Some(chunk) = chunks.first() {
+        chunk
+    } else {
+        return Ok((0, 0));
+    };
+
+    let offset: u64 = if let Some(offset) = metadata.offset_index_offset() {
+        offset.try_into().unwrap()
+    } else {
+        return Ok((0, 0));
+    };
+
+    let lengths = chunks
+        .iter()
+        .map(|x| x.offset_index_length())
+        .map(|maybe_length| {
+            let index_length = maybe_length.ok_or_else(|| {
+                ParquetError::General(
+                    "The offset_index_length must exist if offset_index_offset 
exists"
+                        .to_string(),
+                )
+            })?;
+
+            Ok(index_length.try_into().unwrap())
+        })
+        .collect::<Result<Vec<_>, ParquetError>>()?;
+
+    Ok((offset, lengths.iter().sum()))

Review Comment:
   Why not just compute the length as you go, instead of collecting to a vec 
and then calling sum?



##########
parquet/src/file/page_index/index_reader.rs:
##########
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::file::metadata::ColumnChunkMetaData;
+use crate::file::page_index::index::{BooleanIndex, ByteArrayIndex, Index, 
NativeIndex};
+use crate::file::reader::ChunkReader;
+use parquet_format::{ColumnIndex, OffsetIndex, PageLocation};
+use std::io::{Cursor, Read};
+use thrift::protocol::TCompactInputProtocol;
+
+/// Read on row group's all columns indexes and change into  [`Index`]
+/// If not the format not available return an empty vector.
+pub fn read_columns_indexes<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Index>, ParquetError> {
+    let (offset, lengths) = get_index_offset_and_lengths(chunks)?;
+    let length = lengths.iter().sum::<usize>();
+
+    //read all need data into buffer
+    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut data = vec![0; length];
+    reader.read_exact(&mut data)?;
+
+    let mut start = 0;
+    let data = lengths.into_iter().map(|length| {
+        let r = &data[start..start + length];
+        start += length;
+        r
+    });
+
+    chunks
+        .iter()
+        .zip(data)
+        .map(|(chunk, data)| {
+            let column_type = chunk.column_type();
+            deserialize_column_index(data, column_type)
+        })
+        .collect()
+}
+
+/// Read on row group's all indexes and change into  [`Index`]
+/// If not the format not available return an empty vector.
+pub fn read_pages_locations<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
+    let (offset, total_length) = get_location_offset_and_total_length(chunks)?;
+
+    //read all need data into buffer
+    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut data = vec![0; total_length];
+    reader.read_exact(&mut data)?;
+
+    let mut d = Cursor::new(data);
+    let mut result = vec![];
+
+    for _ in 0..chunks.len() {
+        let mut prot = TCompactInputProtocol::new(&mut d);
+        let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
+        result.push(offset.page_locations);
+    }
+    Ok(result)
+}
+
+fn get_index_offset_and_lengths(
+    chunks: &[ColumnChunkMetaData],
+) -> Result<(u64, Vec<usize>), ParquetError> {
+    let first_col_metadata = if let Some(chunk) = chunks.first() {
+        chunk
+    } else {
+        return Ok((0, vec![]));
+    };
+
+    let offset: u64 = if let Some(offset) = 
first_col_metadata.column_index_offset() {
+        offset.try_into().unwrap()
+    } else {
+        return Ok((0, vec![]));
+    };
+
+    let lengths = chunks
+        .iter()
+        .map(|x| x.column_index_length())
+        .map(|maybe_length| {
+            let index_length = maybe_length.ok_or_else(|| {
+                ParquetError::General(
+                    "The column_index_length must exist if offset_index_offset 
exists"
+                        .to_string(),
+                )
+            })?;
+
+            Ok(index_length.try_into().unwrap())
+        })
+        .collect::<Result<Vec<_>, ParquetError>>()?;
+
+    Ok((offset, lengths))
+}
+
+fn get_location_offset_and_total_length(

Review Comment:
   A docstring explaining the return type would be good



##########
parquet/src/file/page_index/index.rs:
##########
@@ -0,0 +1,284 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::private::ParquetValueType;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::util::bit_util::from_ne_slice;
+use parquet_format::{BoundaryOrder, ColumnIndex};
+use std::any::Any;
+use std::fmt::Debug;
+
+/// The static in one page
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct PageIndex<T> {
+    /// The minimum value, It is None when all values are null
+    pub min: Option<T>,
+    /// The maximum value, It is None when all values are null
+    pub max: Option<T>,
+    /// Null values in the page
+    pub null_count: Option<i64>,
+}
+
+impl<T> PageIndex<T> {
+    pub fn min(&self) -> &Option<T> {
+        &self.min
+    }
+    pub fn max(&self) -> &Option<T> {
+        &self.max
+    }
+    pub fn null_count(&self) -> &Option<i64> {
+        &self.null_count
+    }
+}
+
+/// Trait object representing a [`ColumnIndex`]
+pub trait Index: Send + Sync + Debug {
+    fn as_any(&self) -> &dyn Any;
+
+    fn physical_type(&self) -> &Type;
+}
+
+impl PartialEq for dyn Index + '_ {
+    fn eq(&self, that: &dyn Index) -> bool {
+        equal(self, that)
+    }
+}
+
+impl Eq for dyn Index + '_ {}
+
+fn equal(lhs: &dyn Index, rhs: &dyn Index) -> bool {
+    if lhs.physical_type() != rhs.physical_type() {
+        return false;
+    }
+
+    match lhs.physical_type() {
+        Type::BOOLEAN => {
+            lhs.as_any().downcast_ref::<BooleanIndex>().unwrap()
+                == rhs.as_any().downcast_ref::<BooleanIndex>().unwrap()
+        }
+        Type::INT32 => {
+            lhs.as_any().downcast_ref::<NativeIndex<i32>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<i32>>().unwrap()
+        }
+        Type::INT64 => {
+            lhs.as_any().downcast_ref::<NativeIndex<i64>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<i64>>().unwrap()
+        }
+        Type::INT96 => {
+            lhs.as_any().downcast_ref::<NativeIndex<Int96>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<Int96>>().unwrap()
+        }
+        Type::FLOAT => {
+            lhs.as_any().downcast_ref::<NativeIndex<f32>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<f32>>().unwrap()
+        }
+        Type::DOUBLE => {
+            lhs.as_any().downcast_ref::<NativeIndex<f64>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<f64>>().unwrap()
+        }
+        Type::BYTE_ARRAY => {
+            lhs.as_any().downcast_ref::<ByteIndex>().unwrap()
+                == rhs.as_any().downcast_ref::<ByteIndex>().unwrap()
+        }
+        Type::FIXED_LEN_BYTE_ARRAY => {
+            lhs.as_any().downcast_ref::<ByteIndex>().unwrap()
+                == rhs.as_any().downcast_ref::<ByteIndex>().unwrap()
+        }
+    }
+}
+
+/// An index of a column of [`Type`] physical representation
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct NativeIndex<T: ParquetValueType> {
+    /// The physical type
+    pub physical_type: Type,
+    /// The indexes, one item per page
+    pub indexes: Vec<PageIndex<T>>,
+    /// the order
+    pub boundary_order: BoundaryOrder,
+}
+
+impl<T: ParquetValueType> NativeIndex<T> {
+    /// Creates a new [`NativeIndex`]
+    pub(crate) fn try_new(
+        index: ColumnIndex,
+        physical_type: Type,
+    ) -> Result<Self, ParquetError> {
+        let len = index.min_values.len();
+
+        let null_counts = index
+            .null_counts
+            .map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
+            .unwrap_or_else(|| vec![None; len]);
+
+        let indexes = index
+            .min_values
+            .iter()
+            .zip(index.max_values.into_iter())
+            .zip(index.null_pages.into_iter())
+            .zip(null_counts.into_iter())
+            .map(|(((min, max), is_null), null_count)| {
+                let (min, max) = if is_null {
+                    (None, None)
+                } else {
+                    let min = min.as_slice();
+                    let max = max.as_slice();
+                    (Some(from_ne_slice::<T>(min)), 
Some(from_ne_slice::<T>(max)))

Review Comment:
   Aah - should be easy enough to fix FWIW



##########
parquet/src/file/page_index/index_reader.rs:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::file::metadata::ColumnChunkMetaData;
+use crate::file::page_index::index::{BooleanIndex, ByteIndex, Index, 
NativeIndex};
+use crate::file::reader::ChunkReader;
+use parquet_format::{ColumnIndex, OffsetIndex, PageLocation};
+use std::io::{Cursor, Read};
+use std::sync::Arc;
+use thrift::protocol::TCompactInputProtocol;
+
+/// Read on row group's all columns indexes and change into  [`Index`]
+/// If not the format not available return an empty vector.
+pub fn read_columns_indexes<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Arc<dyn Index>>, ParquetError> {
+    let (offset, lengths) = get_index_offset_and_lengths(chunks)?;
+    let length = lengths.iter().sum::<usize>();
+
+    //read all need data into buffer
+    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut data = vec![0; length];
+    reader.read_exact(&mut data)?;
+
+    let mut start = 0;

Review Comment:
   In `read_pages_locations` you simply reuse the same cursor to continue where 
the previous one left off; here you instead explicitly slice the `data` buffer 
and feed this into `TCompactInputProtocol`. I couldn't see a particular reason 
why there were two different approaches to reading the same "style" of data.



##########
parquet/src/file/serialized_reader.rs:
##########
@@ -189,6 +203,27 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
         })
     }
 
+    /// Creates file reader from a Parquet file with page Index.
+    /// Returns error if Parquet file does not exist or is corrupt.
+    pub fn new_with_page_index(chunk_reader: R) -> Result<Self> {

Review Comment:
   I see you've removed the `new_with_options` approach. I think I would prefer 
you revert that and remove this additional constructor instead: with this 
constructor, I'm not sure how you would set `ReadOptions` and also enable the 
page index.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to