This is an automated email from the ASF dual-hosted git repository.
tanruixiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/dev by this push:
new 9619810a feat: utilize the column cardinality for deciding whether to do dict (#1372)
9619810a is described below
commit 9619810aa558ae79ba474144dfaef591c5d9ef55
Author: WEI Xikai <[email protected]>
AuthorDate: Wed Dec 20 16:05:40 2023 +0800
feat: utilize the column cardinality for deciding whether to do dict (#1372)
## Rationale
The column value set is already summarized into the SST metadata when a column has
low cardinality (few distinct values). With that information, the sampling for such
columns can be skipped.
## Detailed Changes
Skip sampling for columns that are already known to be low-cardinality.
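Below is a minimal, self-contained sketch of the rule used here (not the engine code: the `collect_column_stats` helper, the `tag`/`value` column names, and the simplified `ColumnStats`/`ColumnEncoding` structs are illustrative stand-ins). A column is treated as low-cardinality, and therefore dictionary-encoded without sampling, only if every input SST meta reports a value set for it.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the engine types touched by this change.
#[derive(Debug)]
struct ColumnStats {
    low_cardinality: bool,
}

#[derive(Debug, PartialEq)]
struct ColumnEncoding {
    enable_dict: bool,
}

/// Hypothetical, simplified input: one map per SST meta, recording for each
/// column name whether a column value set was stored for it.
fn collect_column_stats(metas: &[HashMap<String, bool>]) -> HashMap<String, ColumnStats> {
    let mut counts: HashMap<String, usize> = HashMap::new();
    for meta in metas {
        for (col, has_value_set) in meta {
            if *has_value_set {
                *counts.entry(col.clone()).or_insert(0) += 1;
            }
        }
    }
    // A column counts as low-cardinality only if it is low-cardinality in
    // *every* input meta.
    counts
        .into_iter()
        .filter(|(_, cnt)| *cnt == metas.len())
        .map(|(col, _)| (col, ColumnStats { low_cardinality: true }))
        .collect()
}

fn main() {
    let metas = vec![
        HashMap::from([("tag".to_string(), true), ("value".to_string(), false)]),
        HashMap::from([("tag".to_string(), true), ("value".to_string(), true)]),
    ];
    let stats = collect_column_stats(&metas);
    // Only `tag` is low-cardinality in all metas, so only it skips sampling
    // and is dictionary-encoded up front.
    let encodings: HashMap<String, ColumnEncoding> = stats
        .iter()
        .map(|(col, s)| (col.clone(), ColumnEncoding { enable_dict: s.low_cardinality }))
        .collect();
    assert_eq!(encodings.get("tag"), Some(&ColumnEncoding { enable_dict: true }));
    assert_eq!(encodings.get("value"), None);
}
```

In the actual change, these stats travel through `SstWriteOptions::column_stats` into `WriteOptions::column_encodings`, and `ColumnEncodingSampler` skips any column whose encoding has already been decided this way.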
## Test Plan
Updated the unit tests.
---
analytic_engine/src/compaction/scheduler.rs | 1 +
analytic_engine/src/instance/flush_compaction.rs | 146 +++++++++++++++++++--
analytic_engine/src/sst/factory.rs | 21 ++-
analytic_engine/src/sst/meta_data/mod.rs | 11 +-
analytic_engine/src/sst/parquet/encoding.rs | 2 +-
analytic_engine/src/sst/parquet/writer.rs | 159 +++++++++++++++++------
benchmarks/src/sst_tools.rs | 1 +
tools/src/bin/sst-convert.rs | 1 +
8 files changed, 281 insertions(+), 61 deletions(-)
diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs
index 1496ef51..72fe2f8b 100644
--- a/analytic_engine/src/compaction/scheduler.rs
+++ b/analytic_engine/src/compaction/scheduler.rs
@@ -505,6 +505,7 @@ impl ScheduleWorker {
num_rows_per_row_group: table_data.table_options().num_rows_per_row_group,
compression: table_data.table_options().compression,
max_buffer_size: self.write_sst_max_buffer_size,
+ column_stats: Default::default(),
};
let scan_options = self.scan_options.clone();
diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs
index 81ac53d7..cbede944 100644
--- a/analytic_engine/src/instance/flush_compaction.rs
+++ b/analytic_engine/src/instance/flush_compaction.rs
@@ -14,7 +14,12 @@
// Flush and compaction logic of instance
-use std::{cmp, collections::Bound, fmt, sync::Arc};
+use std::{
+ cmp,
+ collections::{Bound, HashMap},
+ fmt,
+ sync::Arc,
+};
use common_types::{
projected_schema::ProjectedSchema,
@@ -55,9 +60,9 @@ use crate::{
IterOptions,
},
sst::{
- factory::{self, ScanOptions, SstWriteOptions},
+ factory::{self, ColumnStats, ScanOptions, SstWriteOptions},
file::{FileMeta, Level},
- meta_data::SstMetaReader,
+ meta_data::{SstMetaData, SstMetaReader},
writer::{MetaData, RecordBatchStream},
},
table::{
@@ -541,6 +546,7 @@ impl FlushTask {
num_rows_per_row_group: self.table_data.table_options().num_rows_per_row_group,
compression: self.table_data.table_options().compression,
max_buffer_size: self.write_sst_max_buffer_size,
+ column_stats: Default::default(),
};
for time_range in &time_ranges {
@@ -712,6 +718,7 @@ impl FlushTask {
num_rows_per_row_group: self.table_data.table_options().num_rows_per_row_group,
compression: self.table_data.table_options().compression,
max_buffer_size: self.write_sst_max_buffer_size,
+ column_stats: Default::default(),
};
let mut writer = self
.space_store
@@ -943,7 +950,7 @@ impl SpaceStore {
row_iter::record_batch_with_key_iter_to_stream(merge_iter)
};
- let sst_meta = {
+ let (sst_meta, column_stats) = {
let meta_reader = SstMetaReader {
space_id: table_data.space_id,
table_id: table_data.id,
@@ -956,7 +963,9 @@ impl SpaceStore {
.await
.context(ReadSstMeta)?;
- MetaData::merge(sst_metas.into_iter().map(MetaData::from), schema)
+ let column_stats = collect_column_stats_from_meta_datas(&sst_metas);
+ let merged_meta = MetaData::merge(sst_metas.into_iter().map(MetaData::from), schema);
+ (merged_meta, column_stats)
};
// Alloc file id for the merged sst.
@@ -966,10 +975,17 @@ impl SpaceStore {
.context(AllocFileId)?;
let sst_file_path = table_data.set_sst_file_path(file_id);
+ let write_options = SstWriteOptions {
+ storage_format_hint: sst_write_options.storage_format_hint,
+ num_rows_per_row_group: sst_write_options.num_rows_per_row_group,
+ compression: sst_write_options.compression,
+ max_buffer_size: sst_write_options.max_buffer_size,
+ column_stats,
+ };
let mut sst_writer = self
.sst_factory
.create_writer(
- sst_write_options,
+ &write_options,
&sst_file_path,
self.store_picker(),
input.output_level,
@@ -1062,6 +1078,42 @@ impl SpaceStore {
}
}
+/// Collect the column stats from a batch of sst meta data.
+fn collect_column_stats_from_meta_datas(metas: &[SstMetaData]) -> HashMap<String, ColumnStats> {
+ let mut low_cardinality_counts: HashMap<String, usize> = HashMap::new();
+ for meta_data in metas {
+ let SstMetaData::Parquet(meta_data) = meta_data;
+ if let Some(column_values) = &meta_data.column_values {
+ for (col_idx, val_set) in column_values.iter().enumerate() {
+ let low_cardinality = val_set.is_some();
+ if low_cardinality {
+ let col_name = meta_data.schema.column(col_idx).name.clone();
+ low_cardinality_counts
+ .entry(col_name)
+ .and_modify(|v| *v += 1)
+ .or_insert(1);
+ }
+ }
+ }
+ }
+
+ // Only the column whose cardinality is low in all the metas is a
+ // low-cardinality column.
+ // TODO: shall we merge all the distinct values of the column to check whether
+ // the cardinality is still thought to be low?
+ let low_cardinality_cols = low_cardinality_counts
+ .into_iter()
+ .filter_map(|(col_name, cnt)| {
+ (cnt == metas.len()).then_some((
+ col_name,
+ ColumnStats {
+ low_cardinality: true,
+ },
+ ))
+ });
+ HashMap::from_iter(low_cardinality_cols)
+}
+
fn split_record_batch_with_time_ranges(
record_batch: RecordBatchWithKey,
time_ranges: &[TimeRange],
@@ -1126,15 +1178,26 @@ fn build_mem_table_iter(
#[cfg(test)]
mod tests {
+ use std::sync::Arc;
+
+ use bytes_ext::Bytes;
use common_types::{
+ schema::Schema,
tests::{
- build_record_batch_with_key_by_rows, build_row, build_row_opt,
+ build_record_batch_with_key_by_rows, build_row, build_row_opt, build_schema,
check_record_batch_with_key_with_rows,
},
time::TimeRange,
};
- use crate::instance::flush_compaction::split_record_batch_with_time_ranges;
+ use super::collect_column_stats_from_meta_datas;
+ use crate::{
+ instance::flush_compaction::split_record_batch_with_time_ranges,
+ sst::{
+ meta_data::SstMetaData,
+ parquet::meta_data::{ColumnValueSet, ParquetMetaData},
+ },
+ };
#[test]
fn test_split_record_batch_with_time_ranges() {
@@ -1187,4 +1250,71 @@ mod tests {
check_record_batch_with_key_with_rows(&rets[1], rows1.len(), column_num, rows1);
check_record_batch_with_key_with_rows(&rets[2], rows2.len(), column_num, rows2);
}
+
+ fn check_collect_column_stats(
+ schema: &Schema,
+ expected_low_cardinality_col_indexes: Vec<usize>,
+ meta_datas: Vec<SstMetaData>,
+ ) {
+ let column_stats = collect_column_stats_from_meta_datas(&meta_datas);
+ assert_eq!(
+ column_stats.len(),
+ expected_low_cardinality_col_indexes.len()
+ );
+
+ for col_idx in expected_low_cardinality_col_indexes {
+ let col_schema = schema.column(col_idx);
+ assert!(column_stats.contains_key(&col_schema.name));
+ }
+ }
+
+ #[test]
+ fn test_collect_column_stats_from_metadata() {
+ let schema = build_schema();
+ let build_meta_data = |low_cardinality_col_indexes: Vec<usize>| {
+ let mut column_values = vec![None; 6];
+ for idx in low_cardinality_col_indexes {
+ column_values[idx] = Some(ColumnValueSet::StringValue(Default::default()));
+ }
+ let parquet_meta_data = ParquetMetaData {
+ min_key: Bytes::new(),
+ max_key: Bytes::new(),
+ time_range: TimeRange::empty(),
+ max_sequence: 0,
+ schema: schema.clone(),
+ parquet_filter: None,
+ column_values: Some(column_values),
+ };
+ SstMetaData::Parquet(Arc::new(parquet_meta_data))
+ };
+
+ // Normal case 0
+ let meta_datas = vec![
+ build_meta_data(vec![0]),
+ build_meta_data(vec![0]),
+ build_meta_data(vec![0, 1]),
+ build_meta_data(vec![0, 2]),
+ build_meta_data(vec![0, 3]),
+ ];
+ check_collect_column_stats(&schema, vec![0], meta_datas);
+
+ // Normal case 1
+ let meta_datas = vec![
+ build_meta_data(vec![0]),
+ build_meta_data(vec![0]),
+ build_meta_data(vec![]),
+ build_meta_data(vec![1]),
+ build_meta_data(vec![3]),
+ ];
+ check_collect_column_stats(&schema, vec![], meta_datas);
+
+ // Normal case 2
+ let meta_datas = vec![
+ build_meta_data(vec![3, 5]),
+ build_meta_data(vec![0, 3, 5]),
+ build_meta_data(vec![0, 1, 2, 3, 5]),
+ build_meta_data(vec![1, 3, 5]),
+ ];
+ check_collect_column_stats(&schema, vec![3, 5], meta_datas);
+ }
}
diff --git a/analytic_engine/src/sst/factory.rs b/analytic_engine/src/sst/factory.rs
index 8a585d86..8d507c6e 100644
--- a/analytic_engine/src/sst/factory.rs
+++ b/analytic_engine/src/sst/factory.rs
@@ -14,7 +14,7 @@
//! Factory for different kinds sst writer and reader.
-use std::{fmt::Debug, sync::Arc};
+use std::{collections::HashMap, fmt::Debug, sync::Arc};
use async_trait::async_trait;
use common_types::projected_schema::ProjectedSchema;
@@ -25,6 +25,7 @@ use snafu::{ResultExt, Snafu};
use table_engine::predicate::PredicateRef;
use trace_metric::MetricsCollector;
+use super::parquet::encoding::ColumnEncoding;
use crate::{
sst::{
file::Level,
@@ -146,6 +147,10 @@ pub struct SstReadOptions {
pub runtime: Arc<Runtime>,
}
+#[derive(Clone, Debug)]
+pub struct ColumnStats {
+ pub low_cardinality: bool,
+}
#[derive(Debug, Clone)]
pub struct SstWriteOptions {
@@ -153,6 +158,15 @@ pub struct SstWriteOptions {
pub num_rows_per_row_group: usize,
pub compression: Compression,
pub max_buffer_size: usize,
+ pub column_stats: HashMap<String, ColumnStats>,
+}
+
+impl From<&ColumnStats> for ColumnEncoding {
+ fn from(value: &ColumnStats) -> Self {
+ ColumnEncoding {
+ enable_dict: value.low_cardinality,
+ }
+ }
}
#[derive(Debug, Default)]
@@ -203,11 +217,16 @@ impl Factory for FactoryImpl {
store_picker: &'a ObjectStorePickerRef,
level: Level,
) -> Result<Box<dyn SstWriter + Send + 'a>> {
+ let column_encodings =
+ HashMap::from_iter(options.column_stats.iter().map(|(col_name, col_stats)| {
+ (col_name.to_owned(), ColumnEncoding::from(col_stats))
+ }));
let write_options = WriteOptions {
num_rows_per_row_group: options.num_rows_per_row_group,
max_buffer_size: options.max_buffer_size,
compression: options.compression.into(),
sst_level: level,
+ column_encodings,
};
Ok(Box::new(ParquetSstWriter::new(
path,
diff --git a/analytic_engine/src/sst/meta_data/mod.rs b/analytic_engine/src/sst/meta_data/mod.rs
index 7e80afb4..e053ba4f 100644
--- a/analytic_engine/src/sst/meta_data/mod.rs
+++ b/analytic_engine/src/sst/meta_data/mod.rs
@@ -57,7 +57,7 @@ pub enum Error {
#[snafu(display("Key value meta path in parquet is empty\nBacktrace\n:{}", backtrace))]
KvMetaPathEmpty { backtrace: Backtrace },
- #[snafu(display("Unknown mata version, value:{}.\nBacktrace\n:{}", version, backtrace))]
+ #[snafu(display("Unknown meta version, value:{}.\nBacktrace\n:{}", version, backtrace))]
UnknownMetaVersion {
version: String,
backtrace: Backtrace,
@@ -66,9 +66,6 @@ pub enum Error {
#[snafu(display("Metadata in proto struct is not found.\nBacktrace\n:{}", backtrace))]
MetaDataNotFound { backtrace: Backtrace },
- #[snafu(display("Empty custom metadata in parquet.\nBacktrace\n:{}", backtrace))]
- EmptyCustomMetaData { backtrace: Backtrace },
-
#[snafu(display("Failed to decode custom metadata in parquet, err:{}", source))]
DecodeCustomMetaData { source: encoding::Error },
@@ -81,12 +78,6 @@ pub enum Error {
#[snafu(display("Failed to convert parquet meta data, err:{}", source))]
ConvertParquetMetaData { source: parquet::meta_data::Error },
- #[snafu(display("Meet a object store error, err:{source}\nBacktrace:\n{backtrace}"))]
- ObjectStoreError {
- source: object_store::ObjectStoreError,
- backtrace: Backtrace,
- },
-
#[snafu(display(
"Failed to decode sst meta data, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}",
))]
diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs
index 9dc4a132..92f94b4c 100644
--- a/analytic_engine/src/sst/parquet/encoding.rs
+++ b/analytic_engine/src/sst/parquet/encoding.rs
@@ -238,7 +238,7 @@ struct ColumnarRecordEncoder<W> {
arrow_schema: ArrowSchemaRef,
}
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnEncoding {
pub enable_dict: bool,
}
diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs
index b2c78745..ef233f70 100644
--- a/analytic_engine/src/sst/parquet/writer.rs
+++ b/analytic_engine/src/sst/parquet/writer.rs
@@ -90,7 +90,7 @@ struct RecordBatchGroupWriter<'a> {
request_id: RequestId,
input: RecordBatchStream,
meta_data: &'a MetaData,
- options: &'a WriteOptions,
+ options: WriteOptions,
// inner status
input_exhausted: bool,
@@ -101,12 +101,13 @@ struct RecordBatchGroupWriter<'a> {
column_values: Option<Vec<Option<ColumnValueSet>>>,
}
-#[derive(Debug, Clone)]
+#[derive(Clone, Debug)]
pub struct WriteOptions {
pub num_rows_per_row_group: usize,
pub max_buffer_size: usize,
pub compression: Compression,
pub sst_level: Level,
+ pub column_encodings: HashMap<String, ColumnEncoding>,
}
impl WriteOptions {
@@ -121,7 +122,7 @@ impl<'a> RecordBatchGroupWriter<'a> {
request_id: RequestId,
input: RecordBatchStream,
meta_data: &'a MetaData,
- options: &'a WriteOptions,
+ options: WriteOptions,
) -> Self {
// No need to build complex index for the min-level sst so there is no need to
// collect the column values.
@@ -217,12 +218,14 @@ impl<'a> RecordBatchGroupWriter<'a> {
fn build_column_encodings(
&self,
sample_row_groups: &[RecordBatchWithKey],
- ) -> Result<HashMap<String, ColumnEncoding>> {
- let sampler = ColumnEncodingSampler {
+ column_encodings: &mut HashMap<String, ColumnEncoding>,
+ ) -> Result<()> {
+ let mut sampler = ColumnEncodingSampler {
sample_row_groups,
meta_data: self.meta_data,
min_num_sample_rows: MIN_NUM_ROWS_SAMPLE_DICT_ENCODING,
max_unique_value_ratio: MAX_UNIQUE_VALUE_RATIO_DICT_ENCODING,
+ column_encodings,
};
sampler.sample()
}
@@ -321,8 +324,10 @@ impl<'a> RecordBatchGroupWriter<'a> {
let mut arrow_row_group = Vec::new();
let mut total_num_rows = 0;
+ // Build the parquet encoder.
let mut row_group = self.fetch_next_row_group(&mut prev_record_batch).await?;
- let column_encodings = self.build_column_encodings(&row_group)?;
+ let mut column_encodings = std::mem::take(&mut self.options.column_encodings);
+ self.build_column_encodings(&row_group, &mut column_encodings)?;
let encode_options = EncodeOptions {
num_rows_per_row_group: self.options.num_rows_per_row_group,
max_buffer_size: self.options.max_buffer_size,
@@ -333,6 +338,7 @@ impl<'a> RecordBatchGroupWriter<'a> {
ParquetEncoder::try_new(sink, &self.meta_data.schema, &encode_options)
.box_err()
.context(EncodeRecordBatch)?;
+
let mut parquet_filter = self
.options
.need_custom_filter()
@@ -476,7 +482,14 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {
request_id, meta, self.options.num_rows_per_row_group
);
- let group_writer = RecordBatchGroupWriter::new(request_id, input, meta, &self.options);
+ let write_options = WriteOptions {
+ num_rows_per_row_group: self.options.num_rows_per_row_group,
+ max_buffer_size: self.options.max_buffer_size,
+ compression: self.options.compression,
+ sst_level: self.options.sst_level,
+ column_encodings: std::mem::take(&mut self.options.column_encodings),
+ };
+ let group_writer = RecordBatchGroupWriter::new(request_id, input, meta, write_options);
let (aborter, sink) =
ObjectStoreMultiUploadAborter::initialize_upload(self.store, self.path).await?;
@@ -522,33 +535,34 @@ struct ColumnEncodingSampler<'a> {
meta_data: &'a MetaData,
min_num_sample_rows: usize,
max_unique_value_ratio: f64,
+ column_encodings: &'a mut HashMap<String, ColumnEncoding>,
}
impl<'a> ColumnEncodingSampler<'a> {
- fn sample(&self) -> Result<HashMap<String, ColumnEncoding>> {
+ fn sample(&mut self) -> Result<()> {
let num_total_rows: usize = self.sample_row_groups.iter().map(|v| v.num_rows()).sum();
- if num_total_rows < self.min_num_sample_rows {
- return Ok(HashMap::new());
+ let ignore_sampling = num_total_rows < self.min_num_sample_rows;
+ if ignore_sampling {
+ self.decide_column_encodings_by_data_type();
+ return Ok(());
}
assert!(self.max_unique_value_ratio <= 1.0 && self.max_unique_value_ratio >= 0.0);
let max_unique_values = (num_total_rows as f64 * self.max_unique_value_ratio) as usize;
let mut column_hashes = HashSet::with_capacity(max_unique_values);
- let mut column_encodings = HashMap::with_capacity(self.meta_data.schema.num_columns());
for (col_idx, col_schema) in self.meta_data.schema.columns().iter().enumerate() {
- // Only do dictionary encoding for string or bytes column.
- let allowed_dict_type = matches!(
- col_schema.data_type,
- DatumKind::String | DatumKind::Varbinary
- );
- if !allowed_dict_type {
- column_encodings.insert(
+ if !Self::is_dictionary_type(col_schema.data_type) {
+ self.column_encodings.insert(
col_schema.name.clone(),
ColumnEncoding { enable_dict: false },
);
continue;
}
+ if self.column_encodings.contains_key(&col_schema.name) {
+ continue;
+ }
+
for row_group in self.sample_row_groups {
let col_block = &row_group.columns()[col_idx];
for idx in 0..row_group.num_rows() {
@@ -567,10 +581,28 @@ impl<'a> ColumnEncodingSampler<'a> {
// small.
let enable_dict = column_hashes.len() < max_unique_values;
column_hashes.clear();
- column_encodings.insert(col_schema.name.clone(), ColumnEncoding { enable_dict });
+ self.column_encodings
+ .insert(col_schema.name.clone(), ColumnEncoding { enable_dict });
+ }
+
+ Ok(())
+ }
+
+ fn decide_column_encodings_by_data_type(&mut self) {
+ for col_schema in self.meta_data.schema.columns().iter() {
+ if !Self::is_dictionary_type(col_schema.data_type) {
+ self.column_encodings.insert(
+ col_schema.name.clone(),
+ ColumnEncoding { enable_dict: false },
+ );
+ }
}
+ }
- Ok(column_encodings)
+ #[inline]
+ fn is_dictionary_type(data_type: DatumKind) -> bool {
+ // Only do dictionary encoding for string or bytes column.
+ matches!(data_type, DatumKind::String | DatumKind::Varbinary)
}
}
@@ -630,6 +662,7 @@ mod tests {
num_rows_per_row_group,
compression: table_options::Compression::Uncompressed,
max_buffer_size: 0,
+ column_stats: Default::default(),
};
let dir = tempdir().unwrap();
@@ -867,6 +900,7 @@ mod tests {
max_buffer_size: 0,
compression: Compression::UNCOMPRESSED,
sst_level: Level::default(),
+ column_encodings: Default::default(),
};
let meta_data = MetaData {
min_key: Default::default(),
@@ -879,7 +913,7 @@ mod tests {
RequestId::next_id(),
record_batch_stream,
&meta_data,
- &write_options,
+ write_options,
);
let mut prev_record_batch = None;
@@ -895,21 +929,16 @@ mod tests {
}
fn check_sample_column_encoding(
- sampler: ColumnEncodingSampler<'_>,
- expect_enable_dicts: Option<Vec<bool>>,
+ mut sampler: ColumnEncodingSampler<'_>,
+ expect_enable_dicts: Vec<Option<bool>>,
) {
- let column_encodings = sampler.sample().unwrap();
- if expect_enable_dicts.is_none() {
- assert!(column_encodings.is_empty());
- return;
- }
-
- let expect_enable_dicts = expect_enable_dicts.unwrap();
+ sampler.sample().unwrap();
for (col_idx, col_schema) in sampler.meta_data.schema.columns().iter().enumerate() {
- let expect_enable_dict = expect_enable_dicts[col_idx];
- let column_encoding = column_encodings.get(&col_schema.name).unwrap();
+ let expect_enable_dict =
+ expect_enable_dicts[col_idx].map(|v| ColumnEncoding { enable_dict: v });
+ let column_encoding = sampler.column_encodings.get(&col_schema.name).cloned();
assert_eq!(
- expect_enable_dict, column_encoding.enable_dict,
+ expect_enable_dict, column_encoding,
"column:{}",
col_schema.name
);
@@ -946,33 +975,81 @@ mod tests {
};
let record_batches_with_key = vec![record_batch_with_key0, record_batch_with_key1];
- // Normal case 1
+ let mut column_encodings = HashMap::new();
let sampler = ColumnEncodingSampler {
sample_row_groups: &record_batches_with_key,
meta_data: &meta_data,
min_num_sample_rows: 10,
max_unique_value_ratio: 0.6,
+ column_encodings: &mut column_encodings,
};
- let expect_enable_dicts = vec![true, false, false, true, false, false];
- check_sample_column_encoding(sampler, Some(expect_enable_dicts));
+ let expect_enable_dicts = vec![
+ Some(true),
+ Some(false),
+ Some(false),
+ Some(true),
+ Some(false),
+ Some(false),
+ ];
+ check_sample_column_encoding(sampler, expect_enable_dicts);
- // Normal case 2
+ column_encodings.clear();
let sampler = ColumnEncodingSampler {
sample_row_groups: &record_batches_with_key,
meta_data: &meta_data,
min_num_sample_rows: 10,
max_unique_value_ratio: 0.2,
+ column_encodings: &mut column_encodings,
};
- let expect_enable_dicts = vec![true, false, false, false, false, false];
- check_sample_column_encoding(sampler, Some(expect_enable_dicts));
+ let expect_enable_dicts = vec![
+ Some(true),
+ Some(false),
+ Some(false),
+ Some(false),
+ Some(false),
+ Some(false),
+ ];
+ check_sample_column_encoding(sampler, expect_enable_dicts);
- // Normal case 3
+ column_encodings.clear();
let sampler = ColumnEncodingSampler {
sample_row_groups: &record_batches_with_key,
meta_data: &meta_data,
min_num_sample_rows: 30,
max_unique_value_ratio: 0.2,
+ column_encodings: &mut column_encodings,
};
- check_sample_column_encoding(sampler, None);
+ let expect_enable_dicts = vec![
+ None,
+ Some(false),
+ Some(false),
+ None,
+ Some(false),
+ Some(false),
+ ];
+ check_sample_column_encoding(sampler, expect_enable_dicts);
+
+ column_encodings.clear();
+ // `field1` is double type, it will still be changed to false even if it is set
+ // as true.
+ // `field2` is string type, it will be kept as the pre-set.
+ column_encodings.insert("field1".to_string(), ColumnEncoding { enable_dict: true });
+ column_encodings.insert("field2".to_string(), ColumnEncoding { enable_dict: true });
+ let sampler = ColumnEncodingSampler {
+ sample_row_groups: &record_batches_with_key,
+ meta_data: &meta_data,
+ min_num_sample_rows: 10,
+ max_unique_value_ratio: 0.2,
+ column_encodings: &mut column_encodings,
+ };
+ let expect_enable_dicts = vec![
+ Some(true),
+ Some(false),
+ Some(false),
+ Some(true),
+ Some(false),
+ Some(false),
+ ];
+ check_sample_column_encoding(sampler, expect_enable_dicts);
}
}
diff --git a/benchmarks/src/sst_tools.rs b/benchmarks/src/sst_tools.rs
index b9d41cbe..62653d3c 100644
--- a/benchmarks/src/sst_tools.rs
+++ b/benchmarks/src/sst_tools.rs
@@ -66,6 +66,7 @@ async fn create_sst_from_stream(config: SstConfig, record_batch_stream: RecordBa
num_rows_per_row_group: config.num_rows_per_row_group,
compression: config.compression,
max_buffer_size: 1024 * 1024 * 10,
+ column_stats: Default::default(),
};
info!(
diff --git a/tools/src/bin/sst-convert.rs b/tools/src/bin/sst-convert.rs
index 7944a62a..0021a142 100644
--- a/tools/src/bin/sst-convert.rs
+++ b/tools/src/bin/sst-convert.rs
@@ -122,6 +122,7 @@ async fn run(args: Args, runtime: Arc<Runtime>) -> Result<()> {
compression: Compression::parse_from(&args.compression)
.with_context(|| format!("invalid compression:{}", args.compression))?,
max_buffer_size: 10 * 1024 * 1024,
+ column_stats: Default::default(),
};
let output = Path::from(args.output);
let mut writer = factory
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]