baojinri commented on code in PR #1595: URL: https://github.com/apache/horaedb/pull/1595#discussion_r1849858214
########## horaedb/metric_engine/src/read.rs: ########## @@ -53,3 +79,254 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { Ok(Box::new(reader)) } } + +/// Execution plan for merge RecordBatch values, like Merge Operator in RocksDB. +/// +/// Input record batches are sorted by the primary key columns and seq +/// column. +#[derive(Debug)] +pub(crate) struct MergeExec { + /// Input plan + input: Arc<dyn ExecutionPlan>, + /// (0..num_primary_keys) are primary key columns + num_primary_keys: usize, + /// Sequence column index + seq_idx: usize, + // (idx, merge_op) + value_idx: usize, + value_op: Arc<dyn AggregateUDFImpl>, +} + +impl MergeExec { + pub fn new( + input: Arc<dyn ExecutionPlan>, + num_primary_keys: usize, + seq_idx: usize, + value_idx: usize, + value_op: Arc<dyn AggregateUDFImpl>, + ) -> Self { + Self { + input, + num_primary_keys, + seq_idx, + value_idx, + value_op, + } + } +} +impl DisplayAs for MergeExec { + fn fmt_as( + &self, + _t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!( + f, + "MergeExec: [primary_keys: {}, seq_idx: {}]", + self.num_primary_keys, self.seq_idx + )?; + Ok(()) + } +} + +impl ExecutionPlan for MergeExec { + fn name(&self) -> &str { + "MergeExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + self.input.properties() + } + + fn required_input_distribution(&self) -> Vec<Distribution> { + vec![Distribution::SinglePartition; self.children().len()] + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![&self.input] + } + + fn maintains_input_order(&self) -> Vec<bool> { + vec![true; self.children().len()] + } + + fn with_new_children( + self: Arc<Self>, + children: Vec<Arc<dyn ExecutionPlan>>, + ) -> DfResult<Arc<dyn ExecutionPlan>> { + Ok(Arc::new(MergeExec::new( + Arc::clone(&children[0]), + self.num_primary_keys, + self.seq_idx, + self.value_idx, + self.value_op.clone(), + ))) + } + + 
    fn execute( + &self, + partition: usize, + context: Arc<TaskContext>, + ) -> DfResult<SendableRecordBatchStream> { + if 0 != partition { + return internal_err!("MergeExec invalid partition {partition}"); + } + + Ok(Box::pin(MergeStream::new( + self.input.execute(partition, context)?, + self.num_primary_keys, + self.seq_idx, + self.value_idx, + self.value_op.clone(), + ))) + } +} + +struct MergeStream { + stream: SendableRecordBatchStream, + num_primary_keys: usize, + seq_idx: usize, + value_idx: usize, + value_op: Arc<dyn AggregateUDFImpl>, + + pending_batch: Option<RecordBatch>, + arrow_schema: SchemaRef, +} + +impl MergeStream { + fn new( + stream: SendableRecordBatchStream, + num_primary_keys: usize, + seq_idx: usize, + value_idx: usize, + value_op: Arc<dyn AggregateUDFImpl>, + ) -> Self { + let fields = stream + .schema() + .fields() + .into_iter() + .filter_map(|f| { + if f.name() == SEQ_COLUMN_NAME { + None + } else { + Some(f.clone()) + } + }) + .collect_vec(); Review Comment: Could we use `Fields::remove` here to drop the sequence column, instead of the `filter_map` + `collect_vec` chain? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@horaedb.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@horaedb.apache.org For additional commands, e-mail: commits-h...@horaedb.apache.org