This is an automated email from the ASF dual-hosted git repository.

chunshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 431ae516 refactor: first step to refactor error (#1524)
431ae516 is described below

commit 431ae5169f9c97bd5b6f6664be889df47c360a0a
Author: Jiacai Liu <[email protected]>
AuthorDate: Fri Apr 26 17:04:26 2024 +0800

    refactor: first step to refactor error (#1524)
    
    ## Rationale
    Part of #1513
    
    ## Detailed Changes
    Replace the snafu-based Error with a thiserror-based Error in the memtable module.
    
    ## Test Plan
    CI
    
    ---------
    
    Co-authored-by: chunshao.rcs <[email protected]>
---
 Cargo.lock                                        |   2 +
 Cargo.toml                                        |   1 +
 src/analytic_engine/Cargo.toml                    |   2 +
 src/analytic_engine/src/error.rs                  |  23 +++++
 src/analytic_engine/src/instance/wal_replayer.rs  |  25 +++---
 src/analytic_engine/src/lib.rs                    |   2 +
 src/analytic_engine/src/memtable/columnar/iter.rs |  42 ++++-----
 src/analytic_engine/src/memtable/columnar/mod.rs  |  40 +++------
 src/analytic_engine/src/memtable/error.rs         |  51 +++++++++++
 src/analytic_engine/src/memtable/layered/iter.rs  |  13 ++-
 src/analytic_engine/src/memtable/layered/mod.rs   |  32 +++----
 src/analytic_engine/src/memtable/mod.rs           | 101 ++--------------------
 src/analytic_engine/src/memtable/reversed_iter.rs |   7 +-
 src/analytic_engine/src/memtable/skiplist/iter.rs |  29 ++++---
 src/analytic_engine/src/memtable/skiplist/mod.rs  |  27 +++---
 src/components/macros/src/lib.rs                  |  22 +++++
 16 files changed, 200 insertions(+), 219 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c8e331b0..8064a066 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -79,6 +79,7 @@ checksum = 
"0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5"
 name = "analytic_engine"
 version = "2.0.0"
 dependencies = [
+ "anyhow",
  "arc-swap 1.6.0",
  "arena",
  "arrow 49.0.0",
@@ -126,6 +127,7 @@ dependencies = [
  "table_kv",
  "tempfile",
  "test_util",
+ "thiserror",
  "time_ext",
  "tokio",
  "trace_metric",
diff --git a/Cargo.toml b/Cargo.toml
index 77277e08..4563acf5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -97,6 +97,7 @@ async-trait = "0.1.72"
 atomic_enum = "0.2.0"
 base64 = "0.13"
 bytes = "1"
+thiserror = "1"
 bytes_ext = { path = "src/components/bytes_ext" }
 catalog = { path = "src/catalog" }
 catalog_impls = { path = "src/catalog_impls" }
diff --git a/src/analytic_engine/Cargo.toml b/src/analytic_engine/Cargo.toml
index 2773420b..1e02406a 100644
--- a/src/analytic_engine/Cargo.toml
+++ b/src/analytic_engine/Cargo.toml
@@ -38,6 +38,7 @@ wal-rocksdb = ["wal/wal-rocksdb"]
 
 [dependencies]
 # In alphabetical order
+anyhow = { workspace = true }
 arc-swap = "1.4.0"
 arena = { workspace = true }
 arrow = { workspace = true }
@@ -81,6 +82,7 @@ snafu = { workspace = true }
 table_engine = { workspace = true }
 table_kv = { workspace = true }
 tempfile = { workspace = true, optional = true }
+thiserror = { workspace = true }
 time_ext = { workspace = true }
 tokio = { workspace = true }
 trace_metric = { workspace = true }
diff --git a/src/analytic_engine/src/error.rs b/src/analytic_engine/src/error.rs
new file mode 100644
index 00000000..205ef1f0
--- /dev/null
+++ b/src/analytic_engine/src/error.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Global Error type for analytic engine.
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
+pub enum ErrorKind {
+    KeyTooLarge,
+    Internal,
+}
diff --git a/src/analytic_engine/src/instance/wal_replayer.rs 
b/src/analytic_engine/src/instance/wal_replayer.rs
index 6a996fe9..ac1c1a8e 100644
--- a/src/analytic_engine/src/instance/wal_replayer.rs
+++ b/src/analytic_engine/src/instance/wal_replayer.rs
@@ -50,10 +50,11 @@ use crate::{
         engine::{Error, ReplayWalWithCause, Result},
         flush_compaction::{Flusher, TableFlushOptions},
         serial_executor::TableOpSerialExecutor,
-        write::MemTableWriter,
+        write::{Error as WriteError, MemTableWriter},
     },
     payload::{ReadPayload, SingleSchemaProviderAdapter, TableSchemaProvider, 
WalDecoder},
     table::data::TableDataRef,
+    ErrorKind,
 };
 
 // Metrics of wal replayer
@@ -547,22 +548,20 @@ async fn replay_table_log_entries(
                 let index_in_writer =
                     
IndexInWriterSchema::for_same_schema(row_group.schema().num_columns());
                 let memtable_writer = MemTableWriter::new(table_data.clone(), 
serial_exec);
-                let write_res = memtable_writer
-                    .write(sequence, row_group, index_in_writer)
-                    .box_err()
-                    .context(ReplayWalWithCause {
-                        msg: Some(format!(
-                            "table_id:{}, table_name:{}, space_id:{}",
-                            table_data.space_id, table_data.name, table_data.id
-                        )),
-                    });
+                let write_res = memtable_writer.write(sequence, row_group, 
index_in_writer);
                 if let Err(e) = write_res {
-                    // TODO: find a better way to match this.
-                    if 
e.to_string().contains(crate::memtable::TOO_LARGE_MESSAGE) {
+                    if matches!(e, WriteError::UpdateMemTableSequence { ref 
source } if source.kind() == ErrorKind::KeyTooLarge )
+                    {
                         // ignore this error
                         warn!("Unable to insert memtable, err:{e}");
                     } else {
-                        return Err(e);
+                        return Err(Error::ReplayWalWithCause {
+                            msg: Some(format!(
+                                "table_id:{}, table_name:{}, space_id:{}",
+                                table_data.space_id, table_data.name, 
table_data.id
+                            )),
+                            source: Box::new(e),
+                        });
                     }
                 }
 
diff --git a/src/analytic_engine/src/lib.rs b/src/analytic_engine/src/lib.rs
index e363e782..c1308d88 100644
--- a/src/analytic_engine/src/lib.rs
+++ b/src/analytic_engine/src/lib.rs
@@ -22,6 +22,7 @@
 mod compaction;
 mod context;
 mod engine;
+pub mod error;
 mod instance;
 mod manifest;
 pub mod memtable;
@@ -39,6 +40,7 @@ pub mod table_meta_set_impl;
 #[cfg(any(test, feature = "test"))]
 pub mod tests;
 
+use error::ErrorKind;
 use manifest::details::Options as ManifestOptions;
 use object_store::config::StorageOptions;
 use serde::{Deserialize, Serialize};
diff --git a/src/analytic_engine/src/memtable/columnar/iter.rs 
b/src/analytic_engine/src/memtable/columnar/iter.rs
index 7abc24d2..5400fbd9 100644
--- a/src/analytic_engine/src/memtable/columnar/iter.rs
+++ b/src/analytic_engine/src/memtable/columnar/iter.rs
@@ -23,6 +23,7 @@ use std::{
     time::Instant,
 };
 
+use anyhow::Context;
 use arena::{Arena, BasicStats, MonoIncArena};
 use bytes_ext::{ByteVec, Bytes};
 use codec::{memcomparable::MemComparable, row, Encoder};
@@ -36,17 +37,14 @@ use common_types::{
     schema::Schema,
     SequenceNumber,
 };
-use generic_error::BoxError;
 use logger::trace;
+use macros::ensure;
 use parquet::data_type::AsBytes;
 use skiplist::{ArenaSlice, BytewiseComparator, IterRef, Skiplist};
-use snafu::{OptionExt, ResultExt};
 
 use crate::memtable::{
-    key,
-    key::{KeySequence, SequenceCodec},
-    AppendRow, BuildRecordBatch, DecodeInternalKey, Internal, InternalNoCause, 
IterTimeout,
-    ProjectSchema, Result, ScanContext, ScanRequest,
+    key::{self, KeySequence, SequenceCodec},
+    Result, ScanContext, ScanRequest,
 };
 
 /// Iterator state
@@ -106,7 +104,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> 
ColumnarIterImpl<A> {
         let row_projector = request
             .row_projector_builder
             .build(&schema)
-            .context(ProjectSchema)?;
+            .context("ProjectSchema")?;
         let mut columnar_iter = Self {
             memtable,
             row_num,
@@ -147,15 +145,10 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> 
ColumnarIterImpl<A> {
                 let column_schema = self.memtable_schema.column(*idx);
                 let column = memtable
                     .get(&column_schema.id)
-                    .with_context(|| InternalNoCause {
-                        msg: format!("column not found, column:{}", 
column_schema.name),
-                    })?;
+                    .with_context(|| format!("column not found, column:{}", 
column_schema.name))?;
                 for (i, key) in 
key_vec.iter_mut().enumerate().take(self.row_num) {
                     let datum = column.get_datum(i);
-                    encoder
-                        .encode(key, &datum)
-                        .box_err()
-                        .context(Internal { msg: "encode key" })?;
+                    encoder.encode(key, &datum).context("encode key")?;
                 }
             }
 
@@ -163,10 +156,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> 
ColumnarIterImpl<A> {
             for (i, mut key) in key_vec.into_iter().enumerate() {
                 SequenceCodec
                     .encode(&mut key, &KeySequence::new(self.last_sequence, i 
as u32))
-                    .box_err()
-                    .context(Internal {
-                        msg: "encode key sequence",
-                    })?;
+                    .context("encode key sequence")?;
                 self.skiplist.put(&key, (i as u32).to_le_bytes().as_slice());
             }
 
@@ -203,9 +193,10 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> 
ColumnarIterImpl<A> {
         if !rows.is_empty() {
             if let Some(deadline) = self.deadline {
                 let now = Instant::now();
-                if now >= deadline {
-                    return IterTimeout { now, deadline }.fail();
-                }
+                ensure!(
+                    now < deadline,
+                    "iter timeout, now:{now:?}, deadline:{deadline:?}"
+                );
             }
 
             let fetched_schema = self.row_projector.fetched_schema().clone();
@@ -219,10 +210,10 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> 
ColumnarIterImpl<A> {
                 self.batch_size,
             );
             for row in rows.into_iter() {
-                builder.append_row(row).context(AppendRow)?;
+                builder.append_row(row).context("AppendRow")?;
             }
 
-            let batch = builder.build().context(BuildRecordBatch)?;
+            let batch = builder.build().context("BuildRecordBatch")?;
             trace!("column iterator send one batch:{:?}", batch);
             Ok(Some(batch))
         } else {
@@ -245,7 +236,8 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> 
ColumnarIterImpl<A> {
         while self.iter.valid() {
             // Fetch current entry
             let key = self.iter.key();
-            let (user_key, _) = 
key::user_key_from_internal_key(key).context(DecodeInternalKey)?;
+            let (user_key, _) =
+                
key::user_key_from_internal_key(key).context("DecodeInternalKey")?;
 
             // Check user key is still in range
             if self.is_after_end_bound(user_key) {
@@ -262,7 +254,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> 
ColumnarIterImpl<A> {
                         // be set as last_internal_key so maybe we can just
                         // unwrap it?
                         let (last_user_key, _) = 
key::user_key_from_internal_key(last_internal_key)
-                            .context(DecodeInternalKey)?;
+                            .context("DecodeInternalKey")?;
                         user_key == last_user_key
                     }
                     // This is the first user key
diff --git a/src/analytic_engine/src/memtable/columnar/mod.rs 
b/src/analytic_engine/src/memtable/columnar/mod.rs
index 6cbe783f..33bd86bd 100644
--- a/src/analytic_engine/src/memtable/columnar/mod.rs
+++ b/src/analytic_engine/src/memtable/columnar/mod.rs
@@ -23,22 +23,21 @@ use std::{
     },
 };
 
+use anyhow::Context;
 use arena::MonoIncArena;
 use bytes_ext::Bytes;
 use common_types::{
     column::Column, column_schema::ColumnId, datum::Datum, row::Row, 
schema::Schema,
     time::TimeRange, SequenceNumber,
 };
-use generic_error::BoxError;
 use logger::debug;
+use macros::ensure;
 use skiplist::{BytewiseComparator, Skiplist};
-use snafu::{ensure, OptionExt, ResultExt};
 
 use crate::memtable::{
     columnar::iter::ColumnarIterImpl, factory::Options, key::KeySequence,
-    reversed_iter::ReversedColumnarIterator, ColumnarIterPtr, Internal, 
InternalNoCause,
-    InvalidPutSequence, MemTable, Metrics as MemtableMetrics, PutContext, 
Result, ScanContext,
-    ScanRequest,
+    reversed_iter::ReversedColumnarIterator, ColumnarIterPtr, MemTable, 
Metrics as MemtableMetrics,
+    PutContext, Result, ScanContext, ScanRequest,
 };
 
 pub mod factory;
@@ -108,16 +107,11 @@ impl MemTable for ColumnarMemTable {
             } else {
                 // TODO: impl append() one row in column, avoid memory 
expansion.
                 let column = Column::with_capacity(1, column_schema.data_type)
-                    .box_err()
-                    .context(Internal {
-                        msg: "new column failed",
-                    })?;
+                    .context("new column failed")?;
                 columns.insert(column_schema.id, column);
                 columns
                     .get_mut(&column_schema.id)
-                    .context(InternalNoCause {
-                        msg: "get column failed",
-                    })?
+                    .context("get column failed")?
             };
 
             if let Some(writer_index) = 
ctx.index_in_writer.column_index_in_writer(i) {
@@ -127,10 +121,7 @@ impl MemTable for ColumnarMemTable {
                 } else {
                     column
                         .append_datum_ref(&row[writer_index])
-                        .box_err()
-                        .context(Internal {
-                            msg: "append datum failed",
-                        })?
+                        .context("append datum failed")?
                 }
             } else {
                 column.append_nulls(1);
@@ -140,9 +131,7 @@ impl MemTable for ColumnarMemTable {
             let mut memtable = self.memtable.write().unwrap();
             for (k, v) in columns {
                 if let Some(column) = memtable.get_mut(&k) {
-                    column.append_column(v).box_err().context(Internal {
-                        msg: "append column",
-                    })?;
+                    column.append_column(v).context("append column")?;
                 } else {
                     memtable.insert(k, v);
                 };
@@ -174,18 +163,14 @@ impl MemTable for ColumnarMemTable {
             .schema
             .columns()
             .get(self.schema.timestamp_index())
-            .context(InternalNoCause {
-                msg: "timestamp column is missing",
-            })?;
+            .context("timestamp column is missing")?;
 
         let num_rows = self
             .memtable
             .read()
             .unwrap()
             .get(&timestamp_column.id)
-            .context(InternalNoCause {
-                msg: "get timestamp column failed",
-            })?
+            .context("get timestamp column failed")?
             .len();
         let (reverse, batch_size) = (request.reverse, ctx.batch_size);
         let arena = MonoIncArena::with_collector(
@@ -219,10 +204,7 @@ impl MemTable for ColumnarMemTable {
         let last = self.last_sequence();
         ensure!(
             sequence >= last,
-            InvalidPutSequence {
-                given: sequence,
-                last
-            }
+            "invalid sequence, given:{sequence}, last:{last}"
         );
 
         self.last_sequence.store(sequence, Ordering::Relaxed);
diff --git a/src/analytic_engine/src/memtable/error.rs 
b/src/analytic_engine/src/memtable/error.rs
new file mode 100644
index 00000000..4389b0e4
--- /dev/null
+++ b/src/analytic_engine/src/memtable/error.rs
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use thiserror::Error;
+
+use crate::ErrorKind;
+
+#[derive(Debug, Error)]
+#[error(transparent)]
+pub struct Error(#[from] InnerError);
+
+impl From<anyhow::Error> for Error {
+    fn from(source: anyhow::Error) -> Self {
+        Self(InnerError::Other { source })
+    }
+}
+
+impl Error {
+    pub fn kind(&self) -> ErrorKind {
+        match self.0 {
+            InnerError::KeyTooLarge { .. } => ErrorKind::KeyTooLarge,
+            InnerError::Other { .. } => ErrorKind::Internal,
+        }
+    }
+}
+
+#[derive(Error, Debug)]
+pub(crate) enum InnerError {
+    #[error("too large key, max:{max}, current:{current}")]
+    KeyTooLarge { current: usize, max: usize },
+
+    #[error(transparent)]
+    Other {
+        #[from]
+        source: anyhow::Error,
+    },
+}
diff --git a/src/analytic_engine/src/memtable/layered/iter.rs 
b/src/analytic_engine/src/memtable/layered/iter.rs
index be32b77e..7b051966 100644
--- a/src/analytic_engine/src/memtable/layered/iter.rs
+++ b/src/analytic_engine/src/memtable/layered/iter.rs
@@ -17,13 +17,12 @@
 
 //! Skiplist memtable iterator
 
+use anyhow::Context;
 use common_types::{record_batch::FetchedRecordBatch, schema::Schema, 
time::TimeRange};
-use generic_error::BoxError;
-use snafu::ResultExt;
 
 use crate::memtable::{
     layered::{ImmutableSegment, MutableSegment},
-    ColumnarIterPtr, Internal, ProjectSchema, Result, ScanContext, ScanRequest,
+    ColumnarIterPtr, Error, Result, ScanContext, ScanRequest,
 };
 
 /// Columnar iterator for [LayeredMemTable]
@@ -43,7 +42,7 @@ impl ColumnarIterImpl {
         let row_projector = request
             .row_projector_builder
             .build(memtable_schema)
-            .context(ProjectSchema)?;
+            .context("build row projector")?;
 
         let (maybe_mutable, selected_immutables) =
             Self::filter_by_time_range(mutable, immutables, 
request.time_range);
@@ -63,13 +62,11 @@ impl ColumnarIterImpl {
                         fetched_column_indexes,
                         batch.clone(),
                     )
-                    .box_err()
-                    .with_context(|| Internal {
-                        msg: format!("row_projector:{row_projector:?}",),
-                    })
+                    .map_err(|e| Error::from(anyhow::Error::new(e)))
                 })
             })
             .collect::<Vec<_>>();
+
         let immutable_iter = immutable_batches.into_iter();
 
         let maybe_mutable_iter = match maybe_mutable {
diff --git a/src/analytic_engine/src/memtable/layered/mod.rs 
b/src/analytic_engine/src/memtable/layered/mod.rs
index 35d0b4a1..72e992bc 100644
--- a/src/analytic_engine/src/memtable/layered/mod.rs
+++ b/src/analytic_engine/src/memtable/layered/mod.rs
@@ -29,6 +29,7 @@ use std::{
     },
 };
 
+use anyhow::Context;
 use arena::CollectorRef;
 use arrow::record_batch::RecordBatch as ArrowRecordBatch;
 use bytes_ext::Bytes;
@@ -36,17 +37,15 @@ use common_types::{
     projected_schema::RowProjectorBuilder, row::Row, schema::Schema, 
time::TimeRange,
     SequenceNumber,
 };
-use generic_error::BoxError;
 use logger::debug;
 use skiplist::{BytewiseComparator, KeyComparator};
-use snafu::{OptionExt, ResultExt};
 
 use crate::memtable::{
     factory::{FactoryRef, Options},
     key::KeySequence,
     layered::iter::ColumnarIterImpl,
-    ColumnarIterPtr, Internal, InternalNoCause, MemTable, MemTableRef, Metrics 
as MemtableMetrics,
-    PutContext, Result, ScanContext, ScanRequest,
+    ColumnarIterPtr, MemTable, MemTableRef, Metrics as MemtableMetrics, 
PutContext, Result,
+    ScanContext, ScanRequest,
 };
 
 /// MemTable implementation based on skiplist
@@ -238,15 +237,15 @@ impl Inner {
             .map(|batch_res| batch_res.map(|batch| 
batch.into_arrow_record_batch()))
             .collect::<Result<Vec<_>>>()?;
 
-        let time_range = current_mutable.time_range().context(InternalNoCause {
-            msg: "failed to get time range from mutable segment",
-        })?;
-        let max_key = current_mutable.max_key().context(InternalNoCause {
-            msg: "failed to get max key from mutable segment",
-        })?;
-        let min_key = current_mutable.min_key().context(InternalNoCause {
-            msg: "failed to get min key from mutable segment",
-        })?;
+        let time_range = current_mutable
+            .time_range()
+            .context("failed to get time range from mutable segment")?;
+        let max_key = current_mutable
+            .max_key()
+            .context("failed to get max key from mutable segment")?;
+        let min_key = current_mutable
+            .min_key()
+            .context("failed to get min key from mutable segment")?;
         let immutable = ImmutableSegment::new(immutable_batches, time_range, 
min_key, max_key);
 
         self.immutable_segments.push(immutable);
@@ -388,10 +387,7 @@ impl MutableSegmentBuilder {
         let memtable = self
             .memtable_factory
             .create_memtable(memtable_opts)
-            .box_err()
-            .context(Internal {
-                msg: "failed to build mutable segment",
-            })?;
+            .context("failed to build mutable segment")?;
 
         Ok(MutableSegment(memtable))
     }
@@ -409,7 +405,7 @@ struct MutableBuilderOptions {
 
 /// Immutable batch
 pub(crate) struct ImmutableSegment {
-    /// Record batch converted from `MutableBatch`    
+    /// Record batch converted from `MutableBatch`
     record_batches: Vec<ArrowRecordBatch>,
 
     /// Min time of source `MutableBatch`
diff --git a/src/analytic_engine/src/memtable/mod.rs 
b/src/analytic_engine/src/memtable/mod.rs
index 7df963a2..f53bff14 100644
--- a/src/analytic_engine/src/memtable/mod.rs
+++ b/src/analytic_engine/src/memtable/mod.rs
@@ -18,6 +18,7 @@
 //! MemTable
 
 pub mod columnar;
+pub mod error;
 pub mod factory;
 pub mod key;
 pub mod layered;
@@ -27,6 +28,7 @@ pub mod test_util;
 
 use std::{collections::HashMap, ops::Bound, sync::Arc, time::Instant};
 
+use anyhow::Context;
 use bytes_ext::{ByteVec, Bytes};
 use common_types::{
     projected_schema::RowProjectorBuilder,
@@ -36,12 +38,11 @@ use common_types::{
     time::TimeRange,
     SequenceNumber, MUTABLE_SEGMENT_SWITCH_THRESHOLD,
 };
-use generic_error::{BoxError, GenericError};
+pub use error::Error;
 use horaedbproto::manifest;
 use macros::define_result;
 use serde::{Deserialize, Serialize};
 use size_ext::ReadableSize;
-use snafu::{Backtrace, ResultExt, Snafu};
 use trace_metric::MetricsCollector;
 
 use crate::memtable::key::KeySequence;
@@ -96,9 +97,9 @@ impl LayeredMemtableOptions {
     pub fn parse_from(opts: &HashMap<String, String>) -> Result<Self> {
         let mut options = LayeredMemtableOptions::default();
         if let Some(v) = opts.get(MUTABLE_SEGMENT_SWITCH_THRESHOLD) {
-            let threshold = v.parse::<u64>().box_err().context(Internal {
-                msg: format!("invalid mutable segment switch threshold:{v}"),
-            })?;
+            let threshold = v
+                .parse::<u64>()
+                .with_context(|| format!("invalid mutable segment switch 
threshold:{v}"))?;
             options.mutable_segment_switch_threshold = ReadableSize(threshold);
         }
 
@@ -122,95 +123,7 @@ impl From<LayeredMemtableOptions> for 
manifest::LayeredMemtableOptions {
     }
 }
 
-#[derive(Debug, Snafu)]
-#[snafu(visibility(pub(crate)))]
-pub enum Error {
-    #[snafu(display("Failed to encode internal key, err:{}", source))]
-    EncodeInternalKey { source: crate::memtable::key::Error },
-
-    #[snafu(display("Failed to decode internal key, err:{}", source))]
-    DecodeInternalKey { source: crate::memtable::key::Error },
-
-    #[snafu(display("Failed to decode row, err:{}", source))]
-    DecodeRow { source: codec::row::Error },
-
-    #[snafu(display("Failed to append row to batch builder, err:{}", source))]
-    AppendRow {
-        source: common_types::record_batch::Error,
-    },
-
-    #[snafu(display("Failed to build record batch, err:{}", source,))]
-    BuildRecordBatch {
-        source: common_types::record_batch::Error,
-    },
-
-    #[snafu(display("Failed to decode continuous row, err:{}", source))]
-    DecodeContinuousRow {
-        source: common_types::row::contiguous::Error,
-    },
-
-    #[snafu(display("Failed to project memtable schema, err:{}", source))]
-    ProjectSchema {
-        source: common_types::projected_schema::Error,
-    },
-
-    #[snafu(display(
-        "Invalid sequence number to put, given:{}, last:{}.\nBacktrace:\n{}",
-        given,
-        last,
-        backtrace
-    ))]
-    InvalidPutSequence {
-        given: SequenceNumber,
-        last: SequenceNumber,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display("Invalid row, err:{}", source))]
-    InvalidRow { source: GenericError },
-
-    #[snafu(display("Fail to iter in reverse order, err:{}", source))]
-    IterReverse { source: GenericError },
-
-    #[snafu(display(
-        "Timeout when iter memtable, now:{:?}, deadline:{:?}.\nBacktrace:\n{}",
-        now,
-        deadline,
-        backtrace
-    ))]
-    IterTimeout {
-        now: Instant,
-        deadline: Instant,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display("msg:{msg}, err:{source}"))]
-    Internal { msg: String, source: GenericError },
-
-    #[snafu(display("msg:{msg}"))]
-    InternalNoCause { msg: String },
-
-    #[snafu(display("Timestamp is not found in 
row.\nBacktrace:\n{backtrace}"))]
-    TimestampNotFound { backtrace: Backtrace },
-
-    #[snafu(display(
-        "{TOO_LARGE_MESSAGE}, current:{current}, 
max:{max}.\nBacktrace:\n{backtrace}"
-    ))]
-    KeyTooLarge {
-        current: usize,
-        max: usize,
-        backtrace: Backtrace,
-    },
-    #[snafu(display("Factory err, msg:{msg}, err:{source}"))]
-    Factory { msg: String, source: GenericError },
-
-    #[snafu(display("Factory err, msg:{msg}.\nBacktrace:\n{backtrace}"))]
-    FactoryNoCause { msg: String, backtrace: Backtrace },
-}
-
-pub const TOO_LARGE_MESSAGE: &str = "Memtable key length is too large";
-
-define_result!(Error);
+define_result!(error::Error);
 
 /// Options for put and context for tracing
 pub struct PutContext {
diff --git a/src/analytic_engine/src/memtable/reversed_iter.rs 
b/src/analytic_engine/src/memtable/reversed_iter.rs
index 10d424dd..14b3851f 100644
--- a/src/analytic_engine/src/memtable/reversed_iter.rs
+++ b/src/analytic_engine/src/memtable/reversed_iter.rs
@@ -18,10 +18,8 @@
 use std::iter::Rev;
 
 use common_types::record_batch::FetchedRecordBatch;
-use generic_error::BoxError;
-use snafu::ResultExt;
 
-use crate::memtable::{IterReverse, Result};
+use crate::memtable::{Error, Result};
 
 /// Reversed columnar iterator.
 // TODO(xikai): Now the implementation is not perfect: read all the entries
@@ -74,8 +72,7 @@ where
                 Ok(mut batch_with_key) => {
                     batch_with_key
                         .reverse_data()
-                        .box_err()
-                        .context(IterReverse)?;
+                        .map_err(|e| Error::from(anyhow::Error::new(e)))?;
 
                     Ok(batch_with_key)
                 }
diff --git a/src/analytic_engine/src/memtable/skiplist/iter.rs 
b/src/analytic_engine/src/memtable/skiplist/iter.rs
index cce3913d..ab9b59da 100644
--- a/src/analytic_engine/src/memtable/skiplist/iter.rs
+++ b/src/analytic_engine/src/memtable/skiplist/iter.rs
@@ -19,6 +19,7 @@
 
 use std::{cmp::Ordering, ops::Bound, time::Instant};
 
+use anyhow::Context;
 use arena::{Arena, BasicStats};
 use bytes_ext::{Bytes, BytesMut};
 use codec::row;
@@ -30,14 +31,13 @@ use common_types::{
     SequenceNumber,
 };
 use logger::trace;
+use macros::ensure;
 use skiplist::{ArenaSlice, BytewiseComparator, IterRef, Skiplist};
-use snafu::ResultExt;
 
 use crate::memtable::{
     key::{self, KeySequence},
     skiplist::SkiplistMemTable,
-    AppendRow, BuildRecordBatch, DecodeContinuousRow, DecodeInternalKey, 
EncodeInternalKey,
-    IterTimeout, ProjectSchema, Result, ScanContext, ScanRequest,
+    Result, ScanContext, ScanRequest,
 };
 
 /// Iterator state
@@ -91,7 +91,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> 
ColumnarIterImpl<A> {
         let row_projector = request
             .row_projector_builder
             .build(&memtable.schema)
-            .context(ProjectSchema)?;
+            .context("build projector")?;
 
         let iter = memtable.skiplist.iter();
         let mut columnar_iter = Self {
@@ -122,7 +122,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> 
ColumnarIterImpl<A> {
                 // Construct seek key
                 let mut key_buf = BytesMut::new();
                 let seek_key = key::internal_key_for_seek(user_key, 
self.sequence, &mut key_buf)
-                    .context(EncodeInternalKey)?;
+                    .context("encode internal key")?;
 
                 // Seek the skiplist
                 self.iter.seek(seek_key);
@@ -135,7 +135,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> 
ColumnarIterImpl<A> {
                 let mut key_buf = BytesMut::new();
                 let seek_key =
                     key::internal_key_for_seek(&next_user_key, self.sequence, 
&mut key_buf)
-                        .context(EncodeInternalKey)?;
+                        .context("encode internal key")?;
 
                 // Seek the skiplist
                 self.iter.seek(seek_key);
@@ -168,14 +168,14 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> 
ColumnarIterImpl<A> {
         while self.iter.valid() && num_rows < self.batch_size {
             if let Some(row) = self.fetch_next_row()? {
                 let row_reader = ContiguousRowReader::try_new(&row, 
&self.memtable_schema)
-                    .context(DecodeContinuousRow)?;
+                    .context("decode continuous row")?;
                 let projected_row = ProjectedContiguousRow::new(row_reader, 
&self.row_projector);
 
                 trace!("Column iterator fetch next row, row:{:?}", 
projected_row);
 
                 builder
                     .append_projected_contiguous_row(&projected_row)
-                    .context(AppendRow)?;
+                    .context("append row")?;
                 num_rows += 1;
             } else {
                 // There is no more row to fetch
@@ -191,12 +191,13 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> 
ColumnarIterImpl<A> {
         if num_rows > 0 {
             if let Some(deadline) = self.deadline {
                 let now = Instant::now();
-                if now >= deadline {
-                    return IterTimeout { now, deadline }.fail();
-                }
+                ensure!(
+                    now < deadline,
+                    "iter timeout, now:{now:?}, deadline:{deadline:?}"
+                )
             }
 
-            let batch = builder.build().context(BuildRecordBatch)?;
+            let batch = builder.build().context("build record batch")?;
             trace!("column iterator send one batch:{:?}", batch);
 
             Ok(Some(batch))
@@ -221,7 +222,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> 
ColumnarIterImpl<A> {
             // Fetch current entry
             let key = self.iter.key();
             let (user_key, sequence) =
-                
key::user_key_from_internal_key(key).context(DecodeInternalKey)?;
+                
key::user_key_from_internal_key(key).context("DecodeInternalKey")?;
 
             // Check user key is still in range
             if self.is_after_end_bound(user_key) {
@@ -238,7 +239,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> 
ColumnarIterImpl<A> {
                         // be set as last_internal_key so maybe we can just
                         // unwrap it?
                         let (last_user_key, _) = 
key::user_key_from_internal_key(last_internal_key)
-                            .context(DecodeInternalKey)?;
+                            .context("DecodeInternalKey")?;
                         user_key == last_user_key
                     }
                     // This is the first user key
diff --git a/src/analytic_engine/src/memtable/skiplist/mod.rs 
b/src/analytic_engine/src/memtable/skiplist/mod.rs
index baa4d9c2..ae5b37f8 100644
--- a/src/analytic_engine/src/memtable/skiplist/mod.rs
+++ b/src/analytic_engine/src/memtable/skiplist/mod.rs
@@ -22,6 +22,7 @@ pub mod iter;
 
 use std::sync::atomic::{self, AtomicI64, AtomicU64, AtomicUsize};
 
+use anyhow::Context;
 use arena::{Arena, BasicStats};
 use bytes_ext::Bytes;
 use codec::Encoder;
@@ -31,17 +32,17 @@ use common_types::{
     time::TimeRange,
     SequenceNumber,
 };
-use generic_error::BoxError;
 use logger::{debug, trace};
+use macros::ensure;
 use skiplist::{BytewiseComparator, Skiplist};
-use snafu::{ensure, OptionExt, ResultExt};
 
 use crate::memtable::{
+    error::InnerError,
     key::{ComparableInternalKey, KeySequence},
     reversed_iter::ReversedColumnarIterator,
     skiplist::iter::ColumnarIterImpl,
-    ColumnarIterPtr, EncodeInternalKey, InvalidPutSequence, InvalidRow, 
KeyTooLarge, MemTable,
-    Metrics as MemtableMetrics, PutContext, Result, ScanContext, ScanRequest, 
TimestampNotFound,
+    ColumnarIterPtr, MemTable, Metrics as MemtableMetrics, PutContext, Result, 
ScanContext,
+    ScanRequest,
 };
 
 #[derive(Default, Debug)]
@@ -143,27 +144,30 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send + 
'static> MemTable
         // Encode key
         key_encoder
             .encode(internal_key, row)
-            .context(EncodeInternalKey)?;
+            .context("encode interval key")?;
 
         // TODO: we should check row's primary key size at the beginning of write
         // process, so WAL and memtable can keep in sync.
         ensure!(
             internal_key.len() <= skiplist::MAX_KEY_SIZE as usize,
-            KeyTooLarge {
+            InnerError::KeyTooLarge {
                 current: internal_key.len(),
-                max: skiplist::MAX_KEY_SIZE,
+                max: skiplist::MAX_KEY_SIZE as usize,
             }
         );
 
         // Encode row value. The ContiguousRowWriter will clear the buf.
         let row_value = &mut ctx.value_buf;
         let mut row_writer = ContiguousRowWriter::new(row_value, schema, 
&ctx.index_in_writer);
-        row_writer.write_row(row).box_err().context(InvalidRow)?;
+        row_writer.write_row(row).context("invalid row")?;
         let encoded_size = internal_key.len() + row_value.len();
         self.skiplist.put(internal_key, row_value);
 
         // Update min/max time
-        let timestamp = 
row.timestamp(schema).context(TimestampNotFound)?.as_i64();
+        let timestamp = row
+            .timestamp(schema)
+            .context("timestamp not found")?
+            .as_i64();
         _ = self
             .min_time
             .fetch_update(atomic::Ordering::Relaxed, 
atomic::Ordering::Relaxed, |v| {
@@ -230,10 +234,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send + 
'static> MemTable
         let last = self.last_sequence();
         ensure!(
             sequence >= last,
-            InvalidPutSequence {
-                given: sequence,
-                last
-            }
+            "invalid sequence, given:{sequence}, last:{last}"
         );
 
         self.last_sequence
diff --git a/src/components/macros/src/lib.rs b/src/components/macros/src/lib.rs
index f3e134ae..c627f151 100644
--- a/src/components/macros/src/lib.rs
+++ b/src/components/macros/src/lib.rs
@@ -38,6 +38,28 @@ macro_rules! hash_map(
      };
 );
 
+/// Util for working with anyhow + thiserror
+/// Works like anyhow's [ensure](https://docs.rs/anyhow/latest/anyhow/macro.ensure.html)
+/// But returns `Result<T, ErrorFromAnyhow>`
+#[macro_export]
+macro_rules! ensure {
+    ($cond:expr, $msg:literal) => {
+        if !$cond {
+            return Err(anyhow::anyhow!($msg).into());
+        }
+    };
+    ($cond:expr, $err:expr) => {
+        if !$cond {
+            return Err($err.into());
+        }
+    };
+    ($cond:expr, $fmt:expr, $($arg:tt)*) => {
+        if !$cond {
+            return Err(anyhow::anyhow!($fmt, $($arg)*).into());
+        }
+    };
+}
+
 #[cfg(test)]
 mod tests {
     #[test]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to