This is an automated email from the ASF dual-hosted git repository.

xikai pushed a commit to branch memtable-poc
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git

commit fed1d5938d125d375369769f1bb2b6819a719b53
Merge: 52e3859d 3f5d8f45
Author: xikai.wxk <[email protected]>
AuthorDate: Fri Dec 22 16:12:56 2023 +0800

    Merge branch 'dev' into memtable-poc

 Cargo.lock                                         |   1 +
 analytic_engine/Cargo.toml                         |   1 +
 analytic_engine/src/compaction/scheduler.rs        |   1 +
 analytic_engine/src/instance/engine.rs             |   8 +-
 analytic_engine/src/instance/flush_compaction.rs   |  70 +++-
 analytic_engine/src/instance/open.rs               |   4 +-
 analytic_engine/src/instance/serial_executor.rs    |   3 +-
 analytic_engine/src/sst/factory.rs                 |  35 +-
 analytic_engine/src/sst/meta_data/cache.rs         |   2 +-
 .../src/sst/meta_data/metadata_reader.rs           |   5 +-
 analytic_engine/src/sst/meta_data/mod.rs           |  11 +-
 analytic_engine/src/sst/parquet/async_reader.rs    |  34 +-
 analytic_engine/src/sst/parquet/encoding.rs        |  54 +--
 analytic_engine/src/sst/parquet/meta_data.rs       |  10 +-
 .../src/sst/parquet/row_group_pruner.rs            |   3 +-
 analytic_engine/src/sst/parquet/writer.rs          | 402 ++++++++++++++++-----
 benchmarks/src/sst_tools.rs                        |   1 +
 components/codec/src/columnar/timestamp.rs         |   6 +-
 components/id_allocator/src/lib.rs                 |   2 +-
 server/src/grpc/remote_engine_service/mod.rs       |   2 +-
 src/wal/src/rocksdb_impl/manager.rs                |   2 +-
 table_engine/src/partition/rule/random.rs          |   7 +-
 tools/src/bin/sst-convert.rs                       |   1 +
 23 files changed, 489 insertions(+), 176 deletions(-)

diff --cc analytic_engine/src/instance/flush_compaction.rs
index 5b7ec2e8,7094bb35..54eb8ac0
--- a/analytic_engine/src/instance/flush_compaction.rs
+++ b/analytic_engine/src/instance/flush_compaction.rs
@@@ -14,11 -14,16 +14,16 @@@
  
  // Flush and compaction logic of instance
  
- use std::{cmp, collections::Bound, fmt, sync::Arc};
+ use std::{
+     cmp,
+     collections::{Bound, HashMap},
+     fmt,
+     sync::Arc,
+ };
  
  use common_types::{
 -    projected_schema::ProjectedSchema,
 -    record_batch::{RecordBatchWithKey, RecordBatchWithKeyBuilder},
 +    projected_schema::{ProjectedSchema, RecordFetchingContextBuilder},
 +    record_batch::{FetchingRecordBatch, FetchingRecordBatchBuilder},
      request_id::RequestId,
      row::RowViewOnBatch,
      time::TimeRange,
@@@ -1005,9 -992,7 +1012,9 @@@ impl SpaceStore 
              row_iter::record_batch_with_key_iter_to_stream(merge_iter)
          };
  
 +        // TODO: eliminate the duplicated building of `SstReadOptions`.
 +        let sst_read_options = sst_read_options_builder.build(record_fetching_ctx_builder);
