This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 106197d2 fix: missing filter index over the primary keys (#1456)
106197d2 is described below

commit 106197d25d8e42df56f82aa27940b396a8becda7
Author: WEI Xikai <[email protected]>
AuthorDate: Tue Jan 23 15:42:17 2024 +0800

    fix: missing filter index over the primary keys (#1456)
    
    ## Rationale
    Currently, no filter index is created over the primary key columns,
    which leads to an unexpectedly low prune rate in some cases.
    
    ## Detailed Changes
    - Split the filter module out as a sub-module of `meta_data`.
    - Create the filter index over the primary key columns, except for the
    tsid and timestamp columns.
    
    ## Test Plan
    The existing CI should pass.
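
For illustration, here is a minimal, self-contained Rust sketch of the selection rule the second change introduces: build a filter index for every column except the timestamp column and the (optional) tsid column. `SchemaSketch` is a hypothetical stand-in for `common_types::schema::Schema`; only the skip logic mirrors the diff below.

```rust
// Hypothetical stand-in for common_types::schema::Schema; only the
// skip logic in filter_indexed_columns mirrors the actual patch.
struct SchemaSketch {
    column_names: Vec<String>,
    timestamp_index: usize,
    tsid_index: Option<usize>,
}

impl SchemaSketch {
    /// Columns that get a filter index: everything except the timestamp
    /// column and the (optional) tsid column.
    fn filter_indexed_columns(&self) -> Vec<&str> {
        self.column_names
            .iter()
            .enumerate()
            .filter(|(i, _)| {
                *i != self.timestamp_index
                    && !self.tsid_index.map(|idx| idx == *i).unwrap_or(false)
            })
            .map(|(_, name)| name.as_str())
            .collect()
    }
}

fn main() {
    let schema = SchemaSketch {
        column_names: vec!["tsid".into(), "ts".into(), "host".into(), "value".into()],
        timestamp_index: 1,
        tsid_index: Some(0),
    };
    // Primary key columns such as "host" are now indexed too; previously
    // every primary key column was excluded from the filter index.
    assert_eq!(schema.filter_indexed_columns(), vec!["host", "value"]);
}
```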
---
 .../src/sst/parquet/async_reader.rs                |   2 +-
 .../parquet/{meta_data.rs => meta_data/filter.rs}  | 259 ++-------------------
 .../src/sst/parquet/meta_data/mod.rs               | 247 ++++++++++++++++++++
 .../src/sst/parquet/row_group_pruner.rs            |   2 +-
 src/analytic_engine/src/sst/parquet/writer.rs      |  25 +-
 5 files changed, 280 insertions(+), 255 deletions(-)

diff --git a/src/analytic_engine/src/sst/parquet/async_reader.rs b/src/analytic_engine/src/sst/parquet/async_reader.rs
index c56836c9..94feeab2 100644
--- a/src/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/src/analytic_engine/src/sst/parquet/async_reader.rs
@@ -71,7 +71,7 @@ use crate::{
         metrics::MaybeTableLevelMetrics,
         parquet::{
             encoding::ParquetDecoder,
-            meta_data::{ColumnValueSet, ParquetFilter},
+            meta_data::{filter::ParquetFilter, ColumnValueSet},
             row_group_pruner::RowGroupPruner,
         },
         reader::{error::*, Result, SstReader},
diff --git a/src/analytic_engine/src/sst/parquet/meta_data.rs b/src/analytic_engine/src/sst/parquet/meta_data/filter.rs
similarity index 59%
rename from src/analytic_engine/src/sst/parquet/meta_data.rs
rename to src/analytic_engine/src/sst/parquet/meta_data/filter.rs
index 4c8b51ce..d64ad974 100644
--- a/src/analytic_engine/src/sst/parquet/meta_data.rs
+++ b/src/analytic_engine/src/sst/parquet/meta_data/filter.rs
@@ -15,66 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// MetaData for SST based on parquet.
-
-use std::{collections::HashSet, fmt, ops::Index, sync::Arc};
-
-use bytes_ext::Bytes;
-use common_types::{
-    datum::DatumKind,
-    schema::{RecordSchemaWithKey, Schema},
-    time::TimeRange,
-    SequenceNumber,
-};
-use horaedbproto::{schema as schema_pb, sst as sst_pb};
-use macros::define_result;
-use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
-use xorfilter::xor8::{Xor8, Xor8Builder};
+// TODO: A better module name would be index.
 
-use crate::sst::writer::MetaData;
-
-/// Error of sst file.
-#[derive(Debug, Snafu)]
-pub enum Error {
-    #[snafu(display("Time range is not found.\nBacktrace\n:{}", backtrace))]
-    TimeRangeNotFound { backtrace: Backtrace },
-
-    #[snafu(display("Table schema is not found.\nBacktrace\n:{}", backtrace))]
-    TableSchemaNotFound { backtrace: Backtrace },
-
-    #[snafu(display(
-        "Failed to parse Xor8Filter from bytes, err:{}.\nBacktrace\n:{}",
-        source,
-        backtrace
-    ))]
-    ParseXor8Filter {
-        source: std::io::Error,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display(
-        "Failed to build Xor8Filter, err:{}.\nBacktrace\n:{}",
-        source,
-        backtrace
-    ))]
-    BuildXor8Filter {
-        source: xorfilter::Error,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display("Failed to convert time range, err:{}", source))]
-    ConvertTimeRange { source: common_types::time::Error },
-
-    #[snafu(display("Failed to convert table schema, err:{}", source))]
-    ConvertTableSchema { source: common_types::schema::Error },
-}
+use std::{fmt, ops::Index};
 
-define_result!(Error);
+use common_types::{datum::DatumKind, schema::Schema};
+use horaedbproto::sst as sst_pb;
+use snafu::ResultExt;
+use xorfilter::xor8::{Xor8, Xor8Builder};
 
+use crate::sst::parquet::meta_data::{BuildXor8Filter, Error, ParseXor8Filter, Result};
+
+// TODO: move this to sst module, and add a FilterBuild trait
 /// Filter can be used to test whether an element is a member of a set.
 /// False positive matches are possible if space-efficient probabilistic data
 /// structures are used.
-// TODO: move this to sst module, and add a FilterBuild trait
 trait Filter: fmt::Debug {
     fn r#type(&self) -> FilterType;
 
@@ -89,7 +44,7 @@ trait Filter: fmt::Debug {
         self.to_bytes().len()
     }
 
-    /// Deserialize the binary array to bitmap index.
+    /// Deserialize the binary array to a specific filter.
     fn from_bytes(buf: Vec<u8>) -> Result<Self>
     where
         Self: Sized;
@@ -140,13 +95,19 @@ pub struct RowGroupFilterBuilder {
 }
 
 impl RowGroupFilterBuilder {
-    pub(crate) fn new(record_schema: &RecordSchemaWithKey) -> Self {
-        let builders = record_schema
+    pub(crate) fn new(schema: &Schema) -> Self {
+        let builders = schema
             .columns()
             .iter()
             .enumerate()
             .map(|(i, col)| {
-                if record_schema.is_primary_key_index(i) {
+                // No need to create a filter index over the timestamp column.
+                if schema.timestamp_index() == i {
+                    return None;
+                }
+
+                // No need to create a filter index over the tsid column.
+                if schema.index_of_tsid().map(|idx| idx == i).unwrap_or(false) {
                     return None;
                 }
 
@@ -340,185 +301,6 @@ impl TryFrom<sst_pb::ParquetFilter> for ParquetFilter {
     }
 }
 
-/// Meta data of a sst file
-#[derive(Clone, PartialEq)]
-pub struct ParquetMetaData {
-    pub min_key: Bytes,
-    pub max_key: Bytes,
-    /// Time Range of the sst
-    pub time_range: TimeRange,
-    /// Max sequence number in the sst
-    pub max_sequence: SequenceNumber,
-    pub schema: Schema,
-    pub parquet_filter: Option<ParquetFilter>,
-    pub column_values: Option<Vec<Option<ColumnValueSet>>>,
-}
-
-pub type ParquetMetaDataRef = Arc<ParquetMetaData>;
-
-impl From<&MetaData> for ParquetMetaData {
-    fn from(meta: &MetaData) -> Self {
-        Self {
-            min_key: meta.min_key.clone(),
-            max_key: meta.max_key.clone(),
-            time_range: meta.time_range,
-            max_sequence: meta.max_sequence,
-            schema: meta.schema.clone(),
-            parquet_filter: None,
-            column_values: None,
-        }
-    }
-}
-
-impl From<ParquetMetaData> for MetaData {
-    fn from(meta: ParquetMetaData) -> Self {
-        Self {
-            min_key: meta.min_key,
-            max_key: meta.max_key,
-            time_range: meta.time_range,
-            max_sequence: meta.max_sequence,
-            schema: meta.schema,
-        }
-    }
-}
-
-impl From<Arc<ParquetMetaData>> for MetaData {
-    fn from(meta: Arc<ParquetMetaData>) -> Self {
-        Self {
-            min_key: meta.min_key.clone(),
-            max_key: meta.max_key.clone(),
-            time_range: meta.time_range,
-            max_sequence: meta.max_sequence,
-            schema: meta.schema.clone(),
-        }
-    }
-}
-
-impl fmt::Debug for ParquetMetaData {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("ParquetMetaData")
-            .field("min_key", &hex::encode(&self.min_key))
-            .field("max_key", &hex::encode(&self.max_key))
-            .field("time_range", &self.time_range)
-            .field("max_sequence", &self.max_sequence)
-            .field("schema", &self.schema)
-            .field("column_values", &self.column_values)
-            .field(
-                "filter_size",
-                &self
-                    .parquet_filter
-                    .as_ref()
-                    .map(|filter| filter.size())
-                    .unwrap_or(0),
-            )
-            .finish()
-    }
-}
-
-impl From<ParquetMetaData> for sst_pb::ParquetMetaData {
-    fn from(src: ParquetMetaData) -> Self {
-        let column_values = if let Some(v) = src.column_values {
-            v.into_iter()
-                .map(|col| sst_pb::ColumnValueSet {
-                    value: col.map(|col| col.into()),
-                })
-                .collect()
-        } else {
-            Vec::new()
-        };
-        sst_pb::ParquetMetaData {
-            min_key: src.min_key.to_vec(),
-            max_key: src.max_key.to_vec(),
-            max_sequence: src.max_sequence,
-            time_range: Some(src.time_range.into()),
-            schema: Some(schema_pb::TableSchema::from(&src.schema)),
-            filter: src.parquet_filter.map(|v| v.into()),
-            // collapsible_cols_idx is used in hybrid format ,and it's deprecated.
-            collapsible_cols_idx: Vec::new(),
-            column_values,
-        }
-    }
-}
-
-impl TryFrom<sst_pb::ParquetMetaData> for ParquetMetaData {
-    type Error = Error;
-
-    fn try_from(src: sst_pb::ParquetMetaData) -> Result<Self> {
-        let time_range = {
-            let time_range = src.time_range.context(TimeRangeNotFound)?;
-            TimeRange::try_from(time_range).context(ConvertTimeRange)?
-        };
-        let schema = {
-            let schema = src.schema.context(TableSchemaNotFound)?;
-            Schema::try_from(schema).context(ConvertTableSchema)?
-        };
-        let parquet_filter = src.filter.map(ParquetFilter::try_from).transpose()?;
-        let column_values = if src.column_values.is_empty() {
-            // Old version sst don't has this, so set to none.
-            None
-        } else {
-            Some(
-                src.column_values
-                    .into_iter()
-                    .map(|v| v.value.map(|v| v.into()))
-                    .collect(),
-            )
-        };
-
-        Ok(Self {
-            min_key: src.min_key.into(),
-            max_key: src.max_key.into(),
-            time_range,
-            max_sequence: src.max_sequence,
-            schema,
-            parquet_filter,
-            column_values,
-        })
-    }
-}
-
-#[derive(Debug, PartialEq, Clone)]
-pub enum ColumnValueSet {
-    StringValue(HashSet<String>),
-}
-
-impl ColumnValueSet {
-    pub fn is_empty(&self) -> bool {
-        match self {
-            Self::StringValue(sv) => sv.is_empty(),
-        }
-    }
-
-    pub fn len(&self) -> usize {
-        match self {
-            Self::StringValue(sv) => sv.len(),
-        }
-    }
-}
-
-impl From<ColumnValueSet> for sst_pb::column_value_set::Value {
-    fn from(value: ColumnValueSet) -> Self {
-        match value {
-            ColumnValueSet::StringValue(values) => {
-                let values = values.into_iter().collect();
-                sst_pb::column_value_set::Value::StringSet(sst_pb::column_value_set::StringSet {
-                    values,
-                })
-            }
-        }
-    }
-}
-
-impl From<sst_pb::column_value_set::Value> for ColumnValueSet {
-    fn from(value: sst_pb::column_value_set::Value) -> Self {
-        match value {
-            sst_pb::column_value_set::Value::StringSet(ss) => {
-                ColumnValueSet::StringValue(HashSet::from_iter(ss.values))
-            }
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use common_types::tests::build_schema;
@@ -569,8 +351,7 @@ mod tests {
     fn test_row_group_filter_builder() {
         // (key1(varbinary), key2(timestamp), field1(double), field2(string))
         let schema = build_schema();
-        let record_schema = schema.to_record_schema_with_key();
-        let mut builders = RowGroupFilterBuilder::new(&record_schema);
+        let mut builders = RowGroupFilterBuilder::new(&schema);
         for key in ["host-123", "host-456", "host-789"] {
             builders.add_key(3, key.as_bytes());
         }
diff --git a/src/analytic_engine/src/sst/parquet/meta_data/mod.rs b/src/analytic_engine/src/sst/parquet/meta_data/mod.rs
new file mode 100644
index 00000000..0120d649
--- /dev/null
+++ b/src/analytic_engine/src/sst/parquet/meta_data/mod.rs
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// MetaData for SST based on parquet.
+
+use std::{collections::HashSet, fmt, sync::Arc};
+
+use bytes_ext::Bytes;
+use common_types::{schema::Schema, time::TimeRange, SequenceNumber};
+use horaedbproto::{schema as schema_pb, sst as sst_pb};
+use macros::define_result;
+use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
+
+use crate::sst::{parquet::meta_data::filter::ParquetFilter, writer::MetaData};
+
+pub mod filter;
+
+/// Error of sst file.
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Time range is not found.\nBacktrace\n:{}", backtrace))]
+    TimeRangeNotFound { backtrace: Backtrace },
+
+    #[snafu(display("Table schema is not found.\nBacktrace\n:{}", backtrace))]
+    TableSchemaNotFound { backtrace: Backtrace },
+
+    #[snafu(display(
+        "Failed to parse Xor8Filter from bytes, err:{}.\nBacktrace\n:{}",
+        source,
+        backtrace
+    ))]
+    ParseXor8Filter {
+        source: std::io::Error,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display(
+        "Failed to build Xor8Filter, err:{}.\nBacktrace\n:{}",
+        source,
+        backtrace
+    ))]
+    BuildXor8Filter {
+        source: xorfilter::Error,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Failed to convert time range, err:{}", source))]
+    ConvertTimeRange { source: common_types::time::Error },
+
+    #[snafu(display("Failed to convert table schema, err:{}", source))]
+    ConvertTableSchema { source: common_types::schema::Error },
+}
+
+define_result!(Error);
+
+/// Meta data of a sst file
+#[derive(Clone, PartialEq)]
+pub struct ParquetMetaData {
+    pub min_key: Bytes,
+    pub max_key: Bytes,
+    /// Time Range of the sst
+    pub time_range: TimeRange,
+    /// Max sequence number in the sst
+    pub max_sequence: SequenceNumber,
+    pub schema: Schema,
+    pub parquet_filter: Option<ParquetFilter>,
+    pub column_values: Option<Vec<Option<ColumnValueSet>>>,
+}
+
+pub type ParquetMetaDataRef = Arc<ParquetMetaData>;
+
+impl From<&MetaData> for ParquetMetaData {
+    fn from(meta: &MetaData) -> Self {
+        Self {
+            min_key: meta.min_key.clone(),
+            max_key: meta.max_key.clone(),
+            time_range: meta.time_range,
+            max_sequence: meta.max_sequence,
+            schema: meta.schema.clone(),
+            parquet_filter: None,
+            column_values: None,
+        }
+    }
+}
+
+impl From<ParquetMetaData> for MetaData {
+    fn from(meta: ParquetMetaData) -> Self {
+        Self {
+            min_key: meta.min_key,
+            max_key: meta.max_key,
+            time_range: meta.time_range,
+            max_sequence: meta.max_sequence,
+            schema: meta.schema,
+        }
+    }
+}
+
+impl From<Arc<ParquetMetaData>> for MetaData {
+    fn from(meta: Arc<ParquetMetaData>) -> Self {
+        Self {
+            min_key: meta.min_key.clone(),
+            max_key: meta.max_key.clone(),
+            time_range: meta.time_range,
+            max_sequence: meta.max_sequence,
+            schema: meta.schema.clone(),
+        }
+    }
+}
+
+impl fmt::Debug for ParquetMetaData {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("ParquetMetaData")
+            .field("min_key", &hex::encode(&self.min_key))
+            .field("max_key", &hex::encode(&self.max_key))
+            .field("time_range", &self.time_range)
+            .field("max_sequence", &self.max_sequence)
+            .field("schema", &self.schema)
+            .field("column_values", &self.column_values)
+            .field(
+                "filter_size",
+                &self
+                    .parquet_filter
+                    .as_ref()
+                    .map(|filter| filter.size())
+                    .unwrap_or(0),
+            )
+            .finish()
+    }
+}
+
+impl From<ParquetMetaData> for sst_pb::ParquetMetaData {
+    fn from(src: ParquetMetaData) -> Self {
+        let column_values = if let Some(v) = src.column_values {
+            v.into_iter()
+                .map(|col| sst_pb::ColumnValueSet {
+                    value: col.map(|col| col.into()),
+                })
+                .collect()
+        } else {
+            Vec::new()
+        };
+        sst_pb::ParquetMetaData {
+            min_key: src.min_key.to_vec(),
+            max_key: src.max_key.to_vec(),
+            max_sequence: src.max_sequence,
+            time_range: Some(src.time_range.into()),
+            schema: Some(schema_pb::TableSchema::from(&src.schema)),
+            filter: src.parquet_filter.map(|v| v.into()),
+            // collapsible_cols_idx is used in hybrid format, and it's deprecated.
+            collapsible_cols_idx: Vec::new(),
+            column_values,
+        }
+    }
+}
+
+impl TryFrom<sst_pb::ParquetMetaData> for ParquetMetaData {
+    type Error = Error;
+
+    fn try_from(src: sst_pb::ParquetMetaData) -> Result<Self> {
+        let time_range = {
+            let time_range = src.time_range.context(TimeRangeNotFound)?;
+            TimeRange::try_from(time_range).context(ConvertTimeRange)?
+        };
+        let schema = {
+            let schema = src.schema.context(TableSchemaNotFound)?;
+            Schema::try_from(schema).context(ConvertTableSchema)?
+        };
+        let parquet_filter = src.filter.map(ParquetFilter::try_from).transpose()?;
+        let column_values = if src.column_values.is_empty() {
+            // Old version sst doesn't have this, so set to None.
+            None
+        } else {
+            Some(
+                src.column_values
+                    .into_iter()
+                    .map(|v| v.value.map(|v| v.into()))
+                    .collect(),
+            )
+        };
+
+        Ok(Self {
+            min_key: src.min_key.into(),
+            max_key: src.max_key.into(),
+            time_range,
+            max_sequence: src.max_sequence,
+            schema,
+            parquet_filter,
+            column_values,
+        })
+    }
+}
+
+#[derive(Debug, PartialEq, Clone)]
+pub enum ColumnValueSet {
+    StringValue(HashSet<String>),
+}
+
+impl ColumnValueSet {
+    pub fn is_empty(&self) -> bool {
+        match self {
+            Self::StringValue(sv) => sv.is_empty(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        match self {
+            Self::StringValue(sv) => sv.len(),
+        }
+    }
+}
+
+impl From<ColumnValueSet> for sst_pb::column_value_set::Value {
+    fn from(value: ColumnValueSet) -> Self {
+        match value {
+            ColumnValueSet::StringValue(values) => {
+                let values = values.into_iter().collect();
+                sst_pb::column_value_set::Value::StringSet(sst_pb::column_value_set::StringSet {
+                    values,
+                })
+            }
+        }
+    }
+}
+
+impl From<sst_pb::column_value_set::Value> for ColumnValueSet {
+    fn from(value: sst_pb::column_value_set::Value) -> Self {
+        match value {
+            sst_pb::column_value_set::Value::StringSet(ss) => {
+                ColumnValueSet::StringValue(HashSet::from_iter(ss.values))
+            }
+        }
+    }
+}
diff --git a/src/analytic_engine/src/sst/parquet/row_group_pruner.rs b/src/analytic_engine/src/sst/parquet/row_group_pruner.rs
index a101ff05..3aa0c43c 100644
--- a/src/analytic_engine/src/sst/parquet/row_group_pruner.rs
+++ b/src/analytic_engine/src/sst/parquet/row_group_pruner.rs
@@ -40,7 +40,7 @@ use snafu::ensure;
 use trace_metric::{MetricsCollector, TraceMetricWhenDrop};
 
 use crate::sst::{
-    parquet::meta_data::{ColumnValueSet, ParquetFilter},
+    parquet::meta_data::{filter::ParquetFilter, ColumnValueSet},
     reader::error::{OtherNoCause, Result},
 };
 
diff --git a/src/analytic_engine/src/sst/parquet/writer.rs b/src/analytic_engine/src/sst/parquet/writer.rs
index 5be8712c..88293497 100644
--- a/src/analytic_engine/src/sst/parquet/writer.rs
+++ b/src/analytic_engine/src/sst/parquet/writer.rs
@@ -21,7 +21,8 @@ use std::collections::{HashMap, HashSet};
 
 use async_trait::async_trait;
 use common_types::{
-    datum::DatumKind, record_batch::FetchedRecordBatch, request_id::RequestId, time::TimeRange,
+    datum::DatumKind, record_batch::FetchedRecordBatch, request_id::RequestId, schema::Schema,
+    time::TimeRange,
 };
 use datafusion::parquet::basic::Compression;
 use futures::StreamExt;
@@ -39,14 +40,13 @@ use crate::{
         parquet::{
             encoding::{encode_sst_meta_data, ColumnEncoding, EncodeOptions, ParquetEncoder},
             meta_data::{
-                ColumnValueSet, ParquetFilter, ParquetMetaData, RowGroupFilter,
-                RowGroupFilterBuilder,
+                filter::{ParquetFilter, RowGroupFilter, RowGroupFilterBuilder},
+                ColumnValueSet, ParquetMetaData,
             },
         },
         writer::{
-            self, BuildParquetFilter, BuildParquetFilterNoCause, EncodePbData, EncodeRecordBatch,
-            ExpectTimestampColumn, Io, MetaData, PollRecordBatch, RecordBatchStream, Result,
-            SstInfo, SstWriter, Storage,
+            self, BuildParquetFilter, EncodePbData, EncodeRecordBatch, ExpectTimestampColumn, Io,
+            MetaData, PollRecordBatch, RecordBatchStream, Result, SstInfo, SstWriter, Storage,
         },
     },
     table::sst_util,
@@ -237,15 +237,10 @@ impl<'a> RecordBatchGroupWriter<'a> {
     /// Build the parquet filter for the given `row_group`.
     fn build_row_group_filter(
         &self,
+        schema: &Schema,
         row_group_batch: &[FetchedRecordBatch],
     ) -> Result<RowGroupFilter> {
-        let schema_with_key =
-            row_group_batch[0]
-                .schema_with_key()
-                .with_context(|| BuildParquetFilterNoCause {
-                    msg: "primary key indexes not exist",
-                })?;
-        let mut builder = RowGroupFilterBuilder::new(&schema_with_key);
+        let mut builder = RowGroupFilterBuilder::new(schema);
 
         for partial_batch in row_group_batch {
             for (col_idx, column) in partial_batch.columns().iter().enumerate() {
@@ -356,7 +351,9 @@ impl<'a> RecordBatchGroupWriter<'a> {
         let timestamp_index = self.meta_data.schema.timestamp_index();
         while !row_group.is_empty() {
             if let Some(filter) = &mut parquet_filter {
-                filter.push_row_group_filter(self.build_row_group_filter(&row_group)?);
+                filter.push_row_group_filter(
+                    self.build_row_group_filter(&self.meta_data.schema, &row_group)?,
+                );
             }
 
             let num_batches = row_group.len();

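For context on why the wider index matters at read time: the row-group pruner probes each row group's per-column filter with the literals of equality predicates and skips row groups whose filters definitely exclude those values. Below is a minimal sketch of that pruning contract; `MockFilter` and `RowGroupFilterSketch` are hypothetical stand-ins for the Xor8-backed filter types in `row_group_pruner.rs` (a real probabilistic filter may report false positives, never false negatives).

```rust
use std::collections::HashSet;

// Hypothetical stand-in for the Xor8-backed per-column filter.
#[derive(Debug, Default)]
struct MockFilter(HashSet<Vec<u8>>);

impl MockFilter {
    fn contains(&self, key: &[u8]) -> bool {
        self.0.contains(key)
    }
}

// One slot per column; `None` marks unindexed columns (after this patch,
// only the tsid and timestamp columns).
struct RowGroupFilterSketch {
    column_filters: Vec<Option<MockFilter>>,
}

impl RowGroupFilterSketch {
    /// True when the row group can be skipped for the equality predicate
    /// `column[col_idx] = value`: the column is indexed and its filter
    /// definitely does not contain the value.
    fn can_prune(&self, col_idx: usize, value: &[u8]) -> bool {
        match self.column_filters.get(col_idx).and_then(|f| f.as_ref()) {
            Some(filter) => !filter.contains(value),
            // Unindexed column: the row group cannot be ruled out.
            None => false,
        }
    }
}

fn main() {
    let mut host_filter = MockFilter::default();
    host_filter.0.insert(b"host-123".to_vec());

    // Columns: [tsid (unindexed), ts (unindexed), host (indexed)].
    let rg = RowGroupFilterSketch {
        column_filters: vec![None, None, Some(host_filter)],
    };
    assert!(!rg.can_prune(2, b"host-123")); // value may be present: keep
    assert!(rg.can_prune(2, b"host-999")); // definitely absent: skip
    assert!(!rg.can_prune(0, b"anything")); // tsid not indexed: keep
}
```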
