mustafasrepo commented on code in PR #9221:
URL: https://github.com/apache/arrow-datafusion/pull/9221#discussion_r1495579650


##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -1112,719 +1112,719 @@ pub(crate) mod test_util {
     }
 }
 
-#[cfg(test)]
-mod tests {
-    use super::super::test_util::scan_format;
-    use crate::physical_plan::collect;
-    use std::fmt::{Display, Formatter};
-    use std::sync::atomic::{AtomicUsize, Ordering};
-
-    use super::*;
-
-    use crate::datasource::file_format::parquet::test_util::store_parquet;
-    use crate::physical_plan::metrics::MetricValue;
-    use crate::prelude::{SessionConfig, SessionContext};
-    use arrow::array::{Array, ArrayRef, StringArray};
-    use arrow::record_batch::RecordBatch;
-    use async_trait::async_trait;
-    use bytes::Bytes;
-    use datafusion_common::cast::{
-        as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
-        as_int32_array, as_timestamp_nanosecond_array,
-    };
-    use datafusion_common::ScalarValue;
-    use futures::stream::BoxStream;
-    use futures::StreamExt;
-    use log::error;
-    use object_store::local::LocalFileSystem;
-    use object_store::path::Path;
-    use object_store::{
-        GetOptions, GetResult, ListResult, MultipartId, PutOptions, PutResult,
-    };
-    use parquet::arrow::arrow_reader::ArrowReaderOptions;
-    use parquet::arrow::ParquetRecordBatchStreamBuilder;
-    use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
-    use parquet::file::page_index::index::Index;
-    use tokio::fs::File;
-    use tokio::io::AsyncWrite;
-
-    #[tokio::test]
-    async fn read_merged_batches() -> Result<()> {
-        let c1: ArrayRef =
-            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
-
-        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), 
None]));
-
-        let batch1 = RecordBatch::try_from_iter(vec![("c1", 
c1.clone())]).unwrap();
-        let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
-
-        let store = Arc::new(LocalFileSystem::new()) as _;
-        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
-
-        let session = SessionContext::new();
-        let ctx = session.state();
-        let format = ParquetFormat::default();
-        let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap();
-
-        let stats =
-            fetch_statistics(store.as_ref(), schema.clone(), &meta[0], 
None).await?;
-
-        assert_eq!(stats.num_rows, Precision::Exact(3));
-        let c1_stats = &stats.column_statistics[0];
-        let c2_stats = &stats.column_statistics[1];
-        assert_eq!(c1_stats.null_count, Precision::Exact(1));
-        assert_eq!(c2_stats.null_count, Precision::Exact(3));
-
-        let stats = fetch_statistics(store.as_ref(), schema, &meta[1], 
None).await?;
-        assert_eq!(stats.num_rows, Precision::Exact(3));
-        let c1_stats = &stats.column_statistics[0];
-        let c2_stats = &stats.column_statistics[1];
-        assert_eq!(c1_stats.null_count, Precision::Exact(3));
-        assert_eq!(c2_stats.null_count, Precision::Exact(1));
-        assert_eq!(
-            c2_stats.max_value,
-            Precision::Exact(ScalarValue::Int64(Some(2)))
-        );
-        assert_eq!(
-            c2_stats.min_value,
-            Precision::Exact(ScalarValue::Int64(Some(1)))
-        );
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn is_schema_stable() -> Result<()> {
-        let c1: ArrayRef =
-            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
-
-        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), 
None]));
-
-        let batch1 =
-            RecordBatch::try_from_iter(vec![("a", c1.clone()), ("b", 
c1.clone())])
-                .unwrap();
-        let batch2 =
-            RecordBatch::try_from_iter(vec![("c", c2.clone()), ("d", 
c2.clone())])
-                .unwrap();
-
-        let store = Arc::new(LocalFileSystem::new()) as _;
-        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
-
-        let session = SessionContext::new();
-        let ctx = session.state();
-        let format = ParquetFormat::default();
-        let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap();
-
-        let order: Vec<_> = ["a", "b", "c", "d"]
-            .into_iter()
-            .map(|i| i.to_string())
-            .collect();
-        let coll: Vec<_> = schema
-            .all_fields()
-            .into_iter()
-            .map(|i| i.name().to_string())
-            .collect();
-        assert_eq!(coll, order);
-
-        Ok(())
-    }
-
-    #[derive(Debug)]
-    struct RequestCountingObjectStore {
-        inner: Arc<dyn ObjectStore>,
-        request_count: AtomicUsize,
-    }
-
-    impl Display for RequestCountingObjectStore {
-        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-            write!(f, "RequestCounting({})", self.inner)
-        }
-    }
-
-    impl RequestCountingObjectStore {
-        pub fn new(inner: Arc<dyn ObjectStore>) -> Self {
-            Self {
-                inner,
-                request_count: Default::default(),
-            }
-        }
-
-        pub fn request_count(&self) -> usize {
-            self.request_count.load(Ordering::SeqCst)
-        }
-
-        pub fn upcast(self: &Arc<Self>) -> Arc<dyn ObjectStore> {
-            self.clone()
-        }
-    }
-
-    #[async_trait]
-    impl ObjectStore for RequestCountingObjectStore {
-        async fn put_opts(
-            &self,
-            _location: &Path,
-            _bytes: Bytes,
-            _opts: PutOptions,
-        ) -> object_store::Result<PutResult> {
-            Err(object_store::Error::NotImplemented)
-        }
-
-        async fn put_multipart(
-            &self,
-            _location: &Path,
-        ) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + 
Send>)>
-        {
-            Err(object_store::Error::NotImplemented)
-        }
-
-        async fn abort_multipart(
-            &self,
-            _location: &Path,
-            _multipart_id: &MultipartId,
-        ) -> object_store::Result<()> {
-            Err(object_store::Error::NotImplemented)
-        }
-
-        async fn get_opts(
-            &self,
-            location: &Path,
-            options: GetOptions,
-        ) -> object_store::Result<GetResult> {
-            self.request_count.fetch_add(1, Ordering::SeqCst);
-            self.inner.get_opts(location, options).await
-        }
-
-        async fn head(&self, _location: &Path) -> 
object_store::Result<ObjectMeta> {
-            Err(object_store::Error::NotImplemented)
-        }
-
-        async fn delete(&self, _location: &Path) -> object_store::Result<()> {
-            Err(object_store::Error::NotImplemented)
-        }
-
-        fn list(
-            &self,
-            _prefix: Option<&Path>,
-        ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
-            Box::pin(futures::stream::once(async {
-                Err(object_store::Error::NotImplemented)
-            }))
-        }
-
-        async fn list_with_delimiter(
-            &self,
-            _prefix: Option<&Path>,
-        ) -> object_store::Result<ListResult> {
-            Err(object_store::Error::NotImplemented)
-        }
-
-        async fn copy(&self, _from: &Path, _to: &Path) -> 
object_store::Result<()> {
-            Err(object_store::Error::NotImplemented)
-        }
-
-        async fn copy_if_not_exists(
-            &self,
-            _from: &Path,
-            _to: &Path,
-        ) -> object_store::Result<()> {
-            Err(object_store::Error::NotImplemented)
-        }
-    }
-
-    #[tokio::test]
-    async fn fetch_metadata_with_size_hint() -> Result<()> {
-        let c1: ArrayRef =
-            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
-
-        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), 
None]));
-
-        let batch1 = RecordBatch::try_from_iter(vec![("c1", 
c1.clone())]).unwrap();
-        let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
-
-        let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
-            LocalFileSystem::new(),
-        )));
-        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
-
-        // Use a size hint larger than the parquet footer but smaller than the 
actual metadata, requiring a second fetch
-        // for the remaining metadata
-        fetch_parquet_metadata(store.as_ref() as &dyn ObjectStore, &meta[0], 
Some(9))
-            .await
-            .expect("error reading metadata with hint");
-
-        assert_eq!(store.request_count(), 2);
-
-        let session = SessionContext::new();
-        let ctx = session.state();
-        let format = ParquetFormat::default().with_metadata_size_hint(Some(9));
-        let schema = format
-            .infer_schema(&ctx, &store.upcast(), &meta)
-            .await
-            .unwrap();
-
-        let stats =
-            fetch_statistics(store.upcast().as_ref(), schema.clone(), 
&meta[0], Some(9))
-                .await?;
-
-        assert_eq!(stats.num_rows, Precision::Exact(3));
-        let c1_stats = &stats.column_statistics[0];
-        let c2_stats = &stats.column_statistics[1];
-        assert_eq!(c1_stats.null_count, Precision::Exact(1));
-        assert_eq!(c2_stats.null_count, Precision::Exact(3));
-
-        let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
-            LocalFileSystem::new(),
-        )));
-
-        // Use the file size as the hint so we can get the full metadata from 
the first fetch
-        let size_hint = meta[0].size;
-
-        fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], 
Some(size_hint))
-            .await
-            .expect("error reading metadata with hint");
-
-        // ensure the requests were coalesced into a single request
-        assert_eq!(store.request_count(), 1);
-
-        let format = 
ParquetFormat::default().with_metadata_size_hint(Some(size_hint));
-        let schema = format
-            .infer_schema(&ctx, &store.upcast(), &meta)
-            .await
-            .unwrap();
-        let stats = fetch_statistics(
-            store.upcast().as_ref(),
-            schema.clone(),
-            &meta[0],
-            Some(size_hint),
-        )
-        .await?;
-
-        assert_eq!(stats.num_rows, Precision::Exact(3));
-        let c1_stats = &stats.column_statistics[0];
-        let c2_stats = &stats.column_statistics[1];
-        assert_eq!(c1_stats.null_count, Precision::Exact(1));
-        assert_eq!(c2_stats.null_count, Precision::Exact(3));
-
-        let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
-            LocalFileSystem::new(),
-        )));
-
-        // Use the a size hint larger than the file size to make sure we don't 
panic
-        let size_hint = meta[0].size + 100;
-
-        fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], 
Some(size_hint))
-            .await
-            .expect("error reading metadata with hint");
-
-        assert_eq!(store.request_count(), 1);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn read_small_batches() -> Result<()> {
-        let config = SessionConfig::new().with_batch_size(2);
-        let session_ctx = SessionContext::new_with_config(config);
-        let state = session_ctx.state();
-        let task_ctx = state.task_ctx();
-        let projection = None;
-        let exec = get_exec(&state, "alltypes_plain.parquet", projection, 
None).await?;
-        let stream = exec.execute(0, task_ctx)?;
-
-        let tt_batches = stream
-            .map(|batch| {
-                let batch = batch.unwrap();
-                assert_eq!(11, batch.num_columns());
-                assert_eq!(2, batch.num_rows());
-            })
-            .fold(0, |acc, _| async move { acc + 1i32 })
-            .await;
-
-        assert_eq!(tt_batches, 4 /* 8/2 */);
-
-        // test metadata
-        assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
-        assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn capture_bytes_scanned_metric() -> Result<()> {
-        let config = SessionConfig::new().with_batch_size(2);
-        let session = SessionContext::new_with_config(config);
-        let ctx = session.state();
-
-        // Read the full file
-        let projection = None;
-        let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, 
None).await?;
-
-        // Read only one column. This should scan less data.
-        let projection = Some(vec![0]);
-        let exec_projected =
-            get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;
-
-        let task_ctx = ctx.task_ctx();
-
-        let _ = collect(exec.clone(), task_ctx.clone()).await?;
-        let _ = collect(exec_projected.clone(), task_ctx).await?;
-
-        assert_bytes_scanned(exec, 671);
-        assert_bytes_scanned(exec_projected, 73);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn read_limit() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let state = session_ctx.state();
-        let task_ctx = state.task_ctx();
-        let projection = None;
-        let exec =
-            get_exec(&state, "alltypes_plain.parquet", projection, 
Some(1)).await?;
-
-        // note: even if the limit is set, the executor rounds up to the batch 
size
-        assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
-        assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
-        let batches = collect(exec, task_ctx).await?;
-        assert_eq!(1, batches.len());
-        assert_eq!(11, batches[0].num_columns());
-        assert_eq!(1, batches[0].num_rows());
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn read_alltypes_plain_parquet() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let state = session_ctx.state();
-        let task_ctx = state.task_ctx();
-        let projection = None;
-        let exec = get_exec(&state, "alltypes_plain.parquet", projection, 
None).await?;
-
-        let x: Vec<String> = exec
-            .schema()
-            .fields()
-            .iter()
-            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
-            .collect();
-        let y = x.join("\n");
-        assert_eq!(
-            "id: Int32\n\
-             bool_col: Boolean\n\
-             tinyint_col: Int32\n\
-             smallint_col: Int32\n\
-             int_col: Int32\n\
-             bigint_col: Int64\n\
-             float_col: Float32\n\
-             double_col: Float64\n\
-             date_string_col: Binary\n\
-             string_col: Binary\n\
-             timestamp_col: Timestamp(Nanosecond, None)",
-            y
-        );
-
-        let batches = collect(exec, task_ctx).await?;
-
-        assert_eq!(1, batches.len());
-        assert_eq!(11, batches[0].num_columns());
-        assert_eq!(8, batches[0].num_rows());
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn read_bool_alltypes_plain_parquet() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let state = session_ctx.state();
-        let task_ctx = state.task_ctx();
-        let projection = Some(vec![1]);
-        let exec = get_exec(&state, "alltypes_plain.parquet", projection, 
None).await?;
-
-        let batches = collect(exec, task_ctx).await?;
-        assert_eq!(1, batches.len());
-        assert_eq!(1, batches[0].num_columns());
-        assert_eq!(8, batches[0].num_rows());
-
-        let array = as_boolean_array(batches[0].column(0))?;
-        let mut values: Vec<bool> = vec![];
-        for i in 0..batches[0].num_rows() {
-            values.push(array.value(i));
-        }
-
-        assert_eq!(
-            "[true, false, true, false, true, false, true, false]",
-            format!("{values:?}")
-        );
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn read_i32_alltypes_plain_parquet() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let state = session_ctx.state();
-        let task_ctx = state.task_ctx();
-        let projection = Some(vec![0]);
-        let exec = get_exec(&state, "alltypes_plain.parquet", projection, 
None).await?;
-
-        let batches = collect(exec, task_ctx).await?;
-        assert_eq!(1, batches.len());
-        assert_eq!(1, batches[0].num_columns());
-        assert_eq!(8, batches[0].num_rows());
-
-        let array = as_int32_array(batches[0].column(0))?;
-        let mut values: Vec<i32> = vec![];
-        for i in 0..batches[0].num_rows() {
-            values.push(array.value(i));
-        }
-
-        assert_eq!("[4, 5, 6, 7, 2, 3, 0, 1]", format!("{values:?}"));
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn read_i96_alltypes_plain_parquet() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let state = session_ctx.state();
-        let task_ctx = state.task_ctx();
-        let projection = Some(vec![10]);
-        let exec = get_exec(&state, "alltypes_plain.parquet", projection, 
None).await?;
-
-        let batches = collect(exec, task_ctx).await?;
-        assert_eq!(1, batches.len());
-        assert_eq!(1, batches[0].num_columns());
-        assert_eq!(8, batches[0].num_rows());
-
-        let array = as_timestamp_nanosecond_array(batches[0].column(0))?;
-        let mut values: Vec<i64> = vec![];
-        for i in 0..batches[0].num_rows() {
-            values.push(array.value(i));
-        }
-
-        assert_eq!("[1235865600000000000, 1235865660000000000, 
1238544000000000000, 1238544060000000000, 1233446400000000000, 
1233446460000000000, 1230768000000000000, 1230768060000000000]", 
format!("{values:?}"));
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn read_f32_alltypes_plain_parquet() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let state = session_ctx.state();
-        let task_ctx = state.task_ctx();
-        let projection = Some(vec![6]);
-        let exec = get_exec(&state, "alltypes_plain.parquet", projection, 
None).await?;
-
-        let batches = collect(exec, task_ctx).await?;
-        assert_eq!(1, batches.len());
-        assert_eq!(1, batches[0].num_columns());
-        assert_eq!(8, batches[0].num_rows());
-
-        let array = as_float32_array(batches[0].column(0))?;
-        let mut values: Vec<f32> = vec![];
-        for i in 0..batches[0].num_rows() {
-            values.push(array.value(i));
-        }
-
-        assert_eq!(
-            "[0.0, 1.1, 0.0, 1.1, 0.0, 1.1, 0.0, 1.1]",
-            format!("{values:?}")
-        );
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn read_f64_alltypes_plain_parquet() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let state = session_ctx.state();
-        let task_ctx = state.task_ctx();
-        let projection = Some(vec![7]);
-        let exec = get_exec(&state, "alltypes_plain.parquet", projection, 
None).await?;
-
-        let batches = collect(exec, task_ctx).await?;
-        assert_eq!(1, batches.len());
-        assert_eq!(1, batches[0].num_columns());
-        assert_eq!(8, batches[0].num_rows());
-
-        let array = as_float64_array(batches[0].column(0))?;
-        let mut values: Vec<f64> = vec![];
-        for i in 0..batches[0].num_rows() {
-            values.push(array.value(i));
-        }
-
-        assert_eq!(
-            "[0.0, 10.1, 0.0, 10.1, 0.0, 10.1, 0.0, 10.1]",
-            format!("{values:?}")
-        );
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn read_binary_alltypes_plain_parquet() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let state = session_ctx.state();
-        let task_ctx = state.task_ctx();
-        let projection = Some(vec![9]);
-        let exec = get_exec(&state, "alltypes_plain.parquet", projection, 
None).await?;
-
-        let batches = collect(exec, task_ctx).await?;
-        assert_eq!(1, batches.len());
-        assert_eq!(1, batches[0].num_columns());
-        assert_eq!(8, batches[0].num_rows());
-
-        let array = as_binary_array(batches[0].column(0))?;
-        let mut values: Vec<&str> = vec![];
-        for i in 0..batches[0].num_rows() {
-            values.push(std::str::from_utf8(array.value(i)).unwrap());
-        }
-
-        assert_eq!(
-            "[\"0\", \"1\", \"0\", \"1\", \"0\", \"1\", \"0\", \"1\"]",
-            format!("{values:?}")
-        );
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn read_decimal_parquet() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let state = session_ctx.state();
-        let task_ctx = state.task_ctx();
-
-        // parquet use the int32 as the physical type to store decimal
-        let exec = get_exec(&state, "int32_decimal.parquet", None, 
None).await?;
-        let batches = collect(exec, task_ctx.clone()).await?;
-        assert_eq!(1, batches.len());
-        assert_eq!(1, batches[0].num_columns());
-        let column = batches[0].column(0);
-        assert_eq!(&DataType::Decimal128(4, 2), column.data_type());
-
-        // parquet use the int64 as the physical type to store decimal
-        let exec = get_exec(&state, "int64_decimal.parquet", None, 
None).await?;
-        let batches = collect(exec, task_ctx.clone()).await?;
-        assert_eq!(1, batches.len());
-        assert_eq!(1, batches[0].num_columns());
-        let column = batches[0].column(0);
-        assert_eq!(&DataType::Decimal128(10, 2), column.data_type());
-
-        // parquet use the fixed length binary as the physical type to store 
decimal
-        let exec = get_exec(&state, "fixed_length_decimal.parquet", None, 
None).await?;
-        let batches = collect(exec, task_ctx.clone()).await?;
-        assert_eq!(1, batches.len());
-        assert_eq!(1, batches[0].num_columns());
-        let column = batches[0].column(0);
-        assert_eq!(&DataType::Decimal128(25, 2), column.data_type());
-
-        let exec =
-            get_exec(&state, "fixed_length_decimal_legacy.parquet", None, 
None).await?;
-        let batches = collect(exec, task_ctx.clone()).await?;
-        assert_eq!(1, batches.len());
-        assert_eq!(1, batches[0].num_columns());
-        let column = batches[0].column(0);
-        assert_eq!(&DataType::Decimal128(13, 2), column.data_type());
-
-        // parquet use the byte array as the physical type to store decimal
-        let exec = get_exec(&state, "byte_array_decimal.parquet", None, 
None).await?;
-        let batches = collect(exec, task_ctx.clone()).await?;
-        assert_eq!(1, batches.len());
-        assert_eq!(1, batches[0].num_columns());
-        let column = batches[0].column(0);
-        assert_eq!(&DataType::Decimal128(4, 2), column.data_type());
-
-        Ok(())
-    }
-    #[tokio::test]
-    async fn test_read_parquet_page_index() -> Result<()> {
-        let testdata = crate::test_util::parquet_test_data();
-        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
-        let file = File::open(path).await.unwrap();
-        let options = ArrowReaderOptions::new().with_page_index(true);
-        let builder =
-            ParquetRecordBatchStreamBuilder::new_with_options(file, 
options.clone())
-                .await
-                .unwrap()
-                .metadata()
-                .clone();
-        check_page_index_validation(builder.column_index(), 
builder.offset_index());
-
-        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
-        let file = File::open(path).await.unwrap();
-
-        let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, 
options)
-            .await
-            .unwrap()
-            .metadata()
-            .clone();
-        check_page_index_validation(builder.column_index(), 
builder.offset_index());
-
-        Ok(())
-    }
-
-    fn check_page_index_validation(
-        page_index: Option<&ParquetColumnIndex>,
-        offset_index: Option<&ParquetOffsetIndex>,
-    ) {
-        assert!(page_index.is_some());
-        assert!(offset_index.is_some());
-
-        let page_index = page_index.unwrap();
-        let offset_index = offset_index.unwrap();
-
-        // there is only one row group in one file.
-        assert_eq!(page_index.len(), 1);
-        assert_eq!(offset_index.len(), 1);
-        let page_index = page_index.first().unwrap();
-        let offset_index = offset_index.first().unwrap();
-
-        // 13 col in one row group
-        assert_eq!(page_index.len(), 13);
-        assert_eq!(offset_index.len(), 13);
-
-        // test result in int_col
-        let int_col_index = page_index.get(4).unwrap();
-        let int_col_offset = offset_index.get(4).unwrap();
-
-        // 325 pages in int_col
-        assert_eq!(int_col_offset.len(), 325);
-        match int_col_index {
-            Index::INT32(index) => {
-                assert_eq!(index.indexes.len(), 325);
-                for min_max in index.clone().indexes {
-                    assert!(min_max.min.is_some());
-                    assert!(min_max.max.is_some());
-                    assert!(min_max.null_count.is_some());
-                }
-            }
-            _ => {
-                error!("fail to read page index.")
-            }
-        }
-    }
-
-    fn assert_bytes_scanned(exec: Arc<dyn ExecutionPlan>, expected: usize) {
-        let actual = exec
-            .metrics()
-            .expect("Metrics not recorded")
-            .sum(|metric| matches!(metric.value(), MetricValue::Count { name, 
.. } if name == "bytes_scanned"))
-            .map(|t| t.as_usize())
-            .expect("bytes_scanned metric not recorded");
-
-        assert_eq!(actual, expected);
-    }
-
-    async fn get_exec(
-        state: &SessionState,
-        file_name: &str,
-        projection: Option<Vec<usize>>,
-        limit: Option<usize>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let testdata = crate::test_util::parquet_test_data();
-        let format = ParquetFormat::default();
-        scan_format(state, &format, &testdata, file_name, projection, 
limit).await
-    }
-}
+// #[cfg(test)]

Review Comment:
   Why these tests are commented out



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to