This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ab51355c fix: compile error due to merge stale PR (#646)
ab51355c is described below
commit ab51355c6b01b4f64b7c0439bb705d42e60d2338
Author: xxchan <[email protected]>
AuthorDate: Tue Sep 24 21:25:45 2024 +0800
fix: compile error due to merge stale PR (#646)
Signed-off-by: xxchan <[email protected]>
---
crates/iceberg/src/arrow/reader.rs | 4 +-
.../src/expr/visitors/page_index_evaluator.rs | 44 +++++++++++++++++-----
2 files changed, 37 insertions(+), 11 deletions(-)
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 10f97153..ed422be9 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -190,7 +190,7 @@ impl ArrowReader {
try_join!(parquet_file.metadata(), parquet_file.reader())?;
let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
- let should_load_page_index = row_selection_enabled && task.predicate().is_some();
+ let should_load_page_index = row_selection_enabled && task.predicate.is_some();
// Start creating the record batch stream, which wraps the parquet file reader
let mut record_batch_stream_builder =
ParquetRecordBatchStreamBuilder::new_with_options(
@@ -245,7 +245,7 @@ impl ArrowReader {
record_batch_stream_builder.metadata(),
&selected_row_groups,
&field_id_map,
- task.schema(),
+ &task.schema,
)?;
record_batch_stream_builder =
diff --git a/crates/iceberg/src/expr/visitors/page_index_evaluator.rs b/crates/iceberg/src/expr/visitors/page_index_evaluator.rs
index e8c1849a..af20be0a 100644
--- a/crates/iceberg/src/expr/visitors/page_index_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/page_index_evaluator.rs
@@ -24,14 +24,14 @@ use ordered_float::OrderedFloat;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::page_index::index::Index;
-use parquet::format::PageLocation;
+use parquet::file::page_index::offset_index::OffsetIndexMetaData;
use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
use crate::expr::{BoundPredicate, BoundReference};
use crate::spec::{Datum, PrimitiveLiteral, PrimitiveType, Schema};
use crate::{Error, ErrorKind, Result};
-type OffsetIndex = Vec<Vec<PageLocation>>;
+type OffsetIndex = Vec<OffsetIndexMetaData>;
const IN_PREDICATE_LIMIT: usize = 200;
@@ -206,13 +206,14 @@ impl<'a> PageIndexEvaluator<'a> {
}
/// returns a list of row counts per page
- fn calc_row_counts(&self, offset_index: &[PageLocation]) -> Vec<usize> {
+ fn calc_row_counts(&self, offset_index: &OffsetIndexMetaData) -> Vec<usize> {
let mut remaining_rows = self.row_group_metadata.num_rows() as usize;
let mut row_counts = Vec::with_capacity(self.offset_index.len());
- for (idx, page_location) in offset_index.iter().enumerate() {
- let row_count = if idx < offset_index.len() - 1 {
- let row_count = (offset_index[idx + 1].first_row_index
+ let page_locations = offset_index.page_locations();
+ for (idx, page_location) in page_locations.iter().enumerate() {
+ let row_count = if idx < page_locations.len() - 1 {
+ let row_count = (page_locations[idx + 1].first_row_index
- page_location.first_row_index) as usize;
remaining_rows -= row_count;
row_count
@@ -868,6 +869,7 @@ mod tests {
use parquet::data_type::ByteArray;
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use parquet::file::page_index::index::{Index, NativeIndex, PageIndex};
+ use parquet::file::page_index::offset_index::OffsetIndexMetaData;
use parquet::file::statistics::Statistics;
use parquet::format::{BoundaryOrder, PageLocation};
use parquet::schema::types::{
@@ -1417,28 +1419,36 @@ mod tests {
Ok(row_group_metadata?)
}
- fn create_page_index() -> Result<(Vec<Index>, Vec<Vec<PageLocation>>)> {
+ fn create_page_index() -> Result<(Vec<Index>, Vec<OffsetIndexMetaData>)> {
let idx_float = Index::FLOAT(NativeIndex::<f32> {
indexes: vec![
PageIndex {
min: None,
max: None,
null_count: Some(1024),
+ repetition_level_histogram: None,
+ definition_level_histogram: None,
},
PageIndex {
min: Some(0.0),
max: Some(10.0),
null_count: Some(0),
+ repetition_level_histogram: None,
+ definition_level_histogram: None,
},
PageIndex {
min: Some(10.0),
max: Some(20.0),
null_count: Some(1),
+ repetition_level_histogram: None,
+ definition_level_histogram: None,
},
PageIndex {
min: None,
max: None,
null_count: None,
+ repetition_level_histogram: None,
+ definition_level_histogram: None,
},
],
boundary_order: BoundaryOrder(0), // UNORDERED
@@ -1450,26 +1460,36 @@ mod tests {
min: Some("AA".into()),
max: Some("DD".into()),
null_count: Some(0),
+ repetition_level_histogram: None,
+ definition_level_histogram: None,
},
PageIndex {
min: Some("DE".into()),
max: Some("DE".into()),
null_count: Some(0),
+ repetition_level_histogram: None,
+ definition_level_histogram: None,
},
PageIndex {
min: Some("DF".into()),
max: Some("UJ".into()),
null_count: Some(1),
+ repetition_level_histogram: None,
+ definition_level_histogram: None,
},
PageIndex {
min: None,
max: None,
null_count: Some(48),
+ repetition_level_histogram: None,
+ definition_level_histogram: None,
},
PageIndex {
min: None,
max: None,
null_count: None,
+ repetition_level_histogram: None,
+ definition_level_histogram: None,
},
],
boundary_order: BoundaryOrder(0), // UNORDERED
@@ -1491,8 +1511,14 @@ mod tests {
];
Ok((vec![idx_float, idx_string], vec![
- page_locs_float,
- page_locs_string,
+ OffsetIndexMetaData {
+ page_locations: page_locs_float,
+ unencoded_byte_array_data_bytes: None,
+ },
+ OffsetIndexMetaData {
+ page_locations: page_locs_string,
+ unencoded_byte_array_data_bytes: None,
+ },
]))
}
}