This is an automated email from the ASF dual-hosted git repository.
baojinri pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 9f2d0bc8 feat: support merge operator (#1605)
9f2d0bc8 is described below
commit 9f2d0bc8c9d829641f7af552f684e9eb85222d65
Author: Jiacai Liu <[email protected]>
AuthorDate: Fri Dec 13 10:12:26 2024 +0800
feat: support merge operator (#1605)
## Rationale
Close #1583
For rows with the same primary key, we need to choose which value to use;
the answer is the MergeOperator.
## Detailed Changes
- Add the MergeOperator trait, along with two implementations.
## Test Plan
CI
---
horaedb/metric_engine/Cargo.toml | 2 +-
horaedb/metric_engine/src/lib.rs | 2 +
horaedb/metric_engine/src/operator.rs | 161 +++++++++++++++++++++++
horaedb/metric_engine/src/read.rs | 228 +++++++++++++++++++++++----------
horaedb/metric_engine/src/storage.rs | 60 +++++----
horaedb/metric_engine/src/test_util.rs | 159 ++++++++++++++++++++++-
horaedb/metric_engine/src/types.rs | 14 +-
7 files changed, 529 insertions(+), 97 deletions(-)
diff --git a/horaedb/metric_engine/Cargo.toml b/horaedb/metric_engine/Cargo.toml
index 0d42c058..bfb70d44 100644
--- a/horaedb/metric_engine/Cargo.toml
+++ b/horaedb/metric_engine/Cargo.toml
@@ -36,9 +36,9 @@ arrow = { workspace = true }
arrow-schema = { workspace = true }
async-scoped = { workspace = true }
async-trait = { workspace = true }
+byteorder = { workspace = true }
bytes = { workspace = true }
bytesize = { workspace = true }
-byteorder = { workspace = true }
datafusion = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
diff --git a/horaedb/metric_engine/src/lib.rs b/horaedb/metric_engine/src/lib.rs
index c77cdbb3..26580cb3 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/lib.rs
@@ -22,9 +22,11 @@ mod compaction;
pub mod error;
mod macros;
mod manifest;
+pub mod operator;
mod read;
mod sst;
pub mod storage;
+#[cfg(test)]
mod test_util;
pub mod types;
diff --git a/horaedb/metric_engine/src/operator.rs
b/horaedb/metric_engine/src/operator.rs
new file mode 100644
index 00000000..9847c80e
--- /dev/null
+++ b/horaedb/metric_engine/src/operator.rs
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{fmt::Debug, sync::Arc};
+
+use anyhow::Context;
+use arrow::{
+ array::{Array, BinaryArray, RecordBatch},
+ buffer::OffsetBuffer,
+};
+use arrow_schema::DataType;
+use macros::ensure;
+use tracing::debug;
+
+use crate::Result;
+
+pub trait MergeOperator: Send + Sync + Debug {
+ fn merge(&self, batch: RecordBatch) -> Result<RecordBatch>;
+}
+
+pub type MergeOperatorRef = Arc<dyn MergeOperator>;
+
+#[derive(Debug)]
+pub struct LastValueOperator;
+
+impl MergeOperator for LastValueOperator {
+ fn merge(&self, batch: RecordBatch) -> Result<RecordBatch> {
+ let last_row = batch.slice(batch.num_rows() - 1, 1);
+ Ok(last_row)
+ }
+}
+
+#[derive(Debug)]
+pub struct BytesMergeOperator {
+ /// Column index of the column need to append together
+ /// The column type must be `Binary`.
+ value_idxes: Vec<usize>,
+}
+
+impl BytesMergeOperator {
+ pub fn new(value_idxes: Vec<usize>) -> Self {
+ Self { value_idxes }
+ }
+}
+
+impl MergeOperator for BytesMergeOperator {
+ fn merge(&self, batch: RecordBatch) -> Result<RecordBatch> {
+ assert!(batch.num_rows() > 0);
+
+ for idx in &self.value_idxes {
+ let data_type = batch.column(*idx).data_type();
+ ensure!(
+ data_type == &DataType::Binary,
+ "MergeOperator is only used for binary column,
current:{data_type}"
+ );
+ }
+ debug!(batch = ?batch, "BytesMergeOperator merge");
+
+ let schema = batch.schema();
+ let columns = batch
+ .columns()
+ .iter()
+ .enumerate()
+ .map(|(idx, column)| {
+ if self.value_idxes.contains(&idx) {
+ // For value column, we append all elements
+ let binary_array =
column.as_any().downcast_ref::<BinaryArray>().unwrap();
+ if binary_array.is_empty() {
+ return column.clone();
+ }
+
+ let offsets = binary_array.offsets();
+ let start = offsets[0] as usize;
+ let length = offsets[offsets.len()-1] as usize - start;
+ if length == 0 {
+ return column.clone();
+ }
+
+ // bytes buffer is cheap for clone.
+ let byte_buffer =
binary_array.values().slice_with_length(start,length). clone();
+ debug!(byte_buffer = ?byte_buffer, offset = ?offsets,
"BytesMergeOperator merge");
+ let offsets =
OffsetBuffer::from_lengths([byte_buffer.len()]);
+ let concated_column = BinaryArray::new(offsets,
byte_buffer, None);
+ Arc::new(concated_column)
+ } else {
+ // For other columns, we just take the first element since
the primary key
+ // columns are the same.
+ column.slice(0, 1)
+ }
+ })
+ .collect();
+
+ let merged_batch = RecordBatch::try_new(schema, columns)
+ .context("failed to construct RecordBatch in
BytesMergeOperator.")?;
+
+ Ok(merged_batch)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+
+ use super::*;
+ use crate::record_batch;
+
+ #[test]
+ fn test_last_value_operator() {
+ let operator = LastValueOperator;
+ let batch = record_batch!(
+ ("pk1", UInt8, vec![11, 11, 11, 11]),
+ ("pk2", UInt8, vec![100, 100, 100, 100]),
+ ("value", Int64, vec![2, 7, 4, 1])
+ )
+ .unwrap();
+
+ let actual = operator.merge(batch).unwrap();
+ let expected = record_batch!(
+ ("pk1", UInt8, vec![11]),
+ ("pk2", UInt8, vec![100]),
+ ("value", Int64, vec![1])
+ )
+ .unwrap();
+ assert_eq!(actual, expected);
+ }
+
+ #[test]
+ fn test_bytes_merge_operator() {
+ let operator = BytesMergeOperator::new(vec![2]);
+
+ let batch = record_batch!(
+ ("pk1", UInt8, vec![11, 11, 11, 11]),
+ ("pk2", UInt8, vec![100, 100, 100, 100]),
+ ("value", Binary, vec![b"one", b"two", b"three", b"four"])
+ )
+ .unwrap();
+
+ let actual = operator.merge(batch).unwrap();
+ let expected = record_batch!(
+ ("pk1", UInt8, vec![11]),
+ ("pk2", UInt8, vec![100]),
+ ("value", Binary, vec![b"onetwothreefour"])
+ )
+ .unwrap();
+
+ assert_eq!(actual, expected);
+ }
+}
diff --git a/horaedb/metric_engine/src/read.rs
b/horaedb/metric_engine/src/read.rs
index 26564a1a..7769ff21 100644
--- a/horaedb/metric_engine/src/read.rs
+++ b/horaedb/metric_engine/src/read.rs
@@ -15,13 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-use std::{
- any::Any,
- pin::Pin,
- sync::Arc,
- task::{Context, Poll},
-};
+use std::{any::Any, pin::Pin, sync::Arc, task::Poll};
+use anyhow::Context;
use arrow::{
array::{AsArray, RecordBatch},
compute::concat_batches,
@@ -36,7 +32,6 @@ use datafusion::{
datasource::physical_plan::{FileMeta, ParquetFileReaderFactory},
error::{DataFusionError, Result as DfResult},
execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext},
- logical_expr::AggregateUDFImpl,
parquet::arrow::async_reader::AsyncFileReader,
physical_plan::{
metrics::ExecutionPlanMetricsSet, DisplayAs, Distribution,
ExecutionPlan, PlanProperties,
@@ -45,10 +40,13 @@ use datafusion::{
use futures::{Stream, StreamExt};
use itertools::Itertools;
use parquet::arrow::async_reader::ParquetObjectReader;
+use tracing::debug;
use crate::{
compare_primitive_columns,
+ operator::MergeOperator,
types::{ObjectStoreRef, SEQ_COLUMN_NAME},
+ Result,
};
#[derive(Debug, Clone)]
@@ -92,9 +90,8 @@ pub(crate) struct MergeExec {
num_primary_keys: usize,
/// Sequence column index
seq_idx: usize,
- // (idx, merge_op)
- value_idx: usize,
- value_op: Arc<dyn AggregateUDFImpl>,
+ /// Operator to merge values when primary keys are the same
+ value_operator: Arc<dyn MergeOperator>,
}
impl MergeExec {
@@ -102,15 +99,13 @@ impl MergeExec {
input: Arc<dyn ExecutionPlan>,
num_primary_keys: usize,
seq_idx: usize,
- value_idx: usize,
- value_op: Arc<dyn AggregateUDFImpl>,
+ value_operator: Arc<dyn MergeOperator>,
) -> Self {
Self {
input,
num_primary_keys,
seq_idx,
- value_idx,
- value_op,
+ value_operator,
}
}
}
@@ -162,8 +157,7 @@ impl ExecutionPlan for MergeExec {
Arc::clone(&children[0]),
self.num_primary_keys,
self.seq_idx,
- self.value_idx,
- self.value_op.clone(),
+ self.value_operator.clone(),
)))
}
@@ -180,8 +174,7 @@ impl ExecutionPlan for MergeExec {
self.input.execute(partition, context)?,
self.num_primary_keys,
self.seq_idx,
- self.value_idx,
- self.value_op.clone(),
+ self.value_operator.clone(),
)))
}
}
@@ -190,8 +183,7 @@ struct MergeStream {
stream: SendableRecordBatchStream,
num_primary_keys: usize,
seq_idx: usize,
- value_idx: usize,
- value_op: Arc<dyn AggregateUDFImpl>,
+ value_operator: Arc<dyn MergeOperator>,
pending_batch: Option<RecordBatch>,
arrow_schema: SchemaRef,
@@ -202,8 +194,7 @@ impl MergeStream {
stream: SendableRecordBatchStream,
num_primary_keys: usize,
seq_idx: usize,
- value_idx: usize,
- value_op: Arc<dyn AggregateUDFImpl>,
+ value_operator: Arc<dyn MergeOperator>,
) -> Self {
let fields = stream
.schema()
@@ -225,8 +216,7 @@ impl MergeStream {
stream,
num_primary_keys,
seq_idx,
- value_idx,
- value_op,
+ value_operator,
pending_batch: None,
arrow_schema,
}
@@ -242,6 +232,7 @@ impl MergeStream {
for k in 0..self.num_primary_keys {
let lhs_col = lhs.column(k);
let rhs_col = rhs.column(k);
+
compare_primitive_columns!(
lhs_col, rhs_col, lhs_idx, rhs_idx, // TODO: Add more types
here
UInt8Type, Int8Type, UInt32Type, Int32Type, UInt64Type,
Int64Type
@@ -258,9 +249,15 @@ impl MergeStream {
true
}
- // TODO: only support deduplication now, merge operation will be added
later.
- fn merge_batch(&mut self, batch: RecordBatch) -> DfResult<RecordBatch> {
- let mut batches = vec![];
+ fn merge_batch(&mut self, batch: RecordBatch) ->
Result<Option<RecordBatch>> {
+ if batch.num_rows() == 0 {
+ return Ok(None);
+ }
+
+ debug!(pending_batch = ?self.pending_batch, "Merge batch");
+
+ // Group rows with the same primary keys
+ let mut groupby_pk_batches = Vec::new();
let mut start_idx = 0;
while start_idx < batch.num_rows() {
let mut end_idx = start_idx + 1;
@@ -269,58 +266,86 @@ impl MergeStream {
{
end_idx += 1;
}
- let rows_with_same_primary_keys = batch.slice(start_idx, end_idx -
start_idx);
- if let Some(pending) = self.pending_batch.take() {
- if !self.primary_key_eq(
- &pending,
- pending.num_rows() - 1,
- &rows_with_same_primary_keys,
- 0,
- ) {
- // only keep the last row in this batch
- batches.push(pending.slice(pending.num_rows() - 1, 1));
- }
- }
- batches.push(
-
rows_with_same_primary_keys.slice(rows_with_same_primary_keys.num_rows() - 1,
1),
- );
-
+ groupby_pk_batches.push(batch.slice(start_idx, end_idx -
start_idx));
start_idx = end_idx;
+ debug!(end_idx = end_idx, "Group rows with the same primary keys");
+ }
+
+ let rows_with_same_primary_keys = &groupby_pk_batches[0];
+ let mut output_batches = Vec::new();
+ if let Some(pending) = self.pending_batch.take() {
+ if self.primary_key_eq(
+ &pending,
+ pending.num_rows() - 1,
+ rows_with_same_primary_keys,
+ 0,
+ ) {
+ groupby_pk_batches[0] = concat_batches(
+ &self.stream.schema(),
+ [&pending, rows_with_same_primary_keys],
+ )
+ .context("concat batch")?;
+ } else {
+ output_batches.push(self.value_operator.merge(pending)?);
+ }
}
// last batch may have overlapping rows with the next batch, so keep
them in
// pending_batch
- self.pending_batch = batches.pop();
-
- concat_batches(&self.stream.schema(), batches.iter())
- .map_err(|e| DataFusionError::ArrowError(e, None))
- .map(|mut batch| {
- // Remove seq column
- batch.remove_column(self.seq_idx);
- batch
- })
+ self.pending_batch = groupby_pk_batches.pop();
+
+ for batch in groupby_pk_batches {
+ output_batches.push(self.value_operator.merge(batch)?);
+ }
+ if output_batches.is_empty() {
+ return Ok(None);
+ }
+
+ let mut output_batches =
+ concat_batches(&self.stream.schema(),
output_batches.iter()).context("concat batch")?;
+ // Remove seq column
+ output_batches.remove_column(self.seq_idx);
+ Ok(Some(output_batches))
}
}
impl Stream for MergeStream {
type Item = DfResult<RecordBatch>;
- fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) ->
Poll<Option<Self::Item>> {
- match self.stream.poll_next_unpin(ctx) {
- Poll::Pending => Poll::Pending,
- Poll::Ready(None) => {
- let value = if let Some(mut pending) =
self.pending_batch.take() {
- pending.remove_column(self.seq_idx);
- Some(Ok(pending))
- } else {
- None
- };
- Poll::Ready(value)
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ ctx: &mut std::task::Context,
+ ) -> Poll<Option<Self::Item>> {
+ loop {
+ match self.stream.poll_next_unpin(ctx) {
+ Poll::Pending => return Poll::Pending,
+ Poll::Ready(None) => {
+ let value = if let Some(mut pending) =
self.pending_batch.take() {
+ pending.remove_column(self.seq_idx);
+ let res = self
+ .value_operator
+ .merge(pending)
+ .map_err(|e|
DataFusionError::External(Box::new(e)));
+ Some(res)
+ } else {
+ None
+ };
+ return Poll::Ready(value);
+ }
+ Poll::Ready(Some(v)) => match v {
+ Ok(v) => match self.merge_batch(v) {
+ Ok(v) => {
+ if let Some(v) = v {
+ return Poll::Ready(Some(Ok(v)));
+ }
+ }
+ Err(e) => {
+ return
Poll::Ready(Some(Err(DataFusionError::External(Box::new(e)))))
+ }
+ },
+ Err(e) => return Poll::Ready(Some(Err(e))),
+ },
}
- Poll::Ready(Some(v)) => Poll::Ready(Some(v.and_then(|batch| {
- let batch = self.merge_batch(batch)?;
- Ok(batch)
- }))),
}
}
}
@@ -330,3 +355,72 @@ impl RecordBatchStream for MergeStream {
self.arrow_schema.clone()
}
}
+
+#[cfg(test)]
+mod tests {
+ use test_log::test;
+
+ use super::*;
+ use crate::{
+ operator::{BytesMergeOperator, LastValueOperator, MergeOperatorRef},
+ record_batch,
+ test_util::{check_stream, make_sendable_record_batches},
+ };
+
+ #[test(tokio::test)]
+ async fn test_merge_stream() {
+ let expected = [
+ record_batch!(
+ ("pk1", UInt8, vec![11, 12]),
+ ("value", Binary, vec![b"2", b"4"])
+ )
+ .unwrap(),
+ record_batch!(("pk1", UInt8, vec![13]), ("value", Binary,
vec![b"8"])).unwrap(),
+ record_batch!(("pk1", UInt8, vec![14]), ("value", Binary,
vec![b"9"])).unwrap(),
+ ];
+
+ test_merge_stream_for_append_mode(Arc::new(LastValueOperator),
expected).await;
+
+ let expected = [
+ record_batch!(
+ ("pk1", UInt8, vec![11, 12]),
+ ("value", Binary, vec![b"12", b"34"])
+ )
+ .unwrap(),
+ record_batch!(("pk1", UInt8, vec![13]), ("value", Binary,
vec![b"5678"])).unwrap(),
+ record_batch!(("pk1", UInt8, vec![14]), ("value", Binary,
vec![b"9"])).unwrap(),
+ ];
+
+
test_merge_stream_for_append_mode(Arc::new(BytesMergeOperator::new(vec![1])),
expected)
+ .await;
+ }
+
+ async fn test_merge_stream_for_append_mode<I>(merge_op: MergeOperatorRef,
expected: I)
+ where
+ I: IntoIterator<Item = RecordBatch>,
+ {
+ let stream = make_sendable_record_batches([
+ record_batch!(
+ ("pk1", UInt8, vec![11, 11, 12, 12, 13]),
+ ("value", Binary, vec![b"1", b"2", b"3", b"4", b"5"]),
+ ("seq", UInt8, vec![1, 2, 3, 4, 5])
+ )
+ .unwrap(),
+ record_batch!(
+ ("pk1", UInt8, vec![13, 13]),
+ ("value", Binary, vec![b"6", b"7"]),
+ ("seq", UInt8, vec![6, 7])
+ )
+ .unwrap(),
+ record_batch!(
+ ("pk1", UInt8, vec![13, 14]),
+ ("value", Binary, vec![b"8", b"9"]),
+ ("seq", UInt8, vec![8, 9])
+ )
+ .unwrap(),
+ ]);
+
+ let stream = MergeStream::new(stream, 1, 2, merge_op);
+ check_stream(Box::pin(stream), expected).await;
+ }
+}
diff --git a/horaedb/metric_engine/src/storage.rs
b/horaedb/metric_engine/src/storage.rs
index e30cb2a8..f680bd94 100644
--- a/horaedb/metric_engine/src/storage.rs
+++ b/horaedb/metric_engine/src/storage.rs
@@ -31,7 +31,6 @@ use datafusion::{
physical_plan::{FileScanConfig, ParquetExec},
},
execution::{context::ExecutionProps, object_store::ObjectStoreUrl,
SendableRecordBatchStream},
- functions_aggregate::first_last::LastValue,
logical_expr::{utils::conjunction, Expr},
physical_expr::{create_physical_expr, LexOrdering},
physical_plan::{
@@ -59,11 +58,12 @@ use tokio::runtime::Runtime;
use crate::{
compaction::{CompactionScheduler, SchedulerConfig},
manifest::{Manifest, ManifestRef},
+ operator::{BytesMergeOperator, LastValueOperator},
read::{DefaultParquetFileReaderFactory, MergeExec},
sst::{allocate_id, FileMeta, SstFile, SstPathGenerator},
types::{
- ObjectStoreRef, RuntimeOptions, StorageOptions, TimeRange,
WriteOptions, WriteResult,
- SEQ_COLUMN_NAME,
+ ObjectStoreRef, RuntimeOptions, StorageOptions, TimeRange, UpdateMode,
WriteOptions,
+ WriteResult, SEQ_COLUMN_NAME,
},
Result,
};
@@ -138,6 +138,9 @@ pub struct CloudObjectStorage {
store: ObjectStoreRef,
arrow_schema: SchemaRef,
num_primary_keys: usize,
+ seq_idx: usize,
+ value_idxes: Vec<usize>,
+ update_mode: UpdateMode,
manifest: ManifestRef,
runtimes: StorageRuntimes,
@@ -167,8 +170,10 @@ impl CloudObjectStorage {
num_primary_keys: usize,
storage_opts: StorageOptions,
) -> Result<Self> {
- let runtimes = StorageRuntimes::new(storage_opts.runtime_opts)?;
+ let value_idxes =
(num_primary_keys..arrow_schema.fields.len()).collect::<Vec<_>>();
+ ensure!(!value_idxes.is_empty(), "no value column found");
+ let runtimes = StorageRuntimes::new(storage_opts.runtime_opts)?;
let manifest = Arc::new(
Manifest::try_new(
path.clone(),
@@ -178,12 +183,14 @@ impl CloudObjectStorage {
)
.await?,
);
+
let mut new_fields = arrow_schema.fields.clone().to_vec();
new_fields.push(Arc::new(Field::new(
SEQ_COLUMN_NAME,
DataType::UInt64,
true,
)));
+ let seq_idx = new_fields.len() - 1;
let arrow_schema = Arc::new(Schema::new_with_metadata(
new_fields,
arrow_schema.metadata.clone(),
@@ -199,9 +206,13 @@ impl CloudObjectStorage {
sst_path_gen.clone(),
SchedulerConfig::default(),
);
+ let update_mode = storage_opts.update_mode;
Ok(Self {
path,
num_primary_keys,
+ seq_idx,
+ value_idxes,
+ update_mode,
segment_duration,
store,
arrow_schema,
@@ -329,7 +340,7 @@ impl CloudObjectStorage {
builder.build()
}
- async fn build_scan_plan(
+ fn build_scan_plan(
&self,
ssts: Vec<SstFile>,
projections: Option<Vec<usize>>,
@@ -373,9 +384,11 @@ impl CloudObjectStorage {
let merge_exec = MergeExec::new(
Arc::new(sort_exec),
self.num_primary_keys,
- self.schema().fields.len() - 1,
- 0, // TODO: value_idx, not used now.
- Arc::new(LastValue::new()),
+ self.seq_idx,
+ match self.update_mode {
+ UpdateMode::Overwrite => Arc::new(LastValueOperator),
+ UpdateMode::Append =>
Arc::new(BytesMergeOperator::new(self.value_idxes.clone())),
+ },
);
Ok(Arc::new(merge_exec))
}
@@ -429,9 +442,8 @@ impl TimeMergeStorage for CloudObjectStorage {
let mut plan_for_all_segments = Vec::new();
for (_, ssts) in ssts_by_segment.sorted_by(|a, b| a.0.cmp(&b.0)) {
- let plan = self
- .build_scan_plan(ssts, req.projections.clone(),
req.predicate.clone())
- .await?;
+ let plan =
+ self.build_scan_plan(ssts, req.projections.clone(),
req.predicate.clone())?;
plan_for_all_segments.push(plan);
}
@@ -455,16 +467,15 @@ impl TimeMergeStorage for CloudObjectStorage {
#[cfg(test)]
mod tests {
- use arrow::array::{self as arrow_array};
- use datafusion::common::record_batch;
use object_store::local::LocalFileSystem;
+ use test_log::test;
use super::*;
- use crate::{arrow_schema, types::Timestamp};
+ use crate::{arrow_schema, record_batch, test_util::check_stream,
types::Timestamp};
#[tokio::test]
async fn test_build_scan_plan() {
- let schema = arrow_schema!(("pk1", UInt8));
+ let schema = arrow_schema!(("pk1", UInt8), ("value", UInt8));
let store = Arc::new(LocalFileSystem::new());
let storage = CloudObjectStorage::try_new(
"mock".to_string(),
@@ -494,21 +505,20 @@ mod tests {
None,
vec![],
)
- .await
.unwrap();
let display_plan =
datafusion::physical_plan::display::DisplayableExecutionPlan::new(plan.as_ref())
.indent(true);
assert_eq!(
- r#"MergeExec: [primary_keys: 1, seq_idx: 1]
- SortPreservingMergeExec: [pk1@0 ASC, __seq__@1 ASC], fetch=1024
- ParquetExec: file_groups={3 groups: [[mock/data/100.sst],
[mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, __seq__],
output_orderings=[[pk1@0 ASC, __seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC],
[pk1@0 ASC, __seq__@1 ASC]]
+ r#"MergeExec: [primary_keys: 1, seq_idx: 2]
+ SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC], fetch=1024
+ ParquetExec: file_groups={3 groups: [[mock/data/100.sst],
[mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__],
output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC],
[pk1@0 ASC, __seq__@2 ASC]]
"#,
format!("{display_plan}")
);
}
- #[tokio::test]
+ #[test(tokio::test)]
async fn test_storage_write_and_scan() {
let schema = arrow_schema!(("pk1", UInt8), ("pk2", UInt8), ("value",
Int64));
let root_dir = temp_dir::TempDir::new().unwrap();
@@ -554,7 +564,7 @@ mod tests {
.await
.unwrap();
- let mut result_stream = storage
+ let result_stream = storage
.scan(ScanRequest {
range: TimeRange::new(Timestamp(0), Timestamp::MAX),
predicate: vec![],
@@ -576,12 +586,8 @@ mod tests {
)
.unwrap(),
];
- let mut idx = 0;
- while let Some(batch) = result_stream.next().await {
- let batch = batch.unwrap();
- assert_eq!(expected_batch[idx], batch);
- idx += 1;
- }
+
+ check_stream(result_stream, expected_batch).await;
}
#[tokio::test]
diff --git a/horaedb/metric_engine/src/test_util.rs
b/horaedb/metric_engine/src/test_util.rs
index 9c378b0a..2900990d 100644
--- a/horaedb/metric_engine/src/test_util.rs
+++ b/horaedb/metric_engine/src/test_util.rs
@@ -15,6 +15,20 @@
// specific language governing permissions and limitations
// under the License.
+use std::{
+ collections::VecDeque,
+ pin::Pin,
+ task::{Context, Poll},
+};
+
+use arrow::array::RecordBatch;
+use arrow_schema::SchemaRef;
+use datafusion::{
+ error::Result as DfResult,
+ execution::{RecordBatchStream, SendableRecordBatchStream},
+};
+use futures::{Stream, StreamExt};
+
#[macro_export]
macro_rules! arrow_schema {
($(($field_name:expr, $data_type:ident)),* $(,)?) => {{
@@ -27,8 +41,135 @@ macro_rules! arrow_schema {
}};
}
-#[cfg(test)]
+#[macro_export]
+macro_rules! create_array {
+ (Boolean, $values: expr) => {
+ std::sync::Arc::new(arrow::array::BooleanArray::from($values))
+ };
+ (Int8, $values: expr) => {
+ std::sync::Arc::new(arrow::array::Int8Array::from($values))
+ };
+ (Int16, $values: expr) => {
+ std::sync::Arc::new(arrow::array::Int16Array::from($values))
+ };
+ (Int32, $values: expr) => {
+ std::sync::Arc::new(arrow::array::Int32Array::from($values))
+ };
+ (Int64, $values: expr) => {
+ std::sync::Arc::new(arrow::array::Int64Array::from($values))
+ };
+ (UInt8, $values: expr) => {
+ std::sync::Arc::new(arrow::array::UInt8Array::from($values))
+ };
+ (UInt16, $values: expr) => {
+ std::sync::Arc::new(arrow::array::UInt16Array::from($values))
+ };
+ (UInt32, $values: expr) => {
+ std::sync::Arc::new(arrow::array::UInt32Array::from($values))
+ };
+ (UInt64, $values: expr) => {
+ std::sync::Arc::new(arrow::array::UInt64Array::from($values))
+ };
+ (Float16, $values: expr) => {
+ std::sync::Arc::new(arrow::array::Float16Array::from($values))
+ };
+ (Float32, $values: expr) => {
+ std::sync::Arc::new(arrow::array::Float32Array::from($values))
+ };
+ (Float64, $values: expr) => {
+ std::sync::Arc::new(arrow::array::Float64Array::from($values))
+ };
+ (Utf8, $values: expr) => {
+ std::sync::Arc::new(arrow::array::StringArray::from($values))
+ };
+ (Binary, $values: expr) => {
+ std::sync::Arc::new(arrow::array::BinaryArray::from_vec($values))
+ };
+}
+
+/// Creates a record batch from literal slice of values, suitable for rapid
+/// testing and development.
+///
+/// Example:
+/// ```
+/// let batch = record_batch!(
+/// ("a", Int32, vec![1, 2, 3]),
+/// ("b", Float64, vec![Some(4.0), None, Some(5.0)]),
+/// ("c", Utf8, vec!["alpha", "beta", "gamma"])
+/// );
+/// ```
+#[macro_export]
+macro_rules! record_batch {
+ ($(($name: expr, $type: ident, $values: expr)),*) => {
+ {
+ let schema = std::sync::Arc::new(arrow_schema::Schema::new(vec![
+ $(
+ arrow_schema::Field::new($name,
arrow_schema::DataType::$type, true),
+ )*
+ ]));
+
+ let batch = arrow::array::RecordBatch::try_new(
+ schema,
+ vec![$(
+ $crate::create_array!($type, $values),
+ )*]
+ );
+
+ batch
+ }
+ }
+}
+
+struct DequeBasedStream {
+ batches: VecDeque<RecordBatch>,
+ schema: SchemaRef,
+}
+
+impl RecordBatchStream for DequeBasedStream {
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+}
+
+impl Stream for DequeBasedStream {
+ type Item = DfResult<RecordBatch>;
+
+ fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
+ if self.batches.is_empty() {
+ return Poll::Ready(None);
+ }
+
+ Poll::Ready(Some(Ok(self.batches.pop_front().unwrap())))
+ }
+}
+
+/// Creates an record batch stream for testing purposes.
+pub fn make_sendable_record_batches<I>(batches: I) -> SendableRecordBatchStream
+where
+ I: IntoIterator<Item = RecordBatch>,
+{
+ let batches = VecDeque::from_iter(batches);
+ let schema = batches[0].schema();
+ Box::pin(DequeBasedStream { batches, schema })
+}
+
+pub async fn check_stream<I>(mut stream: SendableRecordBatchStream, expected:
I)
+where
+ I: IntoIterator<Item = RecordBatch>,
+{
+ let mut iter = expected.into_iter();
+ while let Some(batch) = stream.next().await {
+ let batch = batch.unwrap();
+ assert_eq!(batch, iter.next().unwrap());
+ }
+ assert!(iter.next().is_none());
+}
+
mod tests {
+ use futures::StreamExt;
+
+ use super::*;
+
#[test]
fn test_arrow_schema_macro() {
let schema = arrow_schema![("a", UInt8), ("b", UInt8), ("c", UInt8),
("d", UInt8),];
@@ -38,4 +179,20 @@ mod tests {
assert_eq!(f.name(), expected_names[i]);
}
}
+
+ #[tokio::test]
+ async fn test_sendable_record_batch() {
+ let input = [
+ record_batch!(("pk1", UInt8, vec![11, 11]), ("pk2", UInt8,
vec![100, 100])).unwrap(),
+ record_batch!(("pk1", UInt8, vec![22, 22]), ("pk2", UInt8,
vec![200, 200])).unwrap(),
+ ];
+ let mut stream = make_sendable_record_batches(input.clone());
+ let mut i = 0;
+ while let Some(batch) = stream.next().await {
+ let batch = batch.unwrap();
+ assert_eq!(batch, input[i]);
+ i += 1;
+ }
+ assert_eq!(2, i);
+ }
}
diff --git a/horaedb/metric_engine/src/types.rs
b/horaedb/metric_engine/src/types.rs
index fbb956c4..c4972fe3 100644
--- a/horaedb/metric_engine/src/types.rs
+++ b/horaedb/metric_engine/src/types.rs
@@ -121,6 +121,7 @@ pub struct WriteResult {
pub size: usize,
}
+#[derive(Debug)]
pub struct ColumnOptions {
pub enable_dict: Option<bool>,
pub enable_bloom_filter: Option<bool>,
@@ -128,6 +129,7 @@ pub struct ColumnOptions {
pub compression: Option<Compression>,
}
+#[derive(Debug)]
pub struct WriteOptions {
pub max_row_group_size: usize,
pub write_bacth_size: usize,
@@ -156,6 +158,7 @@ impl Default for WriteOptions {
}
}
+#[derive(Debug)]
pub struct RuntimeOptions {
pub manifest_compact_thread_num: usize,
pub sst_compact_thread_num: usize,
@@ -170,6 +173,7 @@ impl Default for RuntimeOptions {
}
}
+#[derive(Debug)]
pub struct ManifestMergeOptions {
pub channel_size: usize,
pub merge_interval_seconds: usize,
@@ -190,11 +194,19 @@ impl Default for ManifestMergeOptions {
}
}
-#[derive(Default)]
+#[derive(Debug, Default)]
+pub enum UpdateMode {
+ #[default]
+ Overwrite,
+ Append,
+}
+
+#[derive(Debug, Default)]
pub struct StorageOptions {
pub write_opts: WriteOptions,
pub manifest_merge_opts: ManifestMergeOptions,
pub runtime_opts: RuntimeOptions,
+ pub update_mode: UpdateMode,
}
#[cfg(test)]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]