This is an automated email from the ASF dual-hosted git repository. xikai pushed a commit to branch memtable-poc in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
commit fed1d5938d125d375369769f1bb2b6819a719b53 Merge: 52e3859d 3f5d8f45 Author: xikai.wxk <[email protected]> AuthorDate: Fri Dec 22 16:12:56 2023 +0800 Merge branch 'dev' into memtable-poc Cargo.lock | 1 + analytic_engine/Cargo.toml | 1 + analytic_engine/src/compaction/scheduler.rs | 1 + analytic_engine/src/instance/engine.rs | 8 +- analytic_engine/src/instance/flush_compaction.rs | 70 +++- analytic_engine/src/instance/open.rs | 4 +- analytic_engine/src/instance/serial_executor.rs | 3 +- analytic_engine/src/sst/factory.rs | 35 +- analytic_engine/src/sst/meta_data/cache.rs | 2 +- .../src/sst/meta_data/metadata_reader.rs | 5 +- analytic_engine/src/sst/meta_data/mod.rs | 11 +- analytic_engine/src/sst/parquet/async_reader.rs | 34 +- analytic_engine/src/sst/parquet/encoding.rs | 54 +-- analytic_engine/src/sst/parquet/meta_data.rs | 10 +- .../src/sst/parquet/row_group_pruner.rs | 3 +- analytic_engine/src/sst/parquet/writer.rs | 402 ++++++++++++++++----- benchmarks/src/sst_tools.rs | 1 + components/codec/src/columnar/timestamp.rs | 6 +- components/id_allocator/src/lib.rs | 2 +- server/src/grpc/remote_engine_service/mod.rs | 2 +- src/wal/src/rocksdb_impl/manager.rs | 2 +- table_engine/src/partition/rule/random.rs | 7 +- tools/src/bin/sst-convert.rs | 1 + 23 files changed, 489 insertions(+), 176 deletions(-) diff --cc analytic_engine/src/instance/flush_compaction.rs index 5b7ec2e8,7094bb35..54eb8ac0 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@@ -14,11 -14,16 +14,16 @@@ // Flush and compaction logic of instance - use std::{cmp, collections::Bound, fmt, sync::Arc}; + use std::{ + cmp, + collections::{Bound, HashMap}, + fmt, + sync::Arc, + }; use common_types::{ - projected_schema::ProjectedSchema, - record_batch::{RecordBatchWithKey, RecordBatchWithKeyBuilder}, + projected_schema::{ProjectedSchema, RecordFetchingContextBuilder}, + record_batch::{FetchingRecordBatch, FetchingRecordBatchBuilder}, request_id::RequestId, row::RowViewOnBatch, time::TimeRange, @@@ -1005,9 -992,7 +1012,9 @@@ impl SpaceStore row_iter::record_batch_with_key_iter_to_stream(merge_iter) }; + // TODO: eliminate the duplicated building of `SstReadOptions`. + let sst_read_options = sst_read_options_builder.build(record_fetching_ctx_builder); - let sst_meta = { + let (sst_meta, column_stats) = { let meta_reader = SstMetaReader { space_id: table_data.space_id, table_id: table_data.id, @@@ -1126,18 -1120,49 +1142,54 @@@ } } + /// Collect the column stats from a batch of sst meta data. + fn collect_column_stats_from_meta_datas(metas: &[SstMetaData]) -> HashMap<String, ColumnStats> { + let mut low_cardinality_counts: HashMap<String, usize> = HashMap::new(); + for meta_data in metas { + let SstMetaData::Parquet(meta_data) = meta_data; + if let Some(column_values) = &meta_data.column_values { + for (col_idx, val_set) in column_values.iter().enumerate() { + let low_cardinality = val_set.is_some(); + if low_cardinality { + let col_name = meta_data.schema.column(col_idx).name.clone(); + low_cardinality_counts + .entry(col_name) + .and_modify(|v| *v += 1) + .or_insert(1); + } + } + } + } + + // Only the column whose cardinality is low in all the metas is a + // low-cardinality column. + // TODO: shall we merge all the distinct values of the column to check whether + // the cardinality is still thought to be low? + let low_cardinality_cols = low_cardinality_counts + .into_iter() + .filter_map(|(col_name, cnt)| { + (cnt == metas.len()).then_some(( + col_name, + ColumnStats { + low_cardinality: true, + }, + )) + }); + HashMap::from_iter(low_cardinality_cols) + } + fn split_record_batch_with_time_ranges( - record_batch: RecordBatchWithKey, + record_batch: FetchingRecordBatch, time_ranges: &[TimeRange], timestamp_idx: usize, -) -> Result<Vec<RecordBatchWithKey>> { - let mut builders: Vec<RecordBatchWithKeyBuilder> = (0..time_ranges.len()) - .map(|_| RecordBatchWithKeyBuilder::new(record_batch.schema_with_key().clone())) +) -> Result<Vec<FetchingRecordBatch>> { + let fetching_schema = record_batch.schema(); + let primary_key_indexes = record_batch.primary_key_indexes(); + let mut builders: Vec<FetchingRecordBatchBuilder> = (0..time_ranges.len()) + .map(|_| { + let primary_key_indexes = primary_key_indexes.map(|idxs| idxs.to_vec()); + FetchingRecordBatchBuilder::new(fetching_schema.clone(), primary_key_indexes) + }) .collect(); for row_idx in 0..record_batch.num_rows() { diff --cc analytic_engine/src/sst/factory.rs index 7748169c,8d507c6e..2017bc50 --- a/analytic_engine/src/sst/factory.rs +++ b/analytic_engine/src/sst/factory.rs @@@ -14,10 -14,10 +14,10 @@@ //! Factory for different kinds sst writer and reader. - use std::{fmt::Debug, sync::Arc}; + use std::{collections::HashMap, fmt::Debug, sync::Arc}; use async_trait::async_trait; -use common_types::projected_schema::ProjectedSchema; +use common_types::projected_schema::RecordFetchingContextBuilder; use macros::define_result; use object_store::{ObjectStoreRef, Path}; use runtime::Runtime; diff --cc analytic_engine/src/sst/parquet/async_reader.rs index 9b136d1a,68794918..b969639a --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@@ -139,11 -143,10 +143,11 @@@ impl<'a> Reader<'a> predicate: options.predicate.clone(), frequency: options.frequency, meta_data: None, - row_projector: None, + record_fetching_ctx_builder: options.record_fetching_ctx_builder.clone(), + record_fetching_ctx: None, metrics, df_plan_metrics, - table_level_sst_metrics: options.maybe_table_level_metrics.clone(), + table_level_sst_metrics, } } diff --cc analytic_engine/src/sst/parquet/writer.rs index 90c4984a,ef233f70..d44dffc1 --- a/analytic_engine/src/sst/parquet/writer.rs +++ b/analytic_engine/src/sst/parquet/writer.rs @@@ -29,19 -29,20 +29,21 @@@ use parquet::data_type::AsBytes use snafu::{OptionExt, ResultExt}; use tokio::io::{AsyncWrite, AsyncWriteExt}; - use super::meta_data::{ColumnValueSet, RowGroupFilter}; use crate::{ sst::{ - factory::{ObjectStorePickerRef, SstWriteOptions}, + factory::ObjectStorePickerRef, file::Level, parquet::{ - encoding::{encode_sst_meta_data, ParquetEncoder}, - meta_data::{ParquetFilter, ParquetMetaData, RowGroupFilterBuilder}, + encoding::{encode_sst_meta_data, ColumnEncoding, EncodeOptions, ParquetEncoder}, + meta_data::{ + ColumnValueSet, ParquetFilter, ParquetMetaData, RowGroupFilter, + RowGroupFilterBuilder, + }, }, writer::{ - self, BuildParquetFilter, EncodePbData, EncodeRecordBatch, ExpectTimestampColumn, Io, - MetaData, PollRecordBatch, RecordBatchStream, Result, SstInfo, SstWriter, Storage, + self, BuildParquetFilter, BuildParquetFilterNoCause, EncodePbData, EncodeRecordBatch, + ExpectTimestampColumn, Io, MetaData, PollRecordBatch, RecordBatchStream, Result, + SstInfo, SstWriter, Storage, }, }, table::sst_util, @@@ -155,11 -160,11 +161,11 @@@ impl<'a> RecordBatchGroupWriter<'a> /// the left rows. async fn fetch_next_row_group( &mut self, - prev_record_batch: &mut Option<RecordBatchWithKey>, - ) -> Result<Vec<RecordBatchWithKey>> { + prev_record_batch: &mut Option<FetchingRecordBatch>, + ) -> Result<Vec<FetchingRecordBatch>> { let mut curr_row_group = vec![]; // Used to record the number of remaining rows to fill `curr_row_group`. - let mut remaining = self.num_rows_per_row_group; + let mut remaining = self.options.num_rows_per_row_group; // Keep filling `curr_row_group` until `remaining` is zero. while remaining > 0 { @@@ -210,6 -215,21 +216,21 @@@ Ok(curr_row_group) } + fn build_column_encodings( + &self, - sample_row_groups: &[RecordBatchWithKey], ++ sample_row_groups: &[FetchingRecordBatch], + column_encodings: &mut HashMap<String, ColumnEncoding>, + ) -> Result<()> { + let mut sampler = ColumnEncodingSampler { + sample_row_groups, + meta_data: self.meta_data, + min_num_sample_rows: MIN_NUM_ROWS_SAMPLE_DICT_ENCODING, + max_unique_value_ratio: MAX_UNIQUE_VALUE_RATIO_DICT_ENCODING, + column_encodings, + }; + sampler.sample() + } + /// Build the parquet filter for the given `row_group`. fn build_row_group_filter( &self, @@@ -237,13 -251,9 +258,9 @@@ builder.build().box_err().context(BuildParquetFilter) } - fn need_custom_filter(&self) -> bool { - !self.level.is_min() - } - fn update_column_values( column_values: &mut [Option<ColumnValueSet>], - record_batch: &RecordBatchWithKey, + record_batch: &FetchingRecordBatch, ) { for (col_idx, col_values) in column_values.iter_mut().enumerate() { let mut too_many_values = false; @@@ -518,6 -528,84 +535,84 @@@ impl<'a> SstWriter for ParquetSstWriter } } + /// A sampler to decide the column encoding options (whether to do dictionary + /// encoding) with a bunch of sample row groups. + struct ColumnEncodingSampler<'a> { - sample_row_groups: &'a [RecordBatchWithKey], ++ sample_row_groups: &'a [FetchingRecordBatch], + meta_data: &'a MetaData, + min_num_sample_rows: usize, + max_unique_value_ratio: f64, + column_encodings: &'a mut HashMap<String, ColumnEncoding>, + } + + impl<'a> ColumnEncodingSampler<'a> { + fn sample(&mut self) -> Result<()> { + let num_total_rows: usize = self.sample_row_groups.iter().map(|v| v.num_rows()).sum(); + let ignore_sampling = num_total_rows < self.min_num_sample_rows; + if ignore_sampling { + self.decide_column_encodings_by_data_type(); + return Ok(()); + } + + assert!(self.max_unique_value_ratio <= 1.0 && self.max_unique_value_ratio >= 0.0); + let max_unique_values = (num_total_rows as f64 * self.max_unique_value_ratio) as usize; + let mut column_hashes = HashSet::with_capacity(max_unique_values); + for (col_idx, col_schema) in self.meta_data.schema.columns().iter().enumerate() { + if !Self::is_dictionary_type(col_schema.data_type) { + self.column_encodings.insert( + col_schema.name.clone(), + ColumnEncoding { enable_dict: false }, + ); + continue; + } + + if self.column_encodings.contains_key(&col_schema.name) { + continue; + } + + for row_group in self.sample_row_groups { + let col_block = &row_group.columns()[col_idx]; + for idx in 0..row_group.num_rows() { + if column_hashes.len() >= max_unique_values { + break; + } + let datum_view = col_block.datum_view(idx); + datum_view.do_with_bytes(|val| { + let hash = hash_ext::hash64(val); + column_hashes.insert(hash); + }) + } + } + + // The dictionary encoding make senses only if the number of unique values is + // small. + let enable_dict = column_hashes.len() < max_unique_values; + column_hashes.clear(); + self.column_encodings + .insert(col_schema.name.clone(), ColumnEncoding { enable_dict }); + } + + Ok(()) + } + + fn decide_column_encodings_by_data_type(&mut self) { + for col_schema in self.meta_data.schema.columns().iter() { + if !Self::is_dictionary_type(col_schema.data_type) { + self.column_encodings.insert( + col_schema.name.clone(), + ColumnEncoding { enable_dict: false }, + ); + } + } + } + + #[inline] + fn is_dictionary_type(data_type: DatumKind) -> bool { + // Only do dictionary encoding for string or bytes column. + matches!(data_type, DatumKind::String | DatumKind::Varbinary) + } + } + #[cfg(test)] mod tests { diff --cc server/src/grpc/remote_engine_service/mod.rs index 7894f396,03bd7fb1..1f4914c1 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@@ -73,11 -71,9 +72,12 @@@ use crate:: metrics::{ REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC, REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC, + REMOTE_ENGINE_WRITE_BATCH_NUM_ROWS_HISTOGRAM, }, - remote_engine_service::error::{ErrNoCause, ErrWithCause, Result, StatusCode}, + remote_engine_service::{ + error::{ErrNoCause, ErrWithCause, Result, StatusCode}, + metrics::REMOTE_ENGINE_QUERY_COUNTER, + }, }, }; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
