This is an automated email from the ASF dual-hosted git repository.

tanruixiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9619810a feat: utilize the column cardinality for deciding whether to do dict (#1372)
9619810a is described below

commit 9619810aa558ae79ba474144dfaef591c5d9ef55
Author: WEI Xikai <[email protected]>
AuthorDate: Wed Dec 20 16:05:40 2023 +0800

    feat: utilize the column cardinality for deciding whether to do dict (#1372)
    
    ## Rationale
    The column value set is summarized into the SST metadata when a
    column has low cardinality. With that information, sampling of such
    columns can be skipped (see the sketch after the change summary below).
    
    ## Detailed Changes
    Skip sampling for the low-cardinality columns.
    
    ## Test Plan
    Updated the unit tests.
---
 analytic_engine/src/compaction/scheduler.rs      |   1 +
 analytic_engine/src/instance/flush_compaction.rs | 146 +++++++++++++++++++--
 analytic_engine/src/sst/factory.rs               |  21 ++-
 analytic_engine/src/sst/meta_data/mod.rs         |  11 +-
 analytic_engine/src/sst/parquet/encoding.rs      |   2 +-
 analytic_engine/src/sst/parquet/writer.rs        | 159 +++++++++++++++++------
 benchmarks/src/sst_tools.rs                      |   1 +
 tools/src/bin/sst-convert.rs                     |   1 +
 8 files changed, 281 insertions(+), 61 deletions(-)
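
For illustration (an editorial sketch, not part of the commit): the idea can be
summarized in a few dozen lines of self-contained Rust. The names below
(SstColumnSummary, collect_column_stats, pre_decide_encodings) are hypothetical
stand-ins rather than the engine's API; the real logic is in
collect_column_stats_from_meta_datas and the ColumnStats -> ColumnEncoding
conversion shown in the diff. A column is treated as low-cardinality only when
every input SST has summarized its value set, and such columns get their
dictionary-encoding decision made up front so the writer can skip sampling them.

    use std::collections::HashMap;

    // Simplified stand-ins for the engine's ColumnStats / ColumnEncoding.
    #[derive(Debug)]
    struct ColumnStats {
        low_cardinality: bool,
    }

    #[derive(Debug, PartialEq)]
    struct ColumnEncoding {
        enable_dict: bool,
    }

    // One SST's view: the columns whose value sets were summarized into its
    // meta data, i.e. the columns that looked low-cardinality in that SST.
    struct SstColumnSummary {
        low_cardinality_columns: Vec<String>,
    }

    // A column counts as low-cardinality only if every input SST reports it
    // as such (mirrors collect_column_stats_from_meta_datas in the diff).
    fn collect_column_stats(ssts: &[SstColumnSummary]) -> HashMap<String, ColumnStats> {
        let mut counts: HashMap<String, usize> = HashMap::new();
        for sst in ssts {
            for col in &sst.low_cardinality_columns {
                *counts.entry(col.clone()).or_insert(0) += 1;
            }
        }
        counts
            .into_iter()
            .filter(|(_, cnt)| *cnt == ssts.len())
            .map(|(col, _)| (col, ColumnStats { low_cardinality: true }))
            .collect()
    }

    // Columns with known stats get their encoding decided up front, so the
    // writer can skip sampling them; other columns still go through sampling.
    fn pre_decide_encodings(stats: &HashMap<String, ColumnStats>) -> HashMap<String, ColumnEncoding> {
        stats
            .iter()
            .map(|(col, s)| (col.clone(), ColumnEncoding { enable_dict: s.low_cardinality }))
            .collect()
    }

    fn main() {
        let ssts = vec![
            SstColumnSummary { low_cardinality_columns: vec!["tag".into(), "host".into()] },
            SstColumnSummary { low_cardinality_columns: vec!["tag".into()] },
        ];
        let encodings = pre_decide_encodings(&collect_column_stats(&ssts));
        // Only "tag" is low-cardinality in every SST, so only it skips sampling.
        assert_eq!(encodings.get("tag"), Some(&ColumnEncoding { enable_dict: true }));
        assert!(!encodings.contains_key("host"));
        println!("pre-decided encodings: {encodings:?}");
    }

In the commit itself, these pre-decided stats travel through
SstWriteOptions::column_stats into WriteOptions::column_encodings, and
ColumnEncodingSampler skips any column that already has an entry in that map.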

diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs
index 1496ef51..72fe2f8b 100644
--- a/analytic_engine/src/compaction/scheduler.rs
+++ b/analytic_engine/src/compaction/scheduler.rs
@@ -505,6 +505,7 @@ impl ScheduleWorker {
             num_rows_per_row_group: table_data.table_options().num_rows_per_row_group,
             compression: table_data.table_options().compression,
             max_buffer_size: self.write_sst_max_buffer_size,
+            column_stats: Default::default(),
         };
         let scan_options = self.scan_options.clone();
 
diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs
index 81ac53d7..cbede944 100644
--- a/analytic_engine/src/instance/flush_compaction.rs
+++ b/analytic_engine/src/instance/flush_compaction.rs
@@ -14,7 +14,12 @@
 
 // Flush and compaction logic of instance
 
-use std::{cmp, collections::Bound, fmt, sync::Arc};
+use std::{
+    cmp,
+    collections::{Bound, HashMap},
+    fmt,
+    sync::Arc,
+};
 
 use common_types::{
     projected_schema::ProjectedSchema,
@@ -55,9 +60,9 @@ use crate::{
         IterOptions,
     },
     sst::{
-        factory::{self, ScanOptions, SstWriteOptions},
+        factory::{self, ColumnStats, ScanOptions, SstWriteOptions},
         file::{FileMeta, Level},
-        meta_data::SstMetaReader,
+        meta_data::{SstMetaData, SstMetaReader},
         writer::{MetaData, RecordBatchStream},
     },
     table::{
@@ -541,6 +546,7 @@ impl FlushTask {
             num_rows_per_row_group: self.table_data.table_options().num_rows_per_row_group,
             compression: self.table_data.table_options().compression,
             max_buffer_size: self.write_sst_max_buffer_size,
+            column_stats: Default::default(),
         };
 
         for time_range in &time_ranges {
@@ -712,6 +718,7 @@ impl FlushTask {
             num_rows_per_row_group: self.table_data.table_options().num_rows_per_row_group,
             compression: self.table_data.table_options().compression,
             max_buffer_size: self.write_sst_max_buffer_size,
+            column_stats: Default::default(),
         };
         let mut writer = self
             .space_store
@@ -943,7 +950,7 @@ impl SpaceStore {
             row_iter::record_batch_with_key_iter_to_stream(merge_iter)
         };
 
-        let sst_meta = {
+        let (sst_meta, column_stats) = {
             let meta_reader = SstMetaReader {
                 space_id: table_data.space_id,
                 table_id: table_data.id,
@@ -956,7 +963,9 @@ impl SpaceStore {
                 .await
                 .context(ReadSstMeta)?;
 
-            MetaData::merge(sst_metas.into_iter().map(MetaData::from), schema)
+            let column_stats = collect_column_stats_from_meta_datas(&sst_metas);
+            let merged_meta = MetaData::merge(sst_metas.into_iter().map(MetaData::from), schema);
+            (merged_meta, column_stats)
         };
 
         // Alloc file id for the merged sst.
@@ -966,10 +975,17 @@ impl SpaceStore {
             .context(AllocFileId)?;
 
         let sst_file_path = table_data.set_sst_file_path(file_id);
+        let write_options = SstWriteOptions {
+            storage_format_hint: sst_write_options.storage_format_hint,
+            num_rows_per_row_group: sst_write_options.num_rows_per_row_group,
+            compression: sst_write_options.compression,
+            max_buffer_size: sst_write_options.max_buffer_size,
+            column_stats,
+        };
         let mut sst_writer = self
             .sst_factory
             .create_writer(
-                sst_write_options,
+                &write_options,
                 &sst_file_path,
                 self.store_picker(),
                 input.output_level,
@@ -1062,6 +1078,42 @@ impl SpaceStore {
     }
 }
 
+/// Collect the column stats from a batch of sst meta data.
+fn collect_column_stats_from_meta_datas(metas: &[SstMetaData]) -> HashMap<String, ColumnStats> {
+    let mut low_cardinality_counts: HashMap<String, usize> = HashMap::new();
+    for meta_data in metas {
+        let SstMetaData::Parquet(meta_data) = meta_data;
+        if let Some(column_values) = &meta_data.column_values {
+            for (col_idx, val_set) in column_values.iter().enumerate() {
+                let low_cardinality = val_set.is_some();
+                if low_cardinality {
+                    let col_name = meta_data.schema.column(col_idx).name.clone();
+                    low_cardinality_counts
+                        .entry(col_name)
+                        .and_modify(|v| *v += 1)
+                        .or_insert(1);
+                }
+            }
+        }
+    }
+
+    // Only the column whose cardinality is low in all the metas is a
+    // low-cardinality column.
+    // TODO: shall we merge all the distinct values of the column to check whether
+    // the cardinality is still thought to be low?
+    let low_cardinality_cols = low_cardinality_counts
+        .into_iter()
+        .filter_map(|(col_name, cnt)| {
+            (cnt == metas.len()).then_some((
+                col_name,
+                ColumnStats {
+                    low_cardinality: true,
+                },
+            ))
+        });
+    HashMap::from_iter(low_cardinality_cols)
+}
+
 fn split_record_batch_with_time_ranges(
     record_batch: RecordBatchWithKey,
     time_ranges: &[TimeRange],
@@ -1126,15 +1178,26 @@ fn build_mem_table_iter(
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
+    use bytes_ext::Bytes;
     use common_types::{
+        schema::Schema,
         tests::{
-            build_record_batch_with_key_by_rows, build_row, build_row_opt,
+            build_record_batch_with_key_by_rows, build_row, build_row_opt, build_schema,
             check_record_batch_with_key_with_rows,
         },
         time::TimeRange,
     };
 
-    use crate::instance::flush_compaction::split_record_batch_with_time_ranges;
+    use super::collect_column_stats_from_meta_datas;
+    use crate::{
+        instance::flush_compaction::split_record_batch_with_time_ranges,
+        sst::{
+            meta_data::SstMetaData,
+            parquet::meta_data::{ColumnValueSet, ParquetMetaData},
+        },
+    };
 
     #[test]
     fn test_split_record_batch_with_time_ranges() {
@@ -1187,4 +1250,71 @@ mod tests {
         check_record_batch_with_key_with_rows(&rets[1], rows1.len(), column_num, rows1);
         check_record_batch_with_key_with_rows(&rets[2], rows2.len(), column_num, rows2);
     }
+
+    fn check_collect_column_stats(
+        schema: &Schema,
+        expected_low_cardinality_col_indexes: Vec<usize>,
+        meta_datas: Vec<SstMetaData>,
+    ) {
+        let column_stats = collect_column_stats_from_meta_datas(&meta_datas);
+        assert_eq!(
+            column_stats.len(),
+            expected_low_cardinality_col_indexes.len()
+        );
+
+        for col_idx in expected_low_cardinality_col_indexes {
+            let col_schema = schema.column(col_idx);
+            assert!(column_stats.contains_key(&col_schema.name));
+        }
+    }
+
+    #[test]
+    fn test_collect_column_stats_from_metadata() {
+        let schema = build_schema();
+        let build_meta_data = |low_cardinality_col_indexes: Vec<usize>| {
+            let mut column_values = vec![None; 6];
+            for idx in low_cardinality_col_indexes {
+                column_values[idx] = Some(ColumnValueSet::StringValue(Default::default()));
+            }
+            let parquet_meta_data = ParquetMetaData {
+                min_key: Bytes::new(),
+                max_key: Bytes::new(),
+                time_range: TimeRange::empty(),
+                max_sequence: 0,
+                schema: schema.clone(),
+                parquet_filter: None,
+                column_values: Some(column_values),
+            };
+            SstMetaData::Parquet(Arc::new(parquet_meta_data))
+        };
+
+        // Normal case 0
+        let meta_datas = vec![
+            build_meta_data(vec![0]),
+            build_meta_data(vec![0]),
+            build_meta_data(vec![0, 1]),
+            build_meta_data(vec![0, 2]),
+            build_meta_data(vec![0, 3]),
+        ];
+        check_collect_column_stats(&schema, vec![0], meta_datas);
+
+        // Normal case 1
+        let meta_datas = vec![
+            build_meta_data(vec![0]),
+            build_meta_data(vec![0]),
+            build_meta_data(vec![]),
+            build_meta_data(vec![1]),
+            build_meta_data(vec![3]),
+        ];
+        check_collect_column_stats(&schema, vec![], meta_datas);
+
+        // Normal case 2
+        let meta_datas = vec![
+            build_meta_data(vec![3, 5]),
+            build_meta_data(vec![0, 3, 5]),
+            build_meta_data(vec![0, 1, 2, 3, 5]),
+            build_meta_data(vec![1, 3, 5]),
+        ];
+        check_collect_column_stats(&schema, vec![3, 5], meta_datas);
+    }
 }
diff --git a/analytic_engine/src/sst/factory.rs b/analytic_engine/src/sst/factory.rs
index 8a585d86..8d507c6e 100644
--- a/analytic_engine/src/sst/factory.rs
+++ b/analytic_engine/src/sst/factory.rs
@@ -14,7 +14,7 @@
 
 //! Factory for different kinds sst writer and reader.
 
-use std::{fmt::Debug, sync::Arc};
+use std::{collections::HashMap, fmt::Debug, sync::Arc};
 
 use async_trait::async_trait;
 use common_types::projected_schema::ProjectedSchema;
@@ -25,6 +25,7 @@ use snafu::{ResultExt, Snafu};
 use table_engine::predicate::PredicateRef;
 use trace_metric::MetricsCollector;
 
+use super::parquet::encoding::ColumnEncoding;
 use crate::{
     sst::{
         file::Level,
@@ -146,6 +147,10 @@ pub struct SstReadOptions {
 
     pub runtime: Arc<Runtime>,
 }
+#[derive(Clone, Debug)]
+pub struct ColumnStats {
+    pub low_cardinality: bool,
+}
 
 #[derive(Debug, Clone)]
 pub struct SstWriteOptions {
@@ -153,6 +158,15 @@ pub struct SstWriteOptions {
     pub num_rows_per_row_group: usize,
     pub compression: Compression,
     pub max_buffer_size: usize,
+    pub column_stats: HashMap<String, ColumnStats>,
+}
+
+impl From<&ColumnStats> for ColumnEncoding {
+    fn from(value: &ColumnStats) -> Self {
+        ColumnEncoding {
+            enable_dict: value.low_cardinality,
+        }
+    }
 }
 
 #[derive(Debug, Default)]
@@ -203,11 +217,16 @@ impl Factory for FactoryImpl {
         store_picker: &'a ObjectStorePickerRef,
         level: Level,
     ) -> Result<Box<dyn SstWriter + Send + 'a>> {
+        let column_encodings =
+            HashMap::from_iter(options.column_stats.iter().map(|(col_name, col_stats)| {
+                (col_name.to_owned(), ColumnEncoding::from(col_stats))
+            }));
         let write_options = WriteOptions {
             num_rows_per_row_group: options.num_rows_per_row_group,
             max_buffer_size: options.max_buffer_size,
             compression: options.compression.into(),
             sst_level: level,
+            column_encodings,
         };
         Ok(Box::new(ParquetSstWriter::new(
             path,
diff --git a/analytic_engine/src/sst/meta_data/mod.rs b/analytic_engine/src/sst/meta_data/mod.rs
index 7e80afb4..e053ba4f 100644
--- a/analytic_engine/src/sst/meta_data/mod.rs
+++ b/analytic_engine/src/sst/meta_data/mod.rs
@@ -57,7 +57,7 @@ pub enum Error {
     #[snafu(display("Key value meta path in parquet is empty\nBacktrace\n:{}", backtrace))]
     KvMetaPathEmpty { backtrace: Backtrace },
 
-    #[snafu(display("Unknown mata version, value:{}.\nBacktrace\n:{}", version, backtrace))]
+    #[snafu(display("Unknown meta version, value:{}.\nBacktrace\n:{}", version, backtrace))]
     UnknownMetaVersion {
         version: String,
         backtrace: Backtrace,
@@ -66,9 +66,6 @@ pub enum Error {
     #[snafu(display("Metadata in proto struct is not found.\nBacktrace\n:{}", backtrace))]
     MetaDataNotFound { backtrace: Backtrace },
 
-    #[snafu(display("Empty custom metadata in parquet.\nBacktrace\n:{}", backtrace))]
-    EmptyCustomMetaData { backtrace: Backtrace },
-
     #[snafu(display("Failed to decode custom metadata in parquet, err:{}", source))]
     DecodeCustomMetaData { source: encoding::Error },
 
@@ -81,12 +78,6 @@ pub enum Error {
     #[snafu(display("Failed to convert parquet meta data, err:{}", source))]
     ConvertParquetMetaData { source: parquet::meta_data::Error },
 
-    #[snafu(display("Meet a object store error, err:{source}\nBacktrace:\n{backtrace}"))]
-    ObjectStoreError {
-        source: object_store::ObjectStoreError,
-        backtrace: Backtrace,
-    },
-
     #[snafu(display(
         "Failed to decode sst meta data, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}",
     ))]
diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs
index 9dc4a132..92f94b4c 100644
--- a/analytic_engine/src/sst/parquet/encoding.rs
+++ b/analytic_engine/src/sst/parquet/encoding.rs
@@ -238,7 +238,7 @@ struct ColumnarRecordEncoder<W> {
     arrow_schema: ArrowSchemaRef,
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub struct ColumnEncoding {
     pub enable_dict: bool,
 }
diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs
index b2c78745..ef233f70 100644
--- a/analytic_engine/src/sst/parquet/writer.rs
+++ b/analytic_engine/src/sst/parquet/writer.rs
@@ -90,7 +90,7 @@ struct RecordBatchGroupWriter<'a> {
     request_id: RequestId,
     input: RecordBatchStream,
     meta_data: &'a MetaData,
-    options: &'a WriteOptions,
+    options: WriteOptions,
 
     // inner status
     input_exhausted: bool,
@@ -101,12 +101,13 @@ struct RecordBatchGroupWriter<'a> {
     column_values: Option<Vec<Option<ColumnValueSet>>>,
 }
 
-#[derive(Debug, Clone)]
+#[derive(Clone, Debug)]
 pub struct WriteOptions {
     pub num_rows_per_row_group: usize,
     pub max_buffer_size: usize,
     pub compression: Compression,
     pub sst_level: Level,
+    pub column_encodings: HashMap<String, ColumnEncoding>,
 }
 
 impl WriteOptions {
@@ -121,7 +122,7 @@ impl<'a> RecordBatchGroupWriter<'a> {
         request_id: RequestId,
         input: RecordBatchStream,
         meta_data: &'a MetaData,
-        options: &'a WriteOptions,
+        options: WriteOptions,
     ) -> Self {
         // No need to build complex index for the min-level sst so there is no need to
         // collect the column values.
@@ -217,12 +218,14 @@ impl<'a> RecordBatchGroupWriter<'a> {
     fn build_column_encodings(
         &self,
         sample_row_groups: &[RecordBatchWithKey],
-    ) -> Result<HashMap<String, ColumnEncoding>> {
-        let sampler = ColumnEncodingSampler {
+        column_encodings: &mut HashMap<String, ColumnEncoding>,
+    ) -> Result<()> {
+        let mut sampler = ColumnEncodingSampler {
             sample_row_groups,
             meta_data: self.meta_data,
             min_num_sample_rows: MIN_NUM_ROWS_SAMPLE_DICT_ENCODING,
             max_unique_value_ratio: MAX_UNIQUE_VALUE_RATIO_DICT_ENCODING,
+            column_encodings,
         };
         sampler.sample()
     }
@@ -321,8 +324,10 @@ impl<'a> RecordBatchGroupWriter<'a> {
         let mut arrow_row_group = Vec::new();
         let mut total_num_rows = 0;
 
+        // Build the parquet encoder.
         let mut row_group = self.fetch_next_row_group(&mut prev_record_batch).await?;
-        let column_encodings = self.build_column_encodings(&row_group)?;
+        let mut column_encodings = std::mem::take(&mut self.options.column_encodings);
+        self.build_column_encodings(&row_group, &mut column_encodings)?;
         let encode_options = EncodeOptions {
             num_rows_per_row_group: self.options.num_rows_per_row_group,
             max_buffer_size: self.options.max_buffer_size,
@@ -333,6 +338,7 @@ impl<'a> RecordBatchGroupWriter<'a> {
             ParquetEncoder::try_new(sink, &self.meta_data.schema, &encode_options)
                 .box_err()
                 .context(EncodeRecordBatch)?;
+
         let mut parquet_filter = self
             .options
             .need_custom_filter()
@@ -476,7 +482,14 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {
             request_id, meta, self.options.num_rows_per_row_group
         );
 
-        let group_writer = RecordBatchGroupWriter::new(request_id, input, meta, &self.options);
+        let write_options = WriteOptions {
+            num_rows_per_row_group: self.options.num_rows_per_row_group,
+            max_buffer_size: self.options.max_buffer_size,
+            compression: self.options.compression,
+            sst_level: self.options.sst_level,
+            column_encodings: std::mem::take(&mut self.options.column_encodings),
+        };
+        let group_writer = RecordBatchGroupWriter::new(request_id, input, meta, write_options);
 
         let (aborter, sink) =
             ObjectStoreMultiUploadAborter::initialize_upload(self.store, self.path).await?;
@@ -522,33 +535,34 @@ struct ColumnEncodingSampler<'a> {
     meta_data: &'a MetaData,
     min_num_sample_rows: usize,
     max_unique_value_ratio: f64,
+    column_encodings: &'a mut HashMap<String, ColumnEncoding>,
 }
 
 impl<'a> ColumnEncodingSampler<'a> {
-    fn sample(&self) -> Result<HashMap<String, ColumnEncoding>> {
+    fn sample(&mut self) -> Result<()> {
         let num_total_rows: usize = self.sample_row_groups.iter().map(|v| v.num_rows()).sum();
-        if num_total_rows < self.min_num_sample_rows {
-            return Ok(HashMap::new());
+        let ignore_sampling = num_total_rows < self.min_num_sample_rows;
+        if ignore_sampling {
+            self.decide_column_encodings_by_data_type();
+            return Ok(());
         }
 
         assert!(self.max_unique_value_ratio <= 1.0 && self.max_unique_value_ratio >= 0.0);
         let max_unique_values = (num_total_rows as f64 * self.max_unique_value_ratio) as usize;
         let mut column_hashes = HashSet::with_capacity(max_unique_values);
-        let mut column_encodings = HashMap::with_capacity(self.meta_data.schema.num_columns());
         for (col_idx, col_schema) in self.meta_data.schema.columns().iter().enumerate() {
-            // Only do dictionary encoding for string or bytes column.
-            let allowed_dict_type = matches!(
-                col_schema.data_type,
-                DatumKind::String | DatumKind::Varbinary
-            );
-            if !allowed_dict_type {
-                column_encodings.insert(
+            if !Self::is_dictionary_type(col_schema.data_type) {
+                self.column_encodings.insert(
                     col_schema.name.clone(),
                     ColumnEncoding { enable_dict: false },
                 );
                 continue;
             }
 
+            if self.column_encodings.contains_key(&col_schema.name) {
+                continue;
+            }
+
             for row_group in self.sample_row_groups {
                 let col_block = &row_group.columns()[col_idx];
                 for idx in 0..row_group.num_rows() {
@@ -567,10 +581,28 @@ impl<'a> ColumnEncodingSampler<'a> {
             // small.
             let enable_dict = column_hashes.len() < max_unique_values;
             column_hashes.clear();
-            column_encodings.insert(col_schema.name.clone(), ColumnEncoding { enable_dict });
+            self.column_encodings
+                .insert(col_schema.name.clone(), ColumnEncoding { enable_dict });
+        }
+
+        Ok(())
+    }
+
+    fn decide_column_encodings_by_data_type(&mut self) {
+        for col_schema in self.meta_data.schema.columns().iter() {
+            if !Self::is_dictionary_type(col_schema.data_type) {
+                self.column_encodings.insert(
+                    col_schema.name.clone(),
+                    ColumnEncoding { enable_dict: false },
+                );
+            }
         }
+    }
 
-        Ok(column_encodings)
+    #[inline]
+    fn is_dictionary_type(data_type: DatumKind) -> bool {
+        // Only do dictionary encoding for string or bytes column.
+        matches!(data_type, DatumKind::String | DatumKind::Varbinary)
     }
 }
 
@@ -630,6 +662,7 @@ mod tests {
                 num_rows_per_row_group,
                 compression: table_options::Compression::Uncompressed,
                 max_buffer_size: 0,
+                column_stats: Default::default(),
             };
 
             let dir = tempdir().unwrap();
@@ -867,6 +900,7 @@ mod tests {
             max_buffer_size: 0,
             compression: Compression::UNCOMPRESSED,
             sst_level: Level::default(),
+            column_encodings: Default::default(),
         };
         let meta_data = MetaData {
             min_key: Default::default(),
@@ -879,7 +913,7 @@ mod tests {
             RequestId::next_id(),
             record_batch_stream,
             &meta_data,
-            &write_options,
+            write_options,
         );
 
         let mut prev_record_batch = None;
@@ -895,21 +929,16 @@ mod tests {
     }
 
     fn check_sample_column_encoding(
-        sampler: ColumnEncodingSampler<'_>,
-        expect_enable_dicts: Option<Vec<bool>>,
+        mut sampler: ColumnEncodingSampler<'_>,
+        expect_enable_dicts: Vec<Option<bool>>,
     ) {
-        let column_encodings = sampler.sample().unwrap();
-        if expect_enable_dicts.is_none() {
-            assert!(column_encodings.is_empty());
-            return;
-        }
-
-        let expect_enable_dicts = expect_enable_dicts.unwrap();
+        sampler.sample().unwrap();
         for (col_idx, col_schema) in sampler.meta_data.schema.columns().iter().enumerate() {
-            let expect_enable_dict = expect_enable_dicts[col_idx];
-            let column_encoding = column_encodings.get(&col_schema.name).unwrap();
+            let expect_enable_dict =
+                expect_enable_dicts[col_idx].map(|v| ColumnEncoding { enable_dict: v });
+            let column_encoding = sampler.column_encodings.get(&col_schema.name).cloned();
             assert_eq!(
-                expect_enable_dict, column_encoding.enable_dict,
+                expect_enable_dict, column_encoding,
                 "column:{}",
                 col_schema.name
             );
@@ -946,33 +975,81 @@ mod tests {
         };
         let record_batches_with_key = vec![record_batch_with_key0, record_batch_with_key1];
 
-        // Normal case 1
+        let mut column_encodings = HashMap::new();
         let sampler = ColumnEncodingSampler {
             sample_row_groups: &record_batches_with_key,
             meta_data: &meta_data,
             min_num_sample_rows: 10,
             max_unique_value_ratio: 0.6,
+            column_encodings: &mut column_encodings,
         };
-        let expect_enable_dicts = vec![true, false, false, true, false, false];
-        check_sample_column_encoding(sampler, Some(expect_enable_dicts));
+        let expect_enable_dicts = vec![
+            Some(true),
+            Some(false),
+            Some(false),
+            Some(true),
+            Some(false),
+            Some(false),
+        ];
+        check_sample_column_encoding(sampler, expect_enable_dicts);
 
-        // Normal case 2
+        column_encodings.clear();
         let sampler = ColumnEncodingSampler {
             sample_row_groups: &record_batches_with_key,
             meta_data: &meta_data,
             min_num_sample_rows: 10,
             max_unique_value_ratio: 0.2,
+            column_encodings: &mut column_encodings,
         };
-        let expect_enable_dicts = vec![true, false, false, false, false, false];
-        check_sample_column_encoding(sampler, Some(expect_enable_dicts));
+        let expect_enable_dicts = vec![
+            Some(true),
+            Some(false),
+            Some(false),
+            Some(false),
+            Some(false),
+            Some(false),
+        ];
+        check_sample_column_encoding(sampler, expect_enable_dicts);
 
-        // Normal case 3
+        column_encodings.clear();
         let sampler = ColumnEncodingSampler {
             sample_row_groups: &record_batches_with_key,
             meta_data: &meta_data,
             min_num_sample_rows: 30,
             max_unique_value_ratio: 0.2,
+            column_encodings: &mut column_encodings,
         };
-        check_sample_column_encoding(sampler, None);
+        let expect_enable_dicts = vec![
+            None,
+            Some(false),
+            Some(false),
+            None,
+            Some(false),
+            Some(false),
+        ];
+        check_sample_column_encoding(sampler, expect_enable_dicts);
+
+        column_encodings.clear();
+        // `field1` is double type, it will still be changed to false even if it is set
+        // as true.
+        // `field2` is string type, it will be kept as the pre-set.
+        column_encodings.insert("field1".to_string(), ColumnEncoding { enable_dict: true });
+        column_encodings.insert("field2".to_string(), ColumnEncoding { enable_dict: true });
+        let sampler = ColumnEncodingSampler {
+            sample_row_groups: &record_batches_with_key,
+            meta_data: &meta_data,
+            min_num_sample_rows: 10,
+            max_unique_value_ratio: 0.2,
+            column_encodings: &mut column_encodings,
+        };
+        let expect_enable_dicts = vec![
+            Some(true),
+            Some(false),
+            Some(false),
+            Some(true),
+            Some(false),
+            Some(false),
+        ];
+        check_sample_column_encoding(sampler, expect_enable_dicts);
     }
 }
diff --git a/benchmarks/src/sst_tools.rs b/benchmarks/src/sst_tools.rs
index b9d41cbe..62653d3c 100644
--- a/benchmarks/src/sst_tools.rs
+++ b/benchmarks/src/sst_tools.rs
@@ -66,6 +66,7 @@ async fn create_sst_from_stream(config: SstConfig, record_batch_stream: RecordBa
         num_rows_per_row_group: config.num_rows_per_row_group,
         compression: config.compression,
         max_buffer_size: 1024 * 1024 * 10,
+        column_stats: Default::default(),
     };
 
     info!(
diff --git a/tools/src/bin/sst-convert.rs b/tools/src/bin/sst-convert.rs
index 7944a62a..0021a142 100644
--- a/tools/src/bin/sst-convert.rs
+++ b/tools/src/bin/sst-convert.rs
@@ -122,6 +122,7 @@ async fn run(args: Args, runtime: Arc<Runtime>) -> Result<()> {
         compression: Compression::parse_from(&args.compression)
             .with_context(|| format!("invalid compression:{}", args.compression))?,
         max_buffer_size: 10 * 1024 * 1024,
+        column_stats: Default::default(),
     };
     let output = Path::from(args.output);
     let mut writer = factory


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