-         let sst_meta = {
+         let (sst_meta, column_stats) = {
              let meta_reader = SstMetaReader {
                  space_id: table_data.space_id,
                  table_id: table_data.id,
@@@ -1126,18 -1120,49 +1142,54 @@@
      }
  }
  
+ /// Collect the column stats from a batch of sst meta data.
+ fn collect_column_stats_from_meta_datas(metas: &[SstMetaData]) -> HashMap<String, ColumnStats> {
+     let mut low_cardinality_counts: HashMap<String, usize> = HashMap::new();
+     for meta_data in metas {
+         let SstMetaData::Parquet(meta_data) = meta_data;
+         if let Some(column_values) = &meta_data.column_values {
+             for (col_idx, val_set) in column_values.iter().enumerate() {
+                 let low_cardinality = val_set.is_some();
+                 if low_cardinality {
+                     let col_name = meta_data.schema.column(col_idx).name.clone();
+                     low_cardinality_counts
+                         .entry(col_name)
+                         .and_modify(|v| *v += 1)
+                         .or_insert(1);
+                 }
+             }
+         }
+     }
+ 
+     // Only the column whose cardinality is low in all the metas is a
+     // low-cardinality column.
+     // TODO: shall we merge all the distinct values of the column to check whether
+     // the cardinality is still thought to be low?
+     let low_cardinality_cols = low_cardinality_counts
+         .into_iter()
+         .filter_map(|(col_name, cnt)| {
+             (cnt == metas.len()).then_some((
+                 col_name,
+                 ColumnStats {
+                     low_cardinality: true,
+                 },
+             ))
+         });
+     HashMap::from_iter(low_cardinality_cols)
+ }
+ 
  fn split_record_batch_with_time_ranges(
 -    record_batch: RecordBatchWithKey,
 +    record_batch: FetchingRecordBatch,
      time_ranges: &[TimeRange],
      timestamp_idx: usize,
 -) -> Result<Vec<RecordBatchWithKey>> {
 -    let mut builders: Vec<RecordBatchWithKeyBuilder> = (0..time_ranges.len())
 -        .map(|_| RecordBatchWithKeyBuilder::new(record_batch.schema_with_key().clone()))
 +) -> Result<Vec<FetchingRecordBatch>> {
 +    let fetching_schema = record_batch.schema();
 +    let primary_key_indexes = record_batch.primary_key_indexes();
 +    let mut builders: Vec<FetchingRecordBatchBuilder> = (0..time_ranges.len())
 +        .map(|_| {
 +            let primary_key_indexes = primary_key_indexes.map(|idxs| idxs.to_vec());
 +            FetchingRecordBatchBuilder::new(fetching_schema.clone(), primary_key_indexes)
 +        })
          .collect();
  
      for row_idx in 0..record_batch.num_rows() {
diff --cc analytic_engine/src/sst/factory.rs
index 7748169c,8d507c6e..2017bc50
--- a/analytic_engine/src/sst/factory.rs
+++ b/analytic_engine/src/sst/factory.rs
@@@ -14,10 -14,10 +14,10 @@@
  
  //! Factory for different kinds sst writer and reader.
  
- use std::{fmt::Debug, sync::Arc};
+ use std::{collections::HashMap, fmt::Debug, sync::Arc};
  
  use async_trait::async_trait;
 -use common_types::projected_schema::ProjectedSchema;
 +use common_types::projected_schema::RecordFetchingContextBuilder;
  use macros::define_result;
  use object_store::{ObjectStoreRef, Path};
  use runtime::Runtime;
diff --cc analytic_engine/src/sst/parquet/async_reader.rs
index 9b136d1a,68794918..b969639a
--- a/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/analytic_engine/src/sst/parquet/async_reader.rs
@@@ -139,11 -143,10 +143,11 @@@ impl<'a> Reader<'a> 
              predicate: options.predicate.clone(),
              frequency: options.frequency,
              meta_data: None,
 -            row_projector: None,
 +            record_fetching_ctx_builder: options.record_fetching_ctx_builder.clone(),
 +            record_fetching_ctx: None,
              metrics,
              df_plan_metrics,
-             table_level_sst_metrics: options.maybe_table_level_metrics.clone(),
+             table_level_sst_metrics,
          }
      }
  
diff --cc analytic_engine/src/sst/parquet/writer.rs
index 90c4984a,ef233f70..d44dffc1
--- a/analytic_engine/src/sst/parquet/writer.rs
+++ b/analytic_engine/src/sst/parquet/writer.rs
@@@ -29,19 -29,20 +29,21 @@@ use parquet::data_type::AsBytes
  use snafu::{OptionExt, ResultExt};
  use tokio::io::{AsyncWrite, AsyncWriteExt};
  
- use super::meta_data::{ColumnValueSet, RowGroupFilter};
  use crate::{
      sst::{
-         factory::{ObjectStorePickerRef, SstWriteOptions},
+         factory::ObjectStorePickerRef,
          file::Level,
          parquet::{
-             encoding::{encode_sst_meta_data, ParquetEncoder},
-             meta_data::{ParquetFilter, ParquetMetaData, RowGroupFilterBuilder},
+             encoding::{encode_sst_meta_data, ColumnEncoding, EncodeOptions, ParquetEncoder},
+             meta_data::{
+                 ColumnValueSet, ParquetFilter, ParquetMetaData, RowGroupFilter,
+                 RowGroupFilterBuilder,
+             },
          },
          writer::{
 -            self, BuildParquetFilter, EncodePbData, EncodeRecordBatch, ExpectTimestampColumn, Io,
 -            MetaData, PollRecordBatch, RecordBatchStream, Result, SstInfo, SstWriter, Storage,
 +            self, BuildParquetFilter, BuildParquetFilterNoCause, EncodePbData, EncodeRecordBatch,
 +            ExpectTimestampColumn, Io, MetaData, PollRecordBatch, RecordBatchStream, Result,
 +            SstInfo, SstWriter, Storage,
          },
      },
      table::sst_util,
@@@ -155,11 -160,11 +161,11 @@@ impl<'a> RecordBatchGroupWriter<'a> 
      /// the left rows.
      async fn fetch_next_row_group(
          &mut self,
 -        prev_record_batch: &mut Option<RecordBatchWithKey>,
 -    ) -> Result<Vec<RecordBatchWithKey>> {
 +        prev_record_batch: &mut Option<FetchingRecordBatch>,
 +    ) -> Result<Vec<FetchingRecordBatch>> {
          let mut curr_row_group = vec![];
          // Used to record the number of remaining rows to fill `curr_row_group`.
-         let mut remaining = self.num_rows_per_row_group;
+         let mut remaining = self.options.num_rows_per_row_group;
  
          // Keep filling `curr_row_group` until `remaining` is zero.
          while remaining > 0 {
@@@ -210,6 -215,21 +216,21 @@@
          Ok(curr_row_group)
      }
  
+     fn build_column_encodings(
+         &self,
 -        sample_row_groups: &[RecordBatchWithKey],
++        sample_row_groups: &[FetchingRecordBatch],
+         column_encodings: &mut HashMap<String, ColumnEncoding>,
+     ) -> Result<()> {
+         let mut sampler = ColumnEncodingSampler {
+             sample_row_groups,
+             meta_data: self.meta_data,
+             min_num_sample_rows: MIN_NUM_ROWS_SAMPLE_DICT_ENCODING,
+             max_unique_value_ratio: MAX_UNIQUE_VALUE_RATIO_DICT_ENCODING,
+             column_encodings,
+         };
+         sampler.sample()
+     }
+ 
      /// Build the parquet filter for the given `row_group`.
      fn build_row_group_filter(
          &self,
@@@ -237,13 -251,9 +258,9 @@@
          builder.build().box_err().context(BuildParquetFilter)
      }
  
-     fn need_custom_filter(&self) -> bool {
-         !self.level.is_min()
-     }
- 
      fn update_column_values(
          column_values: &mut [Option<ColumnValueSet>],
 -        record_batch: &RecordBatchWithKey,
 +        record_batch: &FetchingRecordBatch,
      ) {
          for (col_idx, col_values) in column_values.iter_mut().enumerate() {
              let mut too_many_values = false;
@@@ -518,6 -528,84 +535,84 @@@ impl<'a> SstWriter for ParquetSstWriter
      }
  }
  
+ /// A sampler to decide the column encoding options (whether to do dictionary
+ /// encoding) with a bunch of sample row groups.
+ struct ColumnEncodingSampler<'a> {
 -    sample_row_groups: &'a [RecordBatchWithKey],
++    sample_row_groups: &'a [FetchingRecordBatch],
+     meta_data: &'a MetaData,
+     min_num_sample_rows: usize,
+     max_unique_value_ratio: f64,
+     column_encodings: &'a mut HashMap<String, ColumnEncoding>,
+ }
+ 
+ impl<'a> ColumnEncodingSampler<'a> {
+     fn sample(&mut self) -> Result<()> {
+         let num_total_rows: usize = self.sample_row_groups.iter().map(|v| v.num_rows()).sum();
+         let ignore_sampling = num_total_rows < self.min_num_sample_rows;
+         if ignore_sampling {
+             self.decide_column_encodings_by_data_type();
+             return Ok(());
+         }
+ 
+         assert!(self.max_unique_value_ratio <= 1.0 && self.max_unique_value_ratio >= 0.0);
+         let max_unique_values = (num_total_rows as f64 * self.max_unique_value_ratio) as usize;
+         let mut column_hashes = HashSet::with_capacity(max_unique_values);
+         for (col_idx, col_schema) in self.meta_data.schema.columns().iter().enumerate() {
+             if !Self::is_dictionary_type(col_schema.data_type) {
+                 self.column_encodings.insert(
+                     col_schema.name.clone(),
+                     ColumnEncoding { enable_dict: false },
+                 );
+                 continue;
+             }
+ 
+             if self.column_encodings.contains_key(&col_schema.name) {
+                 continue;
+             }
+ 
+             for row_group in self.sample_row_groups {
+                 let col_block = &row_group.columns()[col_idx];
+                 for idx in 0..row_group.num_rows() {
+                     if column_hashes.len() >= max_unique_values {
+                         break;
+                     }
+                     let datum_view = col_block.datum_view(idx);
+                     datum_view.do_with_bytes(|val| {
+                         let hash = hash_ext::hash64(val);
+                         column_hashes.insert(hash);
+                     })
+                 }
+             }
+ 
+             // The dictionary encoding make senses only if the number of unique values is
+             // small.
+             let enable_dict = column_hashes.len() < max_unique_values;
+             column_hashes.clear();
+             self.column_encodings
+                 .insert(col_schema.name.clone(), ColumnEncoding { enable_dict });
+         }
+ 
+         Ok(())
+     }
+ 
+     fn decide_column_encodings_by_data_type(&mut self) {
+         for col_schema in self.meta_data.schema.columns().iter() {
+             if !Self::is_dictionary_type(col_schema.data_type) {
+                 self.column_encodings.insert(
+                     col_schema.name.clone(),
+                     ColumnEncoding { enable_dict: false },
+                 );
+             }
+         }
+     }
+ 
+     #[inline]
+     fn is_dictionary_type(data_type: DatumKind) -> bool {
+         // Only do dictionary encoding for string or bytes column.
+         matches!(data_type, DatumKind::String | DatumKind::Varbinary)
+     }
+ }
+ 
  #[cfg(test)]
  mod tests {
  
diff --cc server/src/grpc/remote_engine_service/mod.rs
index 7894f396,03bd7fb1..1f4914c1
--- a/server/src/grpc/remote_engine_service/mod.rs
+++ b/server/src/grpc/remote_engine_service/mod.rs
@@@ -73,11 -71,9 +72,12 @@@ use crate::
          metrics::{
              REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC,
              REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC,
+             REMOTE_ENGINE_WRITE_BATCH_NUM_ROWS_HISTOGRAM,
          },
 -        remote_engine_service::error::{ErrNoCause, ErrWithCause, Result, StatusCode},
 +        remote_engine_service::{
 +            error::{ErrNoCause, ErrWithCause, Result, StatusCode},
 +            metrics::REMOTE_ENGINE_QUERY_COUNTER,
 +        },
      },
  };
  


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to