This is an automated email from the ASF dual-hosted git repository.

baojinri pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f2d0bc8 feat: support merge operator (#1605)
9f2d0bc8 is described below

commit 9f2d0bc8c9d829641f7af552f684e9eb85222d65
Author: Jiacai Liu <[email protected]>
AuthorDate: Fri Dec 13 10:12:26 2024 +0800

    feat: support merge operator (#1605)
    
    ## Rationale
    Close #1583
    
    For rows with the same primary key, we need to choose which value to use;
    a MergeOperator makes that decision.
    
    ## Detailed Changes
    - Add the MergeOperator trait and two implementations (LastValueOperator and BytesMergeOperator).
    
    ## Test Plan
    CI
---
 horaedb/metric_engine/Cargo.toml       |   2 +-
 horaedb/metric_engine/src/lib.rs       |   2 +
 horaedb/metric_engine/src/operator.rs  | 161 +++++++++++++++++++++++
 horaedb/metric_engine/src/read.rs      | 228 +++++++++++++++++++++++----------
 horaedb/metric_engine/src/storage.rs   |  60 +++++----
 horaedb/metric_engine/src/test_util.rs | 159 ++++++++++++++++++++++-
 horaedb/metric_engine/src/types.rs     |  14 +-
 7 files changed, 529 insertions(+), 97 deletions(-)

diff --git a/horaedb/metric_engine/Cargo.toml b/horaedb/metric_engine/Cargo.toml
index 0d42c058..bfb70d44 100644
--- a/horaedb/metric_engine/Cargo.toml
+++ b/horaedb/metric_engine/Cargo.toml
@@ -36,9 +36,9 @@ arrow = { workspace = true }
 arrow-schema = { workspace = true }
 async-scoped = { workspace = true }
 async-trait = { workspace = true }
+byteorder = { workspace = true }
 bytes = { workspace = true }
 bytesize = { workspace = true }
-byteorder = { workspace = true }
 datafusion = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
diff --git a/horaedb/metric_engine/src/lib.rs b/horaedb/metric_engine/src/lib.rs
index c77cdbb3..26580cb3 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/lib.rs
@@ -22,9 +22,11 @@ mod compaction;
 pub mod error;
 mod macros;
 mod manifest;
+pub mod operator;
 mod read;
 mod sst;
 pub mod storage;
+#[cfg(test)]
 mod test_util;
 pub mod types;
 
diff --git a/horaedb/metric_engine/src/operator.rs 
b/horaedb/metric_engine/src/operator.rs
new file mode 100644
index 00000000..9847c80e
--- /dev/null
+++ b/horaedb/metric_engine/src/operator.rs
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{fmt::Debug, sync::Arc};
+
+use anyhow::Context;
+use arrow::{
+    array::{Array, BinaryArray, RecordBatch},
+    buffer::OffsetBuffer,
+};
+use arrow_schema::DataType;
+use macros::ensure;
+use tracing::debug;
+
+use crate::Result;
+
+pub trait MergeOperator: Send + Sync + Debug {
+    fn merge(&self, batch: RecordBatch) -> Result<RecordBatch>;
+}
+
+pub type MergeOperatorRef = Arc<dyn MergeOperator>;
+
+#[derive(Debug)]
+pub struct LastValueOperator;
+
+impl MergeOperator for LastValueOperator {
+    fn merge(&self, batch: RecordBatch) -> Result<RecordBatch> {
+        let last_row = batch.slice(batch.num_rows() - 1, 1);
+        Ok(last_row)
+    }
+}
+
+#[derive(Debug)]
+pub struct BytesMergeOperator {
+    /// Column index of the column need to append together
+    /// The column type must be `Binary`.
+    value_idxes: Vec<usize>,
+}
+
+impl BytesMergeOperator {
+    pub fn new(value_idxes: Vec<usize>) -> Self {
+        Self { value_idxes }
+    }
+}
+
+impl MergeOperator for BytesMergeOperator {
+    fn merge(&self, batch: RecordBatch) -> Result<RecordBatch> {
+        assert!(batch.num_rows() > 0);
+
+        for idx in &self.value_idxes {
+            let data_type = batch.column(*idx).data_type();
+            ensure!(
+                data_type == &DataType::Binary,
+                "MergeOperator is only used for binary column, 
current:{data_type}"
+            );
+        }
+        debug!(batch = ?batch, "BytesMergeOperator merge");
+
+        let schema = batch.schema();
+        let columns = batch
+            .columns()
+            .iter()
+            .enumerate()
+            .map(|(idx, column)| {
+                if self.value_idxes.contains(&idx) {
+                    // For value column, we append all elements
+                    let binary_array = 
column.as_any().downcast_ref::<BinaryArray>().unwrap();
+                    if binary_array.is_empty() {
+                       return column.clone();
+                    }
+
+                    let offsets = binary_array.offsets();
+                    let start = offsets[0] as usize;
+                    let length = offsets[offsets.len()-1] as usize - start;
+                    if length == 0 {
+                       return column.clone();
+                    }
+
+                    // bytes buffer is cheap for clone.
+                    let byte_buffer = 
binary_array.values().slice_with_length(start,length). clone();
+                    debug!(byte_buffer = ?byte_buffer, offset = ?offsets, 
"BytesMergeOperator merge");
+                    let offsets = 
OffsetBuffer::from_lengths([byte_buffer.len()]);
+                    let concated_column = BinaryArray::new(offsets, 
byte_buffer, None);
+                    Arc::new(concated_column)
+                } else {
+                    // For other columns, we just take the first element since 
the primary key
+                    // columns are the same.
+                    column.slice(0, 1)
+                }
+            })
+            .collect();
+
+        let merged_batch = RecordBatch::try_new(schema, columns)
+            .context("failed to construct RecordBatch in 
BytesMergeOperator.")?;
+
+        Ok(merged_batch)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use super::*;
+    use crate::record_batch;
+
+    #[test]
+    fn test_last_value_operator() {
+        let operator = LastValueOperator;
+        let batch = record_batch!(
+            ("pk1", UInt8, vec![11, 11, 11, 11]),
+            ("pk2", UInt8, vec![100, 100, 100, 100]),
+            ("value", Int64, vec![2, 7, 4, 1])
+        )
+        .unwrap();
+
+        let actual = operator.merge(batch).unwrap();
+        let expected = record_batch!(
+            ("pk1", UInt8, vec![11]),
+            ("pk2", UInt8, vec![100]),
+            ("value", Int64, vec![1])
+        )
+        .unwrap();
+        assert_eq!(actual, expected);
+    }
+
+    #[test]
+    fn test_bytes_merge_operator() {
+        let operator = BytesMergeOperator::new(vec![2]);
+
+        let batch = record_batch!(
+            ("pk1", UInt8, vec![11, 11, 11, 11]),
+            ("pk2", UInt8, vec![100, 100, 100, 100]),
+            ("value", Binary, vec![b"one", b"two", b"three", b"four"])
+        )
+        .unwrap();
+
+        let actual = operator.merge(batch).unwrap();
+        let expected = record_batch!(
+            ("pk1", UInt8, vec![11]),
+            ("pk2", UInt8, vec![100]),
+            ("value", Binary, vec![b"onetwothreefour"])
+        )
+        .unwrap();
+
+        assert_eq!(actual, expected);
+    }
+}
diff --git a/horaedb/metric_engine/src/read.rs 
b/horaedb/metric_engine/src/read.rs
index 26564a1a..7769ff21 100644
--- a/horaedb/metric_engine/src/read.rs
+++ b/horaedb/metric_engine/src/read.rs
@@ -15,13 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{
-    any::Any,
-    pin::Pin,
-    sync::Arc,
-    task::{Context, Poll},
-};
+use std::{any::Any, pin::Pin, sync::Arc, task::Poll};
 
+use anyhow::Context;
 use arrow::{
     array::{AsArray, RecordBatch},
     compute::concat_batches,
@@ -36,7 +32,6 @@ use datafusion::{
     datasource::physical_plan::{FileMeta, ParquetFileReaderFactory},
     error::{DataFusionError, Result as DfResult},
     execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext},
-    logical_expr::AggregateUDFImpl,
     parquet::arrow::async_reader::AsyncFileReader,
     physical_plan::{
         metrics::ExecutionPlanMetricsSet, DisplayAs, Distribution, 
ExecutionPlan, PlanProperties,
@@ -45,10 +40,13 @@ use datafusion::{
 use futures::{Stream, StreamExt};
 use itertools::Itertools;
 use parquet::arrow::async_reader::ParquetObjectReader;
+use tracing::debug;
 
 use crate::{
     compare_primitive_columns,
+    operator::MergeOperator,
     types::{ObjectStoreRef, SEQ_COLUMN_NAME},
+    Result,
 };
 
 #[derive(Debug, Clone)]
@@ -92,9 +90,8 @@ pub(crate) struct MergeExec {
     num_primary_keys: usize,
     /// Sequence column index
     seq_idx: usize,
-    // (idx, merge_op)
-    value_idx: usize,
-    value_op: Arc<dyn AggregateUDFImpl>,
+    /// Operator to merge values when primary keys are the same
+    value_operator: Arc<dyn MergeOperator>,
 }
 
 impl MergeExec {
@@ -102,15 +99,13 @@ impl MergeExec {
         input: Arc<dyn ExecutionPlan>,
         num_primary_keys: usize,
         seq_idx: usize,
-        value_idx: usize,
-        value_op: Arc<dyn AggregateUDFImpl>,
+        value_operator: Arc<dyn MergeOperator>,
     ) -> Self {
         Self {
             input,
             num_primary_keys,
             seq_idx,
-            value_idx,
-            value_op,
+            value_operator,
         }
     }
 }
@@ -162,8 +157,7 @@ impl ExecutionPlan for MergeExec {
             Arc::clone(&children[0]),
             self.num_primary_keys,
             self.seq_idx,
-            self.value_idx,
-            self.value_op.clone(),
+            self.value_operator.clone(),
         )))
     }
 
@@ -180,8 +174,7 @@ impl ExecutionPlan for MergeExec {
             self.input.execute(partition, context)?,
             self.num_primary_keys,
             self.seq_idx,
-            self.value_idx,
-            self.value_op.clone(),
+            self.value_operator.clone(),
         )))
     }
 }
@@ -190,8 +183,7 @@ struct MergeStream {
     stream: SendableRecordBatchStream,
     num_primary_keys: usize,
     seq_idx: usize,
-    value_idx: usize,
-    value_op: Arc<dyn AggregateUDFImpl>,
+    value_operator: Arc<dyn MergeOperator>,
 
     pending_batch: Option<RecordBatch>,
     arrow_schema: SchemaRef,
@@ -202,8 +194,7 @@ impl MergeStream {
         stream: SendableRecordBatchStream,
         num_primary_keys: usize,
         seq_idx: usize,
-        value_idx: usize,
-        value_op: Arc<dyn AggregateUDFImpl>,
+        value_operator: Arc<dyn MergeOperator>,
     ) -> Self {
         let fields = stream
             .schema()
@@ -225,8 +216,7 @@ impl MergeStream {
             stream,
             num_primary_keys,
             seq_idx,
-            value_idx,
-            value_op,
+            value_operator,
             pending_batch: None,
             arrow_schema,
         }
@@ -242,6 +232,7 @@ impl MergeStream {
         for k in 0..self.num_primary_keys {
             let lhs_col = lhs.column(k);
             let rhs_col = rhs.column(k);
+
             compare_primitive_columns!(
                 lhs_col, rhs_col, lhs_idx, rhs_idx, // TODO: Add more types 
here
                 UInt8Type, Int8Type, UInt32Type, Int32Type, UInt64Type, 
Int64Type
@@ -258,9 +249,15 @@ impl MergeStream {
         true
     }
 
-    // TODO: only support deduplication now, merge operation will be added 
later.
-    fn merge_batch(&mut self, batch: RecordBatch) -> DfResult<RecordBatch> {
-        let mut batches = vec![];
+    fn merge_batch(&mut self, batch: RecordBatch) -> 
Result<Option<RecordBatch>> {
+        if batch.num_rows() == 0 {
+            return Ok(None);
+        }
+
+        debug!(pending_batch = ?self.pending_batch, "Merge batch");
+
+        // Group rows with the same primary keys
+        let mut groupby_pk_batches = Vec::new();
         let mut start_idx = 0;
         while start_idx < batch.num_rows() {
             let mut end_idx = start_idx + 1;
@@ -269,58 +266,86 @@ impl MergeStream {
             {
                 end_idx += 1;
             }
-            let rows_with_same_primary_keys = batch.slice(start_idx, end_idx - 
start_idx);
-            if let Some(pending) = self.pending_batch.take() {
-                if !self.primary_key_eq(
-                    &pending,
-                    pending.num_rows() - 1,
-                    &rows_with_same_primary_keys,
-                    0,
-                ) {
-                    // only keep the last row in this batch
-                    batches.push(pending.slice(pending.num_rows() - 1, 1));
-                }
-            }
-            batches.push(
-                
rows_with_same_primary_keys.slice(rows_with_same_primary_keys.num_rows() - 1, 
1),
-            );
-
+            groupby_pk_batches.push(batch.slice(start_idx, end_idx - 
start_idx));
             start_idx = end_idx;
+            debug!(end_idx = end_idx, "Group rows with the same primary keys");
+        }
+
+        let rows_with_same_primary_keys = &groupby_pk_batches[0];
+        let mut output_batches = Vec::new();
+        if let Some(pending) = self.pending_batch.take() {
+            if self.primary_key_eq(
+                &pending,
+                pending.num_rows() - 1,
+                rows_with_same_primary_keys,
+                0,
+            ) {
+                groupby_pk_batches[0] = concat_batches(
+                    &self.stream.schema(),
+                    [&pending, rows_with_same_primary_keys],
+                )
+                .context("concat batch")?;
+            } else {
+                output_batches.push(self.value_operator.merge(pending)?);
+            }
         }
 
         // last batch may have overlapping rows with the next batch, so keep 
them in
         // pending_batch
-        self.pending_batch = batches.pop();
-
-        concat_batches(&self.stream.schema(), batches.iter())
-            .map_err(|e| DataFusionError::ArrowError(e, None))
-            .map(|mut batch| {
-                // Remove seq column
-                batch.remove_column(self.seq_idx);
-                batch
-            })
+        self.pending_batch = groupby_pk_batches.pop();
+
+        for batch in groupby_pk_batches {
+            output_batches.push(self.value_operator.merge(batch)?);
+        }
+        if output_batches.is_empty() {
+            return Ok(None);
+        }
+
+        let mut output_batches =
+            concat_batches(&self.stream.schema(), 
output_batches.iter()).context("concat batch")?;
+        // Remove seq column
+        output_batches.remove_column(self.seq_idx);
+        Ok(Some(output_batches))
     }
 }
 
 impl Stream for MergeStream {
     type Item = DfResult<RecordBatch>;
 
-    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> 
Poll<Option<Self::Item>> {
-        match self.stream.poll_next_unpin(ctx) {
-            Poll::Pending => Poll::Pending,
-            Poll::Ready(None) => {
-                let value = if let Some(mut pending) = 
self.pending_batch.take() {
-                    pending.remove_column(self.seq_idx);
-                    Some(Ok(pending))
-                } else {
-                    None
-                };
-                Poll::Ready(value)
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        ctx: &mut std::task::Context,
+    ) -> Poll<Option<Self::Item>> {
+        loop {
+            match self.stream.poll_next_unpin(ctx) {
+                Poll::Pending => return Poll::Pending,
+                Poll::Ready(None) => {
+                    let value = if let Some(mut pending) = 
self.pending_batch.take() {
+                        pending.remove_column(self.seq_idx);
+                        let res = self
+                            .value_operator
+                            .merge(pending)
+                            .map_err(|e| 
DataFusionError::External(Box::new(e)));
+                        Some(res)
+                    } else {
+                        None
+                    };
+                    return Poll::Ready(value);
+                }
+                Poll::Ready(Some(v)) => match v {
+                    Ok(v) => match self.merge_batch(v) {
+                        Ok(v) => {
+                            if let Some(v) = v {
+                                return Poll::Ready(Some(Ok(v)));
+                            }
+                        }
+                        Err(e) => {
+                            return 
Poll::Ready(Some(Err(DataFusionError::External(Box::new(e)))))
+                        }
+                    },
+                    Err(e) => return Poll::Ready(Some(Err(e))),
+                },
             }
-            Poll::Ready(Some(v)) => Poll::Ready(Some(v.and_then(|batch| {
-                let batch = self.merge_batch(batch)?;
-                Ok(batch)
-            }))),
         }
     }
 }
@@ -330,3 +355,72 @@ impl RecordBatchStream for MergeStream {
         self.arrow_schema.clone()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use test_log::test;
+
+    use super::*;
+    use crate::{
+        operator::{BytesMergeOperator, LastValueOperator, MergeOperatorRef},
+        record_batch,
+        test_util::{check_stream, make_sendable_record_batches},
+    };
+
+    #[test(tokio::test)]
+    async fn test_merge_stream() {
+        let expected = [
+            record_batch!(
+                ("pk1", UInt8, vec![11, 12]),
+                ("value", Binary, vec![b"2", b"4"])
+            )
+            .unwrap(),
+            record_batch!(("pk1", UInt8, vec![13]), ("value", Binary, 
vec![b"8"])).unwrap(),
+            record_batch!(("pk1", UInt8, vec![14]), ("value", Binary, 
vec![b"9"])).unwrap(),
+        ];
+
+        test_merge_stream_for_append_mode(Arc::new(LastValueOperator), 
expected).await;
+
+        let expected = [
+            record_batch!(
+                ("pk1", UInt8, vec![11, 12]),
+                ("value", Binary, vec![b"12", b"34"])
+            )
+            .unwrap(),
+            record_batch!(("pk1", UInt8, vec![13]), ("value", Binary, 
vec![b"5678"])).unwrap(),
+            record_batch!(("pk1", UInt8, vec![14]), ("value", Binary, 
vec![b"9"])).unwrap(),
+        ];
+
+        
test_merge_stream_for_append_mode(Arc::new(BytesMergeOperator::new(vec![1])), 
expected)
+            .await;
+    }
+
+    async fn test_merge_stream_for_append_mode<I>(merge_op: MergeOperatorRef, 
expected: I)
+    where
+        I: IntoIterator<Item = RecordBatch>,
+    {
+        let stream = make_sendable_record_batches([
+            record_batch!(
+                ("pk1", UInt8, vec![11, 11, 12, 12, 13]),
+                ("value", Binary, vec![b"1", b"2", b"3", b"4", b"5"]),
+                ("seq", UInt8, vec![1, 2, 3, 4, 5])
+            )
+            .unwrap(),
+            record_batch!(
+                ("pk1", UInt8, vec![13, 13]),
+                ("value", Binary, vec![b"6", b"7"]),
+                ("seq", UInt8, vec![6, 7])
+            )
+            .unwrap(),
+            record_batch!(
+                ("pk1", UInt8, vec![13, 14]),
+                ("value", Binary, vec![b"8", b"9"]),
+                ("seq", UInt8, vec![8, 9])
+            )
+            .unwrap(),
+        ]);
+
+        let stream = MergeStream::new(stream, 1, 2, merge_op);
+        check_stream(Box::pin(stream), expected).await;
+    }
+}
diff --git a/horaedb/metric_engine/src/storage.rs 
b/horaedb/metric_engine/src/storage.rs
index e30cb2a8..f680bd94 100644
--- a/horaedb/metric_engine/src/storage.rs
+++ b/horaedb/metric_engine/src/storage.rs
@@ -31,7 +31,6 @@ use datafusion::{
         physical_plan::{FileScanConfig, ParquetExec},
     },
     execution::{context::ExecutionProps, object_store::ObjectStoreUrl, 
SendableRecordBatchStream},
-    functions_aggregate::first_last::LastValue,
     logical_expr::{utils::conjunction, Expr},
     physical_expr::{create_physical_expr, LexOrdering},
     physical_plan::{
@@ -59,11 +58,12 @@ use tokio::runtime::Runtime;
 use crate::{
     compaction::{CompactionScheduler, SchedulerConfig},
     manifest::{Manifest, ManifestRef},
+    operator::{BytesMergeOperator, LastValueOperator},
     read::{DefaultParquetFileReaderFactory, MergeExec},
     sst::{allocate_id, FileMeta, SstFile, SstPathGenerator},
     types::{
-        ObjectStoreRef, RuntimeOptions, StorageOptions, TimeRange, 
WriteOptions, WriteResult,
-        SEQ_COLUMN_NAME,
+        ObjectStoreRef, RuntimeOptions, StorageOptions, TimeRange, UpdateMode, 
WriteOptions,
+        WriteResult, SEQ_COLUMN_NAME,
     },
     Result,
 };
@@ -138,6 +138,9 @@ pub struct CloudObjectStorage {
     store: ObjectStoreRef,
     arrow_schema: SchemaRef,
     num_primary_keys: usize,
+    seq_idx: usize,
+    value_idxes: Vec<usize>,
+    update_mode: UpdateMode,
     manifest: ManifestRef,
     runtimes: StorageRuntimes,
 
@@ -167,8 +170,10 @@ impl CloudObjectStorage {
         num_primary_keys: usize,
         storage_opts: StorageOptions,
     ) -> Result<Self> {
-        let runtimes = StorageRuntimes::new(storage_opts.runtime_opts)?;
+        let value_idxes = 
(num_primary_keys..arrow_schema.fields.len()).collect::<Vec<_>>();
+        ensure!(!value_idxes.is_empty(), "no value column found");
 
+        let runtimes = StorageRuntimes::new(storage_opts.runtime_opts)?;
         let manifest = Arc::new(
             Manifest::try_new(
                 path.clone(),
@@ -178,12 +183,14 @@ impl CloudObjectStorage {
             )
             .await?,
         );
+
         let mut new_fields = arrow_schema.fields.clone().to_vec();
         new_fields.push(Arc::new(Field::new(
             SEQ_COLUMN_NAME,
             DataType::UInt64,
             true,
         )));
+        let seq_idx = new_fields.len() - 1;
         let arrow_schema = Arc::new(Schema::new_with_metadata(
             new_fields,
             arrow_schema.metadata.clone(),
@@ -199,9 +206,13 @@ impl CloudObjectStorage {
             sst_path_gen.clone(),
             SchedulerConfig::default(),
         );
+        let update_mode = storage_opts.update_mode;
         Ok(Self {
             path,
             num_primary_keys,
+            seq_idx,
+            value_idxes,
+            update_mode,
             segment_duration,
             store,
             arrow_schema,
@@ -329,7 +340,7 @@ impl CloudObjectStorage {
         builder.build()
     }
 
-    async fn build_scan_plan(
+    fn build_scan_plan(
         &self,
         ssts: Vec<SstFile>,
         projections: Option<Vec<usize>>,
@@ -373,9 +384,11 @@ impl CloudObjectStorage {
         let merge_exec = MergeExec::new(
             Arc::new(sort_exec),
             self.num_primary_keys,
-            self.schema().fields.len() - 1,
-            0, // TODO: value_idx, not used now.
-            Arc::new(LastValue::new()),
+            self.seq_idx,
+            match self.update_mode {
+                UpdateMode::Overwrite => Arc::new(LastValueOperator),
+                UpdateMode::Append => 
Arc::new(BytesMergeOperator::new(self.value_idxes.clone())),
+            },
         );
         Ok(Arc::new(merge_exec))
     }
@@ -429,9 +442,8 @@ impl TimeMergeStorage for CloudObjectStorage {
 
         let mut plan_for_all_segments = Vec::new();
         for (_, ssts) in ssts_by_segment.sorted_by(|a, b| a.0.cmp(&b.0)) {
-            let plan = self
-                .build_scan_plan(ssts, req.projections.clone(), 
req.predicate.clone())
-                .await?;
+            let plan =
+                self.build_scan_plan(ssts, req.projections.clone(), 
req.predicate.clone())?;
 
             plan_for_all_segments.push(plan);
         }
@@ -455,16 +467,15 @@ impl TimeMergeStorage for CloudObjectStorage {
 
 #[cfg(test)]
 mod tests {
-    use arrow::array::{self as arrow_array};
-    use datafusion::common::record_batch;
     use object_store::local::LocalFileSystem;
+    use test_log::test;
 
     use super::*;
-    use crate::{arrow_schema, types::Timestamp};
+    use crate::{arrow_schema, record_batch, test_util::check_stream, 
types::Timestamp};
 
     #[tokio::test]
     async fn test_build_scan_plan() {
-        let schema = arrow_schema!(("pk1", UInt8));
+        let schema = arrow_schema!(("pk1", UInt8), ("value", UInt8));
         let store = Arc::new(LocalFileSystem::new());
         let storage = CloudObjectStorage::try_new(
             "mock".to_string(),
@@ -494,21 +505,20 @@ mod tests {
                 None,
                 vec![],
             )
-            .await
             .unwrap();
         let display_plan =
             
datafusion::physical_plan::display::DisplayableExecutionPlan::new(plan.as_ref())
                 .indent(true);
         assert_eq!(
-            r#"MergeExec: [primary_keys: 1, seq_idx: 1]
-  SortPreservingMergeExec: [pk1@0 ASC, __seq__@1 ASC], fetch=1024
-    ParquetExec: file_groups={3 groups: [[mock/data/100.sst], 
[mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, __seq__], 
output_orderings=[[pk1@0 ASC, __seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC], 
[pk1@0 ASC, __seq__@1 ASC]]
+            r#"MergeExec: [primary_keys: 1, seq_idx: 2]
+  SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC], fetch=1024
+    ParquetExec: file_groups={3 groups: [[mock/data/100.sst], 
[mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], 
output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], 
[pk1@0 ASC, __seq__@2 ASC]]
 "#,
             format!("{display_plan}")
         );
     }
 
-    #[tokio::test]
+    #[test(tokio::test)]
     async fn test_storage_write_and_scan() {
         let schema = arrow_schema!(("pk1", UInt8), ("pk2", UInt8), ("value", 
Int64));
         let root_dir = temp_dir::TempDir::new().unwrap();
@@ -554,7 +564,7 @@ mod tests {
             .await
             .unwrap();
 
-        let mut result_stream = storage
+        let result_stream = storage
             .scan(ScanRequest {
                 range: TimeRange::new(Timestamp(0), Timestamp::MAX),
                 predicate: vec![],
@@ -576,12 +586,8 @@ mod tests {
             )
             .unwrap(),
         ];
-        let mut idx = 0;
-        while let Some(batch) = result_stream.next().await {
-            let batch = batch.unwrap();
-            assert_eq!(expected_batch[idx], batch);
-            idx += 1;
-        }
+
+        check_stream(result_stream, expected_batch).await;
     }
 
     #[tokio::test]
diff --git a/horaedb/metric_engine/src/test_util.rs 
b/horaedb/metric_engine/src/test_util.rs
index 9c378b0a..2900990d 100644
--- a/horaedb/metric_engine/src/test_util.rs
+++ b/horaedb/metric_engine/src/test_util.rs
@@ -15,6 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::{
+    collections::VecDeque,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use arrow::array::RecordBatch;
+use arrow_schema::SchemaRef;
+use datafusion::{
+    error::Result as DfResult,
+    execution::{RecordBatchStream, SendableRecordBatchStream},
+};
+use futures::{Stream, StreamExt};
+
 #[macro_export]
 macro_rules! arrow_schema {
     ($(($field_name:expr, $data_type:ident)),* $(,)?) => {{
@@ -27,8 +41,135 @@ macro_rules! arrow_schema {
     }};
 }
 
-#[cfg(test)]
+#[macro_export]
+macro_rules! create_array {
+    (Boolean, $values: expr) => {
+        std::sync::Arc::new(arrow::array::BooleanArray::from($values))
+    };
+    (Int8, $values: expr) => {
+        std::sync::Arc::new(arrow::array::Int8Array::from($values))
+    };
+    (Int16, $values: expr) => {
+        std::sync::Arc::new(arrow::array::Int16Array::from($values))
+    };
+    (Int32, $values: expr) => {
+        std::sync::Arc::new(arrow::array::Int32Array::from($values))
+    };
+    (Int64, $values: expr) => {
+        std::sync::Arc::new(arrow::array::Int64Array::from($values))
+    };
+    (UInt8, $values: expr) => {
+        std::sync::Arc::new(arrow::array::UInt8Array::from($values))
+    };
+    (UInt16, $values: expr) => {
+        std::sync::Arc::new(arrow::array::UInt16Array::from($values))
+    };
+    (UInt32, $values: expr) => {
+        std::sync::Arc::new(arrow::array::UInt32Array::from($values))
+    };
+    (UInt64, $values: expr) => {
+        std::sync::Arc::new(arrow::array::UInt64Array::from($values))
+    };
+    (Float16, $values: expr) => {
+        std::sync::Arc::new(arrow::array::Float16Array::from($values))
+    };
+    (Float32, $values: expr) => {
+        std::sync::Arc::new(arrow::array::Float32Array::from($values))
+    };
+    (Float64, $values: expr) => {
+        std::sync::Arc::new(arrow::array::Float64Array::from($values))
+    };
+    (Utf8, $values: expr) => {
+        std::sync::Arc::new(arrow::array::StringArray::from($values))
+    };
+    (Binary, $values: expr) => {
+        std::sync::Arc::new(arrow::array::BinaryArray::from_vec($values))
+    };
+}
+
+/// Creates a record batch from literal slice of values, suitable for rapid
+/// testing and development.
+///
+/// Example:
+/// ```
+/// let batch = record_batch!(
+///     ("a", Int32, vec![1, 2, 3]),
+///     ("b", Float64, vec![Some(4.0), None, Some(5.0)]),
+///     ("c", Utf8, vec!["alpha", "beta", "gamma"])
+/// );
+/// ```
+#[macro_export]
+macro_rules! record_batch {
+    ($(($name: expr, $type: ident, $values: expr)),*) => {
+        {
+            let schema = std::sync::Arc::new(arrow_schema::Schema::new(vec![
+                $(
+                    arrow_schema::Field::new($name, 
arrow_schema::DataType::$type, true),
+                )*
+            ]));
+
+            let batch = arrow::array::RecordBatch::try_new(
+                schema,
+                vec![$(
+                    $crate::create_array!($type, $values),
+                )*]
+            );
+
+            batch
+        }
+    }
+}
+
+struct DequeBasedStream {
+    batches: VecDeque<RecordBatch>,
+    schema: SchemaRef,
+}
+
+impl RecordBatchStream for DequeBasedStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+impl Stream for DequeBasedStream {
+    type Item = DfResult<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
+        if self.batches.is_empty() {
+            return Poll::Ready(None);
+        }
+
+        Poll::Ready(Some(Ok(self.batches.pop_front().unwrap())))
+    }
+}
+
+/// Creates an record batch stream for testing purposes.
+pub fn make_sendable_record_batches<I>(batches: I) -> SendableRecordBatchStream
+where
+    I: IntoIterator<Item = RecordBatch>,
+{
+    let batches = VecDeque::from_iter(batches);
+    let schema = batches[0].schema();
+    Box::pin(DequeBasedStream { batches, schema })
+}
+
+pub async fn check_stream<I>(mut stream: SendableRecordBatchStream, expected: 
I)
+where
+    I: IntoIterator<Item = RecordBatch>,
+{
+    let mut iter = expected.into_iter();
+    while let Some(batch) = stream.next().await {
+        let batch = batch.unwrap();
+        assert_eq!(batch, iter.next().unwrap());
+    }
+    assert!(iter.next().is_none());
+}
+
 mod tests {
+    use futures::StreamExt;
+
+    use super::*;
+
     #[test]
     fn test_arrow_schema_macro() {
         let schema = arrow_schema![("a", UInt8), ("b", UInt8), ("c", UInt8), 
("d", UInt8),];
@@ -38,4 +179,20 @@ mod tests {
             assert_eq!(f.name(), expected_names[i]);
         }
     }
+
+    #[tokio::test]
+    async fn test_sendable_record_batch() {
+        let input = [
+            record_batch!(("pk1", UInt8, vec![11, 11]), ("pk2", UInt8, 
vec![100, 100])).unwrap(),
+            record_batch!(("pk1", UInt8, vec![22, 22]), ("pk2", UInt8, 
vec![200, 200])).unwrap(),
+        ];
+        let mut stream = make_sendable_record_batches(input.clone());
+        let mut i = 0;
+        while let Some(batch) = stream.next().await {
+            let batch = batch.unwrap();
+            assert_eq!(batch, input[i]);
+            i += 1;
+        }
+        assert_eq!(2, i);
+    }
 }
diff --git a/horaedb/metric_engine/src/types.rs 
b/horaedb/metric_engine/src/types.rs
index fbb956c4..c4972fe3 100644
--- a/horaedb/metric_engine/src/types.rs
+++ b/horaedb/metric_engine/src/types.rs
@@ -121,6 +121,7 @@ pub struct WriteResult {
     pub size: usize,
 }
 
+#[derive(Debug)]
 pub struct ColumnOptions {
     pub enable_dict: Option<bool>,
     pub enable_bloom_filter: Option<bool>,
@@ -128,6 +129,7 @@ pub struct ColumnOptions {
     pub compression: Option<Compression>,
 }
 
+#[derive(Debug)]
 pub struct WriteOptions {
     pub max_row_group_size: usize,
     pub write_bacth_size: usize,
@@ -156,6 +158,7 @@ impl Default for WriteOptions {
     }
 }
 
+#[derive(Debug)]
 pub struct RuntimeOptions {
     pub manifest_compact_thread_num: usize,
     pub sst_compact_thread_num: usize,
@@ -170,6 +173,7 @@ impl Default for RuntimeOptions {
     }
 }
 
+#[derive(Debug)]
 pub struct ManifestMergeOptions {
     pub channel_size: usize,
     pub merge_interval_seconds: usize,
@@ -190,11 +194,19 @@ impl Default for ManifestMergeOptions {
     }
 }
 
-#[derive(Default)]
+#[derive(Debug, Default)]
+pub enum UpdateMode {
+    #[default]
+    Overwrite,
+    Append,
+}
+
+#[derive(Debug, Default)]
 pub struct StorageOptions {
     pub write_opts: WriteOptions,
     pub manifest_merge_opts: ManifestMergeOptions,
     pub runtime_opts: RuntimeOptions,
+    pub update_mode: UpdateMode,
 }
 
 #[cfg(test)]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to