This is an automated email from the ASF dual-hosted git repository.
chunshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 431ae516 refactor: first step to refactor error (#1524)
431ae516 is described below
commit 431ae5169f9c97bd5b6f6664be889df47c360a0a
Author: Jiacai Liu <[email protected]>
AuthorDate: Fri Apr 26 17:04:26 2024 +0800
refactor: first step to refactor error (#1524)
## Rationale
Part of #1513
## Detailed Changes
Replace snafu-based Error to thiserror-based for memtable module.
## Test Plan
CI
---------
Co-authored-by: chunshao.rcs <[email protected]>
---
Cargo.lock | 2 +
Cargo.toml | 1 +
src/analytic_engine/Cargo.toml | 2 +
src/analytic_engine/src/error.rs | 23 +++++
src/analytic_engine/src/instance/wal_replayer.rs | 25 +++---
src/analytic_engine/src/lib.rs | 2 +
src/analytic_engine/src/memtable/columnar/iter.rs | 42 ++++-----
src/analytic_engine/src/memtable/columnar/mod.rs | 40 +++------
src/analytic_engine/src/memtable/error.rs | 51 +++++++++++
src/analytic_engine/src/memtable/layered/iter.rs | 13 ++-
src/analytic_engine/src/memtable/layered/mod.rs | 32 +++----
src/analytic_engine/src/memtable/mod.rs | 101 ++--------------------
src/analytic_engine/src/memtable/reversed_iter.rs | 7 +-
src/analytic_engine/src/memtable/skiplist/iter.rs | 29 ++++---
src/analytic_engine/src/memtable/skiplist/mod.rs | 27 +++---
src/components/macros/src/lib.rs | 22 +++++
16 files changed, 200 insertions(+), 219 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index c8e331b0..8064a066 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -79,6 +79,7 @@ checksum =
"0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5"
name = "analytic_engine"
version = "2.0.0"
dependencies = [
+ "anyhow",
"arc-swap 1.6.0",
"arena",
"arrow 49.0.0",
@@ -126,6 +127,7 @@ dependencies = [
"table_kv",
"tempfile",
"test_util",
+ "thiserror",
"time_ext",
"tokio",
"trace_metric",
diff --git a/Cargo.toml b/Cargo.toml
index 77277e08..4563acf5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -97,6 +97,7 @@ async-trait = "0.1.72"
atomic_enum = "0.2.0"
base64 = "0.13"
bytes = "1"
+thiserror = "1"
bytes_ext = { path = "src/components/bytes_ext" }
catalog = { path = "src/catalog" }
catalog_impls = { path = "src/catalog_impls" }
diff --git a/src/analytic_engine/Cargo.toml b/src/analytic_engine/Cargo.toml
index 2773420b..1e02406a 100644
--- a/src/analytic_engine/Cargo.toml
+++ b/src/analytic_engine/Cargo.toml
@@ -38,6 +38,7 @@ wal-rocksdb = ["wal/wal-rocksdb"]
[dependencies]
# In alphabetical order
+anyhow = { workspace = true }
arc-swap = "1.4.0"
arena = { workspace = true }
arrow = { workspace = true }
@@ -81,6 +82,7 @@ snafu = { workspace = true }
table_engine = { workspace = true }
table_kv = { workspace = true }
tempfile = { workspace = true, optional = true }
+thiserror = { workspace = true }
time_ext = { workspace = true }
tokio = { workspace = true }
trace_metric = { workspace = true }
diff --git a/src/analytic_engine/src/error.rs b/src/analytic_engine/src/error.rs
new file mode 100644
index 00000000..205ef1f0
--- /dev/null
+++ b/src/analytic_engine/src/error.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Global Error type for analytic engine.
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
+pub enum ErrorKind {
+ KeyTooLarge,
+ Internal,
+}
diff --git a/src/analytic_engine/src/instance/wal_replayer.rs
b/src/analytic_engine/src/instance/wal_replayer.rs
index 6a996fe9..ac1c1a8e 100644
--- a/src/analytic_engine/src/instance/wal_replayer.rs
+++ b/src/analytic_engine/src/instance/wal_replayer.rs
@@ -50,10 +50,11 @@ use crate::{
engine::{Error, ReplayWalWithCause, Result},
flush_compaction::{Flusher, TableFlushOptions},
serial_executor::TableOpSerialExecutor,
- write::MemTableWriter,
+ write::{Error as WriteError, MemTableWriter},
},
payload::{ReadPayload, SingleSchemaProviderAdapter, TableSchemaProvider,
WalDecoder},
table::data::TableDataRef,
+ ErrorKind,
};
// Metrics of wal replayer
@@ -547,22 +548,20 @@ async fn replay_table_log_entries(
let index_in_writer =
IndexInWriterSchema::for_same_schema(row_group.schema().num_columns());
let memtable_writer = MemTableWriter::new(table_data.clone(),
serial_exec);
- let write_res = memtable_writer
- .write(sequence, row_group, index_in_writer)
- .box_err()
- .context(ReplayWalWithCause {
- msg: Some(format!(
- "table_id:{}, table_name:{}, space_id:{}",
- table_data.space_id, table_data.name, table_data.id
- )),
- });
+ let write_res = memtable_writer.write(sequence, row_group,
index_in_writer);
if let Err(e) = write_res {
- // TODO: find a better way to match this.
- if
e.to_string().contains(crate::memtable::TOO_LARGE_MESSAGE) {
+ if matches!(e, WriteError::UpdateMemTableSequence { ref
source } if source.kind() == ErrorKind::KeyTooLarge )
+ {
// ignore this error
warn!("Unable to insert memtable, err:{e}");
} else {
- return Err(e);
+ return Err(Error::ReplayWalWithCause {
+ msg: Some(format!(
+ "table_id:{}, table_name:{}, space_id:{}",
+ table_data.space_id, table_data.name,
table_data.id
+ )),
+ source: Box::new(e),
+ });
}
}
diff --git a/src/analytic_engine/src/lib.rs b/src/analytic_engine/src/lib.rs
index e363e782..c1308d88 100644
--- a/src/analytic_engine/src/lib.rs
+++ b/src/analytic_engine/src/lib.rs
@@ -22,6 +22,7 @@
mod compaction;
mod context;
mod engine;
+pub mod error;
mod instance;
mod manifest;
pub mod memtable;
@@ -39,6 +40,7 @@ pub mod table_meta_set_impl;
#[cfg(any(test, feature = "test"))]
pub mod tests;
+use error::ErrorKind;
use manifest::details::Options as ManifestOptions;
use object_store::config::StorageOptions;
use serde::{Deserialize, Serialize};
diff --git a/src/analytic_engine/src/memtable/columnar/iter.rs
b/src/analytic_engine/src/memtable/columnar/iter.rs
index 7abc24d2..5400fbd9 100644
--- a/src/analytic_engine/src/memtable/columnar/iter.rs
+++ b/src/analytic_engine/src/memtable/columnar/iter.rs
@@ -23,6 +23,7 @@ use std::{
time::Instant,
};
+use anyhow::Context;
use arena::{Arena, BasicStats, MonoIncArena};
use bytes_ext::{ByteVec, Bytes};
use codec::{memcomparable::MemComparable, row, Encoder};
@@ -36,17 +37,14 @@ use common_types::{
schema::Schema,
SequenceNumber,
};
-use generic_error::BoxError;
use logger::trace;
+use macros::ensure;
use parquet::data_type::AsBytes;
use skiplist::{ArenaSlice, BytewiseComparator, IterRef, Skiplist};
-use snafu::{OptionExt, ResultExt};
use crate::memtable::{
- key,
- key::{KeySequence, SequenceCodec},
- AppendRow, BuildRecordBatch, DecodeInternalKey, Internal, InternalNoCause,
IterTimeout,
- ProjectSchema, Result, ScanContext, ScanRequest,
+ key::{self, KeySequence, SequenceCodec},
+ Result, ScanContext, ScanRequest,
};
/// Iterator state
@@ -106,7 +104,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
ColumnarIterImpl<A> {
let row_projector = request
.row_projector_builder
.build(&schema)
- .context(ProjectSchema)?;
+ .context("ProjectSchema")?;
let mut columnar_iter = Self {
memtable,
row_num,
@@ -147,15 +145,10 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
ColumnarIterImpl<A> {
let column_schema = self.memtable_schema.column(*idx);
let column = memtable
.get(&column_schema.id)
- .with_context(|| InternalNoCause {
- msg: format!("column not found, column:{}",
column_schema.name),
- })?;
+ .with_context(|| format!("column not found, column:{}",
column_schema.name))?;
for (i, key) in
key_vec.iter_mut().enumerate().take(self.row_num) {
let datum = column.get_datum(i);
- encoder
- .encode(key, &datum)
- .box_err()
- .context(Internal { msg: "encode key" })?;
+ encoder.encode(key, &datum).context("encode key")?;
}
}
@@ -163,10 +156,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
ColumnarIterImpl<A> {
for (i, mut key) in key_vec.into_iter().enumerate() {
SequenceCodec
.encode(&mut key, &KeySequence::new(self.last_sequence, i
as u32))
- .box_err()
- .context(Internal {
- msg: "encode key sequence",
- })?;
+ .context("encode key sequence")?;
self.skiplist.put(&key, (i as u32).to_le_bytes().as_slice());
}
@@ -203,9 +193,10 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
ColumnarIterImpl<A> {
if !rows.is_empty() {
if let Some(deadline) = self.deadline {
let now = Instant::now();
- if now >= deadline {
- return IterTimeout { now, deadline }.fail();
- }
+ ensure!(
+ now < deadline,
+ "iter timeout, now:{now:?}, deadline:{deadline:?}"
+ );
}
let fetched_schema = self.row_projector.fetched_schema().clone();
@@ -219,10 +210,10 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
ColumnarIterImpl<A> {
self.batch_size,
);
for row in rows.into_iter() {
- builder.append_row(row).context(AppendRow)?;
+ builder.append_row(row).context("AppendRow")?;
}
- let batch = builder.build().context(BuildRecordBatch)?;
+ let batch = builder.build().context("BuildRecordBatch")?;
trace!("column iterator send one batch:{:?}", batch);
Ok(Some(batch))
} else {
@@ -245,7 +236,8 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
ColumnarIterImpl<A> {
while self.iter.valid() {
// Fetch current entry
let key = self.iter.key();
- let (user_key, _) =
key::user_key_from_internal_key(key).context(DecodeInternalKey)?;
+ let (user_key, _) =
+
key::user_key_from_internal_key(key).context("DecodeInternalKey")?;
// Check user key is still in range
if self.is_after_end_bound(user_key) {
@@ -262,7 +254,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
ColumnarIterImpl<A> {
// be set as last_internal_key so maybe we can just
// unwrap it?
let (last_user_key, _) =
key::user_key_from_internal_key(last_internal_key)
- .context(DecodeInternalKey)?;
+ .context("DecodeInternalKey")?;
user_key == last_user_key
}
// This is the first user key
diff --git a/src/analytic_engine/src/memtable/columnar/mod.rs
b/src/analytic_engine/src/memtable/columnar/mod.rs
index 6cbe783f..33bd86bd 100644
--- a/src/analytic_engine/src/memtable/columnar/mod.rs
+++ b/src/analytic_engine/src/memtable/columnar/mod.rs
@@ -23,22 +23,21 @@ use std::{
},
};
+use anyhow::Context;
use arena::MonoIncArena;
use bytes_ext::Bytes;
use common_types::{
column::Column, column_schema::ColumnId, datum::Datum, row::Row,
schema::Schema,
time::TimeRange, SequenceNumber,
};
-use generic_error::BoxError;
use logger::debug;
+use macros::ensure;
use skiplist::{BytewiseComparator, Skiplist};
-use snafu::{ensure, OptionExt, ResultExt};
use crate::memtable::{
columnar::iter::ColumnarIterImpl, factory::Options, key::KeySequence,
- reversed_iter::ReversedColumnarIterator, ColumnarIterPtr, Internal,
InternalNoCause,
- InvalidPutSequence, MemTable, Metrics as MemtableMetrics, PutContext,
Result, ScanContext,
- ScanRequest,
+ reversed_iter::ReversedColumnarIterator, ColumnarIterPtr, MemTable,
Metrics as MemtableMetrics,
+ PutContext, Result, ScanContext, ScanRequest,
};
pub mod factory;
@@ -108,16 +107,11 @@ impl MemTable for ColumnarMemTable {
} else {
// TODO: impl append() one row in column, avoid memory
expansion.
let column = Column::with_capacity(1, column_schema.data_type)
- .box_err()
- .context(Internal {
- msg: "new column failed",
- })?;
+ .context("new column failed")?;
columns.insert(column_schema.id, column);
columns
.get_mut(&column_schema.id)
- .context(InternalNoCause {
- msg: "get column failed",
- })?
+ .context("get column failed")?
};
if let Some(writer_index) =
ctx.index_in_writer.column_index_in_writer(i) {
@@ -127,10 +121,7 @@ impl MemTable for ColumnarMemTable {
} else {
column
.append_datum_ref(&row[writer_index])
- .box_err()
- .context(Internal {
- msg: "append datum failed",
- })?
+ .context("append datum failed")?
}
} else {
column.append_nulls(1);
@@ -140,9 +131,7 @@ impl MemTable for ColumnarMemTable {
let mut memtable = self.memtable.write().unwrap();
for (k, v) in columns {
if let Some(column) = memtable.get_mut(&k) {
- column.append_column(v).box_err().context(Internal {
- msg: "append column",
- })?;
+ column.append_column(v).context("append column")?;
} else {
memtable.insert(k, v);
};
@@ -174,18 +163,14 @@ impl MemTable for ColumnarMemTable {
.schema
.columns()
.get(self.schema.timestamp_index())
- .context(InternalNoCause {
- msg: "timestamp column is missing",
- })?;
+ .context("timestamp column is missing")?;
let num_rows = self
.memtable
.read()
.unwrap()
.get(×tamp_column.id)
- .context(InternalNoCause {
- msg: "get timestamp column failed",
- })?
+ .context("get timestamp column failed")?
.len();
let (reverse, batch_size) = (request.reverse, ctx.batch_size);
let arena = MonoIncArena::with_collector(
@@ -219,10 +204,7 @@ impl MemTable for ColumnarMemTable {
let last = self.last_sequence();
ensure!(
sequence >= last,
- InvalidPutSequence {
- given: sequence,
- last
- }
+ "invalid sequence, given:{sequence}, last:{last}"
);
self.last_sequence.store(sequence, Ordering::Relaxed);
diff --git a/src/analytic_engine/src/memtable/error.rs
b/src/analytic_engine/src/memtable/error.rs
new file mode 100644
index 00000000..4389b0e4
--- /dev/null
+++ b/src/analytic_engine/src/memtable/error.rs
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use thiserror::Error;
+
+use crate::ErrorKind;
+
+#[derive(Debug, Error)]
+#[error(transparent)]
+pub struct Error(#[from] InnerError);
+
+impl From<anyhow::Error> for Error {
+ fn from(source: anyhow::Error) -> Self {
+ Self(InnerError::Other { source })
+ }
+}
+
+impl Error {
+ pub fn kind(&self) -> ErrorKind {
+ match self.0 {
+ InnerError::KeyTooLarge { .. } => ErrorKind::KeyTooLarge,
+ InnerError::Other { .. } => ErrorKind::Internal,
+ }
+ }
+}
+
+#[derive(Error, Debug)]
+pub(crate) enum InnerError {
+ #[error("too large key, max:{max}, current:{current}")]
+ KeyTooLarge { current: usize, max: usize },
+
+ #[error(transparent)]
+ Other {
+ #[from]
+ source: anyhow::Error,
+ },
+}
diff --git a/src/analytic_engine/src/memtable/layered/iter.rs
b/src/analytic_engine/src/memtable/layered/iter.rs
index be32b77e..7b051966 100644
--- a/src/analytic_engine/src/memtable/layered/iter.rs
+++ b/src/analytic_engine/src/memtable/layered/iter.rs
@@ -17,13 +17,12 @@
//! Skiplist memtable iterator
+use anyhow::Context;
use common_types::{record_batch::FetchedRecordBatch, schema::Schema,
time::TimeRange};
-use generic_error::BoxError;
-use snafu::ResultExt;
use crate::memtable::{
layered::{ImmutableSegment, MutableSegment},
- ColumnarIterPtr, Internal, ProjectSchema, Result, ScanContext, ScanRequest,
+ ColumnarIterPtr, Error, Result, ScanContext, ScanRequest,
};
/// Columnar iterator for [LayeredMemTable]
@@ -43,7 +42,7 @@ impl ColumnarIterImpl {
let row_projector = request
.row_projector_builder
.build(memtable_schema)
- .context(ProjectSchema)?;
+ .context("build row projector")?;
let (maybe_mutable, selected_immutables) =
Self::filter_by_time_range(mutable, immutables,
request.time_range);
@@ -63,13 +62,11 @@ impl ColumnarIterImpl {
fetched_column_indexes,
batch.clone(),
)
- .box_err()
- .with_context(|| Internal {
- msg: format!("row_projector:{row_projector:?}",),
- })
+ .map_err(|e| Error::from(anyhow::Error::new(e)))
})
})
.collect::<Vec<_>>();
+
let immutable_iter = immutable_batches.into_iter();
let maybe_mutable_iter = match maybe_mutable {
diff --git a/src/analytic_engine/src/memtable/layered/mod.rs
b/src/analytic_engine/src/memtable/layered/mod.rs
index 35d0b4a1..72e992bc 100644
--- a/src/analytic_engine/src/memtable/layered/mod.rs
+++ b/src/analytic_engine/src/memtable/layered/mod.rs
@@ -29,6 +29,7 @@ use std::{
},
};
+use anyhow::Context;
use arena::CollectorRef;
use arrow::record_batch::RecordBatch as ArrowRecordBatch;
use bytes_ext::Bytes;
@@ -36,17 +37,15 @@ use common_types::{
projected_schema::RowProjectorBuilder, row::Row, schema::Schema,
time::TimeRange,
SequenceNumber,
};
-use generic_error::BoxError;
use logger::debug;
use skiplist::{BytewiseComparator, KeyComparator};
-use snafu::{OptionExt, ResultExt};
use crate::memtable::{
factory::{FactoryRef, Options},
key::KeySequence,
layered::iter::ColumnarIterImpl,
- ColumnarIterPtr, Internal, InternalNoCause, MemTable, MemTableRef, Metrics
as MemtableMetrics,
- PutContext, Result, ScanContext, ScanRequest,
+ ColumnarIterPtr, MemTable, MemTableRef, Metrics as MemtableMetrics,
PutContext, Result,
+ ScanContext, ScanRequest,
};
/// MemTable implementation based on skiplist
@@ -238,15 +237,15 @@ impl Inner {
.map(|batch_res| batch_res.map(|batch|
batch.into_arrow_record_batch()))
.collect::<Result<Vec<_>>>()?;
- let time_range = current_mutable.time_range().context(InternalNoCause {
- msg: "failed to get time range from mutable segment",
- })?;
- let max_key = current_mutable.max_key().context(InternalNoCause {
- msg: "failed to get max key from mutable segment",
- })?;
- let min_key = current_mutable.min_key().context(InternalNoCause {
- msg: "failed to get min key from mutable segment",
- })?;
+ let time_range = current_mutable
+ .time_range()
+ .context("failed to get time range from mutable segment")?;
+ let max_key = current_mutable
+ .max_key()
+ .context("failed to get max key from mutable segment")?;
+ let min_key = current_mutable
+ .min_key()
+ .context("failed to get min key from mutable segment")?;
let immutable = ImmutableSegment::new(immutable_batches, time_range,
min_key, max_key);
self.immutable_segments.push(immutable);
@@ -388,10 +387,7 @@ impl MutableSegmentBuilder {
let memtable = self
.memtable_factory
.create_memtable(memtable_opts)
- .box_err()
- .context(Internal {
- msg: "failed to build mutable segment",
- })?;
+ .context("failed to build mutable segment")?;
Ok(MutableSegment(memtable))
}
@@ -409,7 +405,7 @@ struct MutableBuilderOptions {
/// Immutable batch
pub(crate) struct ImmutableSegment {
- /// Record batch converted from `MutableBatch`
+ /// Record batch converted from `MutableBatch`
record_batches: Vec<ArrowRecordBatch>,
/// Min time of source `MutableBatch`
diff --git a/src/analytic_engine/src/memtable/mod.rs
b/src/analytic_engine/src/memtable/mod.rs
index 7df963a2..f53bff14 100644
--- a/src/analytic_engine/src/memtable/mod.rs
+++ b/src/analytic_engine/src/memtable/mod.rs
@@ -18,6 +18,7 @@
//! MemTable
pub mod columnar;
+pub mod error;
pub mod factory;
pub mod key;
pub mod layered;
@@ -27,6 +28,7 @@ pub mod test_util;
use std::{collections::HashMap, ops::Bound, sync::Arc, time::Instant};
+use anyhow::Context;
use bytes_ext::{ByteVec, Bytes};
use common_types::{
projected_schema::RowProjectorBuilder,
@@ -36,12 +38,11 @@ use common_types::{
time::TimeRange,
SequenceNumber, MUTABLE_SEGMENT_SWITCH_THRESHOLD,
};
-use generic_error::{BoxError, GenericError};
+pub use error::Error;
use horaedbproto::manifest;
use macros::define_result;
use serde::{Deserialize, Serialize};
use size_ext::ReadableSize;
-use snafu::{Backtrace, ResultExt, Snafu};
use trace_metric::MetricsCollector;
use crate::memtable::key::KeySequence;
@@ -96,9 +97,9 @@ impl LayeredMemtableOptions {
pub fn parse_from(opts: &HashMap<String, String>) -> Result<Self> {
let mut options = LayeredMemtableOptions::default();
if let Some(v) = opts.get(MUTABLE_SEGMENT_SWITCH_THRESHOLD) {
- let threshold = v.parse::<u64>().box_err().context(Internal {
- msg: format!("invalid mutable segment switch threshold:{v}"),
- })?;
+ let threshold = v
+ .parse::<u64>()
+ .with_context(|| format!("invalid mutable segment switch
threshold:{v}"))?;
options.mutable_segment_switch_threshold = ReadableSize(threshold);
}
@@ -122,95 +123,7 @@ impl From<LayeredMemtableOptions> for
manifest::LayeredMemtableOptions {
}
}
-#[derive(Debug, Snafu)]
-#[snafu(visibility(pub(crate)))]
-pub enum Error {
- #[snafu(display("Failed to encode internal key, err:{}", source))]
- EncodeInternalKey { source: crate::memtable::key::Error },
-
- #[snafu(display("Failed to decode internal key, err:{}", source))]
- DecodeInternalKey { source: crate::memtable::key::Error },
-
- #[snafu(display("Failed to decode row, err:{}", source))]
- DecodeRow { source: codec::row::Error },
-
- #[snafu(display("Failed to append row to batch builder, err:{}", source))]
- AppendRow {
- source: common_types::record_batch::Error,
- },
-
- #[snafu(display("Failed to build record batch, err:{}", source,))]
- BuildRecordBatch {
- source: common_types::record_batch::Error,
- },
-
- #[snafu(display("Failed to decode continuous row, err:{}", source))]
- DecodeContinuousRow {
- source: common_types::row::contiguous::Error,
- },
-
- #[snafu(display("Failed to project memtable schema, err:{}", source))]
- ProjectSchema {
- source: common_types::projected_schema::Error,
- },
-
- #[snafu(display(
- "Invalid sequence number to put, given:{}, last:{}.\nBacktrace:\n{}",
- given,
- last,
- backtrace
- ))]
- InvalidPutSequence {
- given: SequenceNumber,
- last: SequenceNumber,
- backtrace: Backtrace,
- },
-
- #[snafu(display("Invalid row, err:{}", source))]
- InvalidRow { source: GenericError },
-
- #[snafu(display("Fail to iter in reverse order, err:{}", source))]
- IterReverse { source: GenericError },
-
- #[snafu(display(
- "Timeout when iter memtable, now:{:?}, deadline:{:?}.\nBacktrace:\n{}",
- now,
- deadline,
- backtrace
- ))]
- IterTimeout {
- now: Instant,
- deadline: Instant,
- backtrace: Backtrace,
- },
-
- #[snafu(display("msg:{msg}, err:{source}"))]
- Internal { msg: String, source: GenericError },
-
- #[snafu(display("msg:{msg}"))]
- InternalNoCause { msg: String },
-
- #[snafu(display("Timestamp is not found in
row.\nBacktrace:\n{backtrace}"))]
- TimestampNotFound { backtrace: Backtrace },
-
- #[snafu(display(
- "{TOO_LARGE_MESSAGE}, current:{current},
max:{max}.\nBacktrace:\n{backtrace}"
- ))]
- KeyTooLarge {
- current: usize,
- max: usize,
- backtrace: Backtrace,
- },
- #[snafu(display("Factory err, msg:{msg}, err:{source}"))]
- Factory { msg: String, source: GenericError },
-
- #[snafu(display("Factory err, msg:{msg}.\nBacktrace:\n{backtrace}"))]
- FactoryNoCause { msg: String, backtrace: Backtrace },
-}
-
-pub const TOO_LARGE_MESSAGE: &str = "Memtable key length is too large";
-
-define_result!(Error);
+define_result!(error::Error);
/// Options for put and context for tracing
pub struct PutContext {
diff --git a/src/analytic_engine/src/memtable/reversed_iter.rs
b/src/analytic_engine/src/memtable/reversed_iter.rs
index 10d424dd..14b3851f 100644
--- a/src/analytic_engine/src/memtable/reversed_iter.rs
+++ b/src/analytic_engine/src/memtable/reversed_iter.rs
@@ -18,10 +18,8 @@
use std::iter::Rev;
use common_types::record_batch::FetchedRecordBatch;
-use generic_error::BoxError;
-use snafu::ResultExt;
-use crate::memtable::{IterReverse, Result};
+use crate::memtable::{Error, Result};
/// Reversed columnar iterator.
// TODO(xikai): Now the implementation is not perfect: read all the entries
@@ -74,8 +72,7 @@ where
Ok(mut batch_with_key) => {
batch_with_key
.reverse_data()
- .box_err()
- .context(IterReverse)?;
+ .map_err(|e| Error::from(anyhow::Error::new(e)))?;
Ok(batch_with_key)
}
diff --git a/src/analytic_engine/src/memtable/skiplist/iter.rs
b/src/analytic_engine/src/memtable/skiplist/iter.rs
index cce3913d..ab9b59da 100644
--- a/src/analytic_engine/src/memtable/skiplist/iter.rs
+++ b/src/analytic_engine/src/memtable/skiplist/iter.rs
@@ -19,6 +19,7 @@
use std::{cmp::Ordering, ops::Bound, time::Instant};
+use anyhow::Context;
use arena::{Arena, BasicStats};
use bytes_ext::{Bytes, BytesMut};
use codec::row;
@@ -30,14 +31,13 @@ use common_types::{
SequenceNumber,
};
use logger::trace;
+use macros::ensure;
use skiplist::{ArenaSlice, BytewiseComparator, IterRef, Skiplist};
-use snafu::ResultExt;
use crate::memtable::{
key::{self, KeySequence},
skiplist::SkiplistMemTable,
- AppendRow, BuildRecordBatch, DecodeContinuousRow, DecodeInternalKey,
EncodeInternalKey,
- IterTimeout, ProjectSchema, Result, ScanContext, ScanRequest,
+ Result, ScanContext, ScanRequest,
};
/// Iterator state
@@ -91,7 +91,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
ColumnarIterImpl<A> {
let row_projector = request
.row_projector_builder
.build(&memtable.schema)
- .context(ProjectSchema)?;
+ .context("build projector")?;
let iter = memtable.skiplist.iter();
let mut columnar_iter = Self {
@@ -122,7 +122,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
ColumnarIterImpl<A> {
// Construct seek key
let mut key_buf = BytesMut::new();
let seek_key = key::internal_key_for_seek(user_key,
self.sequence, &mut key_buf)
- .context(EncodeInternalKey)?;
+ .context("encode internal key")?;
// Seek the skiplist
self.iter.seek(seek_key);
@@ -135,7 +135,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
ColumnarIterImpl<A> {
let mut key_buf = BytesMut::new();
let seek_key =
key::internal_key_for_seek(&next_user_key, self.sequence,
&mut key_buf)
- .context(EncodeInternalKey)?;
+ .context("encode internal key")?;
// Seek the skiplist
self.iter.seek(seek_key);
@@ -168,14 +168,14 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
ColumnarIterImpl<A> {
while self.iter.valid() && num_rows < self.batch_size {
if let Some(row) = self.fetch_next_row()? {
let row_reader = ContiguousRowReader::try_new(&row,
&self.memtable_schema)
- .context(DecodeContinuousRow)?;
+ .context("decode continuous row")?;
let projected_row = ProjectedContiguousRow::new(row_reader,
&self.row_projector);
trace!("Column iterator fetch next row, row:{:?}",
projected_row);
builder
.append_projected_contiguous_row(&projected_row)
- .context(AppendRow)?;
+ .context("append row")?;
num_rows += 1;
} else {
// There is no more row to fetch
@@ -191,12 +191,13 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
ColumnarIterImpl<A> {
if num_rows > 0 {
if let Some(deadline) = self.deadline {
let now = Instant::now();
- if now >= deadline {
- return IterTimeout { now, deadline }.fail();
- }
+ ensure!(
+ now < deadline,
+ "iter timeout, now:{now:?}, deadline:{deadline:?}"
+ )
}
- let batch = builder.build().context(BuildRecordBatch)?;
+ let batch = builder.build().context("build record batch")?;
trace!("column iterator send one batch:{:?}", batch);
Ok(Some(batch))
@@ -221,7 +222,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
ColumnarIterImpl<A> {
// Fetch current entry
let key = self.iter.key();
let (user_key, sequence) =
-
key::user_key_from_internal_key(key).context(DecodeInternalKey)?;
+
key::user_key_from_internal_key(key).context("DecodeInternalKey")?;
// Check user key is still in range
if self.is_after_end_bound(user_key) {
@@ -238,7 +239,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
ColumnarIterImpl<A> {
// be set as last_internal_key so maybe we can just
// unwrap it?
let (last_user_key, _) =
key::user_key_from_internal_key(last_internal_key)
- .context(DecodeInternalKey)?;
+ .context("DecodeInternalKey")?;
user_key == last_user_key
}
// This is the first user key
diff --git a/src/analytic_engine/src/memtable/skiplist/mod.rs
b/src/analytic_engine/src/memtable/skiplist/mod.rs
index baa4d9c2..ae5b37f8 100644
--- a/src/analytic_engine/src/memtable/skiplist/mod.rs
+++ b/src/analytic_engine/src/memtable/skiplist/mod.rs
@@ -22,6 +22,7 @@ pub mod iter;
use std::sync::atomic::{self, AtomicI64, AtomicU64, AtomicUsize};
+use anyhow::Context;
use arena::{Arena, BasicStats};
use bytes_ext::Bytes;
use codec::Encoder;
@@ -31,17 +32,17 @@ use common_types::{
time::TimeRange,
SequenceNumber,
};
-use generic_error::BoxError;
use logger::{debug, trace};
+use macros::ensure;
use skiplist::{BytewiseComparator, Skiplist};
-use snafu::{ensure, OptionExt, ResultExt};
use crate::memtable::{
+ error::InnerError,
key::{ComparableInternalKey, KeySequence},
reversed_iter::ReversedColumnarIterator,
skiplist::iter::ColumnarIterImpl,
- ColumnarIterPtr, EncodeInternalKey, InvalidPutSequence, InvalidRow,
KeyTooLarge, MemTable,
- Metrics as MemtableMetrics, PutContext, Result, ScanContext, ScanRequest,
TimestampNotFound,
+ ColumnarIterPtr, MemTable, Metrics as MemtableMetrics, PutContext, Result,
ScanContext,
+ ScanRequest,
};
#[derive(Default, Debug)]
@@ -143,27 +144,30 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send +
'static> MemTable
// Encode key
key_encoder
.encode(internal_key, row)
- .context(EncodeInternalKey)?;
+ .context("encode interval key")?;
// TODO: we should check row's primary key size at the beginning of
write
// process, so WAL and memtable can keep in sync.
ensure!(
internal_key.len() <= skiplist::MAX_KEY_SIZE as usize,
- KeyTooLarge {
+ InnerError::KeyTooLarge {
current: internal_key.len(),
- max: skiplist::MAX_KEY_SIZE,
+ max: skiplist::MAX_KEY_SIZE as usize,
}
);
// Encode row value. The ContiguousRowWriter will clear the buf.
let row_value = &mut ctx.value_buf;
let mut row_writer = ContiguousRowWriter::new(row_value, schema,
&ctx.index_in_writer);
- row_writer.write_row(row).box_err().context(InvalidRow)?;
+ row_writer.write_row(row).context("invalid row")?;
let encoded_size = internal_key.len() + row_value.len();
self.skiplist.put(internal_key, row_value);
// Update min/max time
- let timestamp =
row.timestamp(schema).context(TimestampNotFound)?.as_i64();
+ let timestamp = row
+ .timestamp(schema)
+ .context("timestamp not found")?
+ .as_i64();
_ = self
.min_time
.fetch_update(atomic::Ordering::Relaxed,
atomic::Ordering::Relaxed, |v| {
@@ -230,10 +234,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send +
'static> MemTable
let last = self.last_sequence();
ensure!(
sequence >= last,
- InvalidPutSequence {
- given: sequence,
- last
- }
+ "invalid sequence, given:{sequence}, last:{last}"
);
self.last_sequence
diff --git a/src/components/macros/src/lib.rs b/src/components/macros/src/lib.rs
index f3e134ae..c627f151 100644
--- a/src/components/macros/src/lib.rs
+++ b/src/components/macros/src/lib.rs
@@ -38,6 +38,28 @@ macro_rules! hash_map(
};
);
+/// Util for working with anyhow + thiserror
+/// Works like anyhow's
[ensure](https://docs.rs/anyhow/latest/anyhow/macro.ensure.html)
+/// But return `Return<T, ErrorFromAnyhow>`
+#[macro_export]
+macro_rules! ensure {
+ ($cond:expr, $msg:literal) => {
+ if !$cond {
+ return Err(anyhow::anyhow!($msg).into());
+ }
+ };
+ ($cond:expr, $err:expr) => {
+ if !$cond {
+ return Err($err.into());
+ }
+ };
+ ($cond:expr, $fmt:expr, $($arg:tt)*) => {
+ if !$cond {
+ return Err(anyhow::anyhow!($fmt, $($arg)*).into());
+ }
+ };
+}
+
#[cfg(test)]
mod tests {
#[test]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]