This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 69488fad refactor: manifest error code (#1546)
69488fad is described below
commit 69488fad550f313fe5ccad455f9235aeaf473b9c
Author: MianChen <[email protected]>
AuthorDate: Thu Aug 1 09:20:52 2024 -0500
refactor: manifest error code (#1546)
---
src/analytic_engine/src/instance/engine.rs | 4 +-
src/analytic_engine/src/manifest/details.rs | 130 ++++------------------
src/analytic_engine/src/manifest/error.rs | 47 ++++++++
src/analytic_engine/src/manifest/meta_edit.rs | 88 ++++-----------
src/analytic_engine/src/manifest/meta_snapshot.rs | 28 ++---
src/analytic_engine/src/manifest/mod.rs | 5 +
src/analytic_engine/src/memtable/key.rs | 51 +++------
src/analytic_engine/src/table_meta_set_impl.rs | 89 ++++++---------
8 files changed, 152 insertions(+), 290 deletions(-)
diff --git a/src/analytic_engine/src/instance/engine.rs b/src/analytic_engine/src/instance/engine.rs
index 1cff5c22..abea72a8 100644
--- a/src/analytic_engine/src/instance/engine.rs
+++ b/src/analytic_engine/src/instance/engine.rs
@@ -218,9 +218,7 @@ pub enum Error {
CreateOpenFailedTable { table: String, backtrace: Backtrace },
#[snafu(display("Failed to open manifest, err:{}", source))]
- OpenManifest {
- source: crate::manifest::details::Error,
- },
+ OpenManifest { source: crate::manifest::Error },
#[snafu(display("Failed to find table, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
TableNotExist { msg: String, backtrace: Backtrace },
diff --git a/src/analytic_engine/src/manifest/details.rs b/src/analytic_engine/src/manifest/details.rs
index f9218c56..7df80a4f 100644
--- a/src/analytic_engine/src/manifest/details.rs
+++ b/src/analytic_engine/src/manifest/details.rs
@@ -28,17 +28,15 @@ use std::{
};
use async_trait::async_trait;
-use generic_error::{BoxError, GenericError, GenericResult};
+use generic_error::{BoxError, GenericResult};
use horaedbproto::manifest as manifest_pb;
use lazy_static::lazy_static;
use logger::{debug, info, warn};
-use macros::define_result;
use object_store::{ObjectStoreRef, Path};
use parquet::data_type::AsBytes;
use prometheus::{exponential_buckets, register_histogram, Histogram};
use prost::Message;
use serde::{Deserialize, Serialize};
-use snafu::{Backtrace, ResultExt, Snafu};
use table_engine::table::TableId;
use time_ext::ReadableDuration;
use tokio::sync::Mutex;
@@ -57,104 +55,12 @@ use crate::{
MetaEdit, MetaEditRequest, MetaUpdate, MetaUpdateDecoder,
MetaUpdatePayload, Snapshot,
},
meta_snapshot::{MetaSnapshot, MetaSnapshotBuilder},
- LoadRequest, Manifest, SnapshotRequest,
+ Error, LoadRequest, Manifest, Result, SnapshotRequest,
},
space::SpaceId,
table::data::{TableDataRef, TableShardInfo},
};
-#[derive(Debug, Snafu)]
-#[snafu(visibility(pub(crate)))]
-pub enum Error {
- #[snafu(display(
- "Failed to encode payloads, wal_location:{:?}, err:{}",
- wal_location,
- source
- ))]
- EncodePayloads {
- wal_location: WalLocation,
- source: wal::manager::Error,
- },
-
- #[snafu(display("Failed to write update to wal, err:{}", source))]
- WriteWal { source: wal::manager::Error },
-
- #[snafu(display("Failed to read wal, err:{}", source))]
- ReadWal { source: wal::manager::Error },
-
- #[snafu(display("Failed to read log entry, err:{}", source))]
- ReadEntry { source: wal::manager::Error },
-
- #[snafu(display("Failed to apply table meta update, err:{}", source))]
- ApplyUpdate {
- source: crate::manifest::meta_snapshot::Error,
- },
-
- #[snafu(display("Failed to clean wal, err:{}", source))]
- CleanWal { source: wal::manager::Error },
-
- #[snafu(display(
- "Failed to store snapshot, err:{}.\nBacktrace:\n{:?}",
- source,
- backtrace
- ))]
- StoreSnapshot {
- source: object_store::ObjectStoreError,
- backtrace: Backtrace,
- },
-
- #[snafu(display(
- "Failed to fetch snapshot, err:{}.\nBacktrace:\n{:?}",
- source,
- backtrace
- ))]
- FetchSnapshot {
- source: object_store::ObjectStoreError,
- backtrace: Backtrace,
- },
-
- #[snafu(display(
- "Failed to decode snapshot, err:{}.\nBacktrace:\n{:?}",
- source,
- backtrace
- ))]
- DecodeSnapshot {
- source: prost::DecodeError,
- backtrace: Backtrace,
- },
-
- #[snafu(display("Failed to build snapshot, msg:{}.\nBacktrace:\n{:?}", msg, backtrace))]
- BuildSnapshotNoCause { msg: String, backtrace: Backtrace },
-
- #[snafu(display("Failed to build snapshot, msg:{}, err:{}", msg, source))]
- BuildSnapshotWithCause { msg: String, source: GenericError },
-
- #[snafu(display(
- "Failed to apply edit to table, msg:{}.\nBacktrace:\n{:?}",
- msg,
- backtrace
- ))]
- ApplyUpdateToTableNoCause { msg: String, backtrace: Backtrace },
-
- #[snafu(display("Failed to apply edit to table, msg:{}, err:{}", msg, source))]
- ApplyUpdateToTableWithCause { msg: String, source: GenericError },
-
- #[snafu(display(
- "Failed to apply snapshot to table, msg:{}.\nBacktrace:\n{:?}",
- msg,
- backtrace
- ))]
- ApplySnapshotToTableNoCause { msg: String, backtrace: Backtrace },
-
- #[snafu(display("Failed to apply snapshot to table, msg:{}, err:{}", msg, source))]
- ApplySnapshotToTableWithCause { msg: String, source: GenericError },
-
- #[snafu(display("Failed to load snapshot, err:{}", source))]
- LoadSnapshot { source: GenericError },
-}
-
-define_result!(Error);
-
lazy_static! {
static ref RECOVER_TABLE_META_FROM_SNAPSHOT_DURATION: Histogram =
register_histogram!(
"recover_table_meta_from_snapshot_duration",
@@ -197,7 +103,7 @@ impl MetaUpdateLogEntryIterator for MetaUpdateReaderImpl {
.iter
.next_log_entries(decoder, |_| true, buffer)
.await
- .context(ReadEntry)?;
+ .map_err(anyhow::Error::new)?;
}
match self.buffer.pop_front() {
@@ -277,7 +183,7 @@ where
latest_seq = seq;
manifest_data_builder
.apply_update(update)
- .context(ApplyUpdate)?;
+ .map_err(anyhow::Error::new)?;
}
Ok(Snapshot {
end_seq: latest_seq,
@@ -302,7 +208,7 @@ where
latest_seq = seq;
manifest_data_builder
.apply_update(update)
- .context(ApplyUpdate)?;
+ .map_err(anyhow::Error::new)?;
has_logs = true;
}
@@ -633,7 +539,7 @@ impl MetaUpdateSnapshotStore for ObjectStoreBasedSnapshotStore {
self.store
.put(&self.snapshot_path, payload.into())
.await
- .context(StoreSnapshot)?;
+ .map_err(anyhow::Error::new)?;
Ok(())
}
@@ -661,15 +567,13 @@ impl MetaUpdateSnapshotStore for ObjectStoreBasedSnapshotStore {
}
let payload = get_res
- .context(FetchSnapshot)?
+ .map_err(anyhow::Error::new)?
.bytes()
.await
- .context(FetchSnapshot)?;
+ .map_err(anyhow::Error::new)?;
let snapshot_pb =
- manifest_pb::Snapshot::decode(payload.as_bytes()).context(DecodeSnapshot)?;
- let snapshot = Snapshot::try_from(snapshot_pb)
- .box_err()
- .context(LoadSnapshot)?;
+ manifest_pb::Snapshot::decode(payload.as_bytes()).map_err(anyhow::Error::new)?;
+ let snapshot = Snapshot::try_from(snapshot_pb).map_err(anyhow::Error::new)?;
Ok(Some(snapshot))
}
@@ -702,7 +606,7 @@ impl MetaUpdateLogStore for WalBasedLogStore {
.wal_manager
.read_batch(&ctx, &read_req)
.await
- .context(ReadWal)?;
+ .map_err(anyhow::Error::new)?;
Ok(MetaUpdateReaderImpl {
iter,
@@ -714,8 +618,12 @@ impl MetaUpdateLogStore for WalBasedLogStore {
async fn append(&self, meta_update: MetaUpdate) -> Result<SequenceNumber> {
let payload = MetaUpdatePayload::from(meta_update);
let log_batch_encoder = LogBatchEncoder::create(self.location);
- let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads {
- wal_location: self.location,
+ let log_batch = log_batch_encoder.encode(&payload).map_err(|e| {
+ anyhow::anyhow!(
+ "Failed to encode payloads, wal_location:{:?}, err:{}",
+ self.location,
+ e
+ )
})?;
let write_ctx = WriteContext {
@@ -725,14 +633,14 @@ impl MetaUpdateLogStore for WalBasedLogStore {
self.wal_manager
.write(&write_ctx, &log_batch)
.await
- .context(WriteWal)
+ .map_err(|e| Error::from(anyhow::Error::new(e)))
}
async fn delete_up_to(&self, inclusive_end: SequenceNumber) -> Result<()> {
self.wal_manager
.mark_delete_entries_up_to(self.location, inclusive_end)
.await
- .context(CleanWal)
+ .map_err(|e| Error::from(anyhow::Error::new(e)))
}
}
diff --git a/src/analytic_engine/src/manifest/error.rs b/src/analytic_engine/src/manifest/error.rs
new file mode 100644
index 00000000..632aaaad
--- /dev/null
+++ b/src/analytic_engine/src/manifest/error.rs
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use thiserror::Error;
+
+use crate::ErrorKind;
+
+#[derive(Debug, Error)]
+#[error(transparent)]
+pub struct Error(#[from] InnerError);
+
+impl From<anyhow::Error> for Error {
+ fn from(source: anyhow::Error) -> Self {
+ Self(InnerError::Other { source })
+ }
+}
+
+impl Error {
+ pub fn kind(&self) -> ErrorKind {
+ match self.0 {
+ InnerError::Other { .. } => ErrorKind::Internal,
+ }
+ }
+}
+
+#[derive(Error, Debug)]
+pub(crate) enum InnerError {
+ #[error(transparent)]
+ Other {
+ #[from]
+ source: anyhow::Error,
+ },
+}
diff --git a/src/analytic_engine/src/manifest/meta_edit.rs b/src/analytic_engine/src/manifest/meta_edit.rs
index 0cfd8981..62a0848d 100644
--- a/src/analytic_engine/src/manifest/meta_edit.rs
+++ b/src/analytic_engine/src/manifest/meta_edit.rs
@@ -19,20 +19,19 @@
use std::convert::TryFrom;
+use anyhow::Context;
use bytes_ext::{Buf, BufMut};
use common_types::{
schema::{Schema, Version},
SequenceNumber,
};
use horaedbproto::{manifest as manifest_pb, schema as schema_pb};
-use macros::define_result;
use prost::Message;
-use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::table::TableId;
use wal::log_batch::{Payload, PayloadDecodeContext, PayloadDecoder};
use crate::{
- manifest::meta_snapshot::MetaSnapshot,
+ manifest::{meta_snapshot::MetaSnapshot, Error, Result},
space::SpaceId,
sst::manager::FileId,
table::{
@@ -40,49 +39,9 @@ use crate::{
version::TableVersionMeta,
version_edit::{AddFile, DeleteFile, VersionEdit},
},
- table_options, TableOptions,
+ TableOptions,
};
-#[derive(Debug, Snafu)]
-pub enum Error {
- #[snafu(display("Failed to encode payload, err:{}.\nBacktrace:\n{}", source, backtrace))]
- EncodePayloadPb {
- source: prost::EncodeError,
- backtrace: Backtrace,
- },
-
- #[snafu(display("Failed to decode payload, err:{}.\nBacktrace:\n{}", source, backtrace))]
- DecodePayloadPb {
- source: prost::DecodeError,
- backtrace: Backtrace,
- },
-
- #[snafu(display("Failed to convert schema, err:{}", source))]
- ConvertSchema { source: common_types::schema::Error },
-
- #[snafu(display("Empty table schema.\nBacktrace:\n{}", backtrace))]
- EmptyTableSchema { backtrace: Backtrace },
-
- #[snafu(display("Empty table options.\nBacktrace:\n{}", backtrace))]
- EmptyTableOptions { backtrace: Backtrace },
-
- #[snafu(display("Failed to convert table options, err:{}", source))]
- ConvertTableOptions { source: table_options::Error },
-
- #[snafu(display("Empty meta update.\nBacktrace:\n{}", backtrace))]
- EmptyMetaUpdate { backtrace: Backtrace },
-
- #[snafu(display("Failed to convert version edit, err:{}", source))]
- ConvertVersionEdit {
- source: crate::table::version_edit::Error,
- },
-
- #[snafu(display("Failed to convert meta edit, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
- ConvertMetaEdit { msg: String, backtrace: Backtrace },
-}
-
-define_result!(Error);
-
/// Modifications to meta data in meta
#[derive(Debug, Clone)]
pub enum MetaUpdate {
@@ -133,7 +92,7 @@ impl TryFrom<manifest_pb::MetaUpdate> for MetaUpdate {
type Error = Error;
fn try_from(src: manifest_pb::MetaUpdate) -> Result<Self> {
- let meta_update = match src.meta.context(EmptyMetaUpdate)? {
+ let meta_update = match src.meta.context("Empty meta update.")? {
manifest_pb::meta_update::Meta::AddTable(v) => {
let add_table = AddTableMeta::try_from(v)?;
MetaUpdate::AddTable(add_table)
@@ -191,15 +150,15 @@ impl TryFrom<manifest_pb::AddTableMeta> for AddTableMeta {
type Error = Error;
fn try_from(src: manifest_pb::AddTableMeta) -> Result<Self> {
- let table_schema = src.schema.context(EmptyTableSchema)?;
- let opts = src.options.context(EmptyTableOptions)?;
+ let table_schema = src.schema.context("Empty table schema.")?;
+ let opts = src.options.context("Empty table options.")?;
Ok(Self {
space_id: src.space_id,
table_id: TableId::from(src.table_id),
table_name: src.table_name,
- schema: Schema::try_from(table_schema).context(ConvertSchema)?,
- opts: TableOptions::try_from(opts).context(ConvertTableOptions)?,
+ schema: Schema::try_from(table_schema).map_err(anyhow::Error::new)?,
+ opts: TableOptions::try_from(opts).map_err(anyhow::Error::new)?,
})
}
}
@@ -288,12 +247,12 @@ impl TryFrom<manifest_pb::VersionEditMeta> for VersionEditMeta {
fn try_from(src: manifest_pb::VersionEditMeta) -> Result<Self> {
let mut files_to_add = Vec::with_capacity(src.files_to_add.len());
for file_meta in src.files_to_add {
- files_to_add.push(AddFile::try_from(file_meta).context(ConvertVersionEdit)?);
+ files_to_add.push(AddFile::try_from(file_meta).map_err(anyhow::Error::new)?);
}
let mut files_to_delete = Vec::with_capacity(src.files_to_delete.len());
for file_meta in src.files_to_delete {
- files_to_delete.push(DeleteFile::try_from(file_meta).context(ConvertVersionEdit)?);
+ files_to_delete.push(DeleteFile::try_from(file_meta).map_err(anyhow::Error::new)?);
}
Ok(Self {
@@ -332,12 +291,12 @@ impl TryFrom<manifest_pb::AlterSchemaMeta> for AlterSchemaMeta {
type Error = Error;
fn try_from(src: manifest_pb::AlterSchemaMeta) -> Result<Self> {
- let table_schema = src.schema.context(EmptyTableSchema)?;
+ let table_schema = src.schema.context("Empty table schema.")?;
Ok(Self {
space_id: src.space_id,
table_id: TableId::from(src.table_id),
- schema: Schema::try_from(table_schema).context(ConvertSchema)?,
+ schema: Schema::try_from(table_schema).map_err(anyhow::Error::new)?,
pre_schema_version: src.pre_schema_version,
})
}
@@ -365,12 +324,12 @@ impl TryFrom<manifest_pb::AlterOptionsMeta> for AlterOptionsMeta {
type Error = Error;
fn try_from(src: manifest_pb::AlterOptionsMeta) -> Result<Self> {
- let table_options = src.options.context(EmptyTableOptions)?;
+ let table_options = src.options.context("Empty table options.")?;
Ok(Self {
space_id: src.space_id,
table_id: TableId::from(src.table_id),
- options: TableOptions::try_from(table_options).context(ConvertTableOptions)?,
+ options: TableOptions::try_from(table_options).map_err(anyhow::Error::new)?,
})
}
}
@@ -400,7 +359,8 @@ impl Payload for MetaUpdatePayload {
}
fn encode_to<B: BufMut>(&self, buf: &mut B) -> Result<()> {
- self.0.encode(buf).context(EncodePayloadPb)
+ self.0.encode(buf).map_err(anyhow::Error::new)?;
+ Ok(())
}
}
@@ -413,7 +373,7 @@ impl PayloadDecoder for MetaUpdateDecoder {
fn decode<B: Buf>(&self, _ctx: &PayloadDecodeContext, buf: &mut B) -> Result<Self::Target> {
let meta_update_pb =
- manifest_pb::MetaUpdate::decode(buf.chunk()).context(DecodePayloadPb)?;
+ manifest_pb::MetaUpdate::decode(buf.chunk()).map_err(anyhow::Error::new)?;
MetaUpdate::try_from(meta_update_pb)
}
}
@@ -506,10 +466,9 @@ impl TryFrom<MetaEdit> for MetaUpdate {
if let MetaEdit::Update(update) = value {
Ok(update)
} else {
- ConvertMetaEdit {
- msg: "it is not the update type meta edit",
- }
- .fail()
+ Err(Self::Error::from(anyhow::anyhow!(
+ "Failed to convert meta edit, it is not the update type meta edit"
+ )))
}
}
}
@@ -521,10 +480,9 @@ impl TryFrom<MetaEdit> for MetaSnapshot {
if let MetaEdit::Snapshot(table_manifest_data) = value {
Ok(table_manifest_data)
} else {
- ConvertMetaEdit {
- msg: "it is not the snapshot type meta edit",
- }
- .fail()
+ Err(Self::Error::from(anyhow::anyhow!(
+ "Failed to convert meta edit, it is not the snapshot type meta edit"
+ )))
}
}
}
diff --git a/src/analytic_engine/src/manifest/meta_snapshot.rs b/src/analytic_engine/src/manifest/meta_snapshot.rs
index 705a4e03..2dc5a1b1 100644
--- a/src/analytic_engine/src/manifest/meta_snapshot.rs
+++ b/src/analytic_engine/src/manifest/meta_snapshot.rs
@@ -18,29 +18,16 @@
//! Meta data of manifest.
use logger::debug;
-use macros::define_result;
-use snafu::{ensure, Backtrace, Snafu};
+use macros::ensure;
use crate::{
- manifest::meta_edit::{AddTableMeta, MetaUpdate},
+ manifest::{
+ meta_edit::{AddTableMeta, MetaUpdate},
+ Result,
+ },
table::version::TableVersionMeta,
};
-#[derive(Debug, Snafu)]
-pub enum Error {
- #[snafu(display(
- "Apply update on non-exist table, meta update:{:?}\nBacktrace:\n{}",
- update,
- backtrace
- ))]
- TableNotFound {
- update: MetaUpdate,
- backtrace: Backtrace,
- },
-}
-
-define_result!(Error);
-
#[derive(Debug, Clone, PartialEq)]
pub struct MetaSnapshot {
pub table_meta: AddTableMeta,
@@ -83,7 +70,10 @@ impl MetaSnapshotBuilder {
if let MetaUpdate::AddTable(_) = &update {
} else {
- ensure!(self.is_table_exists(), TableNotFound { update });
+ ensure!(
+ self.is_table_exists(),
+ "Apply update on non-exist table, meta update:{update:?}",
+ );
}
match update {
diff --git a/src/analytic_engine/src/manifest/mod.rs b/src/analytic_engine/src/manifest/mod.rs
index 8985734f..05a54192 100644
--- a/src/analytic_engine/src/manifest/mod.rs
+++ b/src/analytic_engine/src/manifest/mod.rs
@@ -18,6 +18,7 @@
//! Manage meta data of the engine
pub mod details;
+pub mod error;
pub mod meta_edit;
pub mod meta_snapshot;
@@ -25,11 +26,15 @@ use std::{fmt, sync::Arc};
use async_trait::async_trait;
use common_types::table::ShardId;
+pub use error::Error;
use generic_error::GenericResult;
+use macros::define_result;
use table_engine::table::TableId;
use crate::{manifest::meta_edit::MetaEditRequest, space::SpaceId, table::data::TableCatalogInfo};
+define_result!(error::Error);
+
#[derive(Debug)]
pub struct LoadRequest {
pub space_id: SpaceId,
diff --git a/src/analytic_engine/src/memtable/key.rs b/src/analytic_engine/src/memtable/key.rs
index 7d5a1e46..ef9acb66 100644
--- a/src/analytic_engine/src/memtable/key.rs
+++ b/src/analytic_engine/src/memtable/key.rs
@@ -30,35 +30,9 @@ use std::mem;
use bytes_ext::{BufMut, BytesMut, SafeBuf, SafeBufMut};
use codec::{memcomparable::MemComparable, Decoder, Encoder};
use common_types::{row::Row, schema::Schema, SequenceNumber};
-use macros::define_result;
-use snafu::{ensure, Backtrace, ResultExt, Snafu};
+use macros::ensure;
-#[derive(Debug, Snafu)]
-pub enum Error {
- #[snafu(display("Failed to encode key datum, err:{}", source))]
- EncodeKeyDatum { source: codec::memcomparable::Error },
-
- #[snafu(display("Failed to encode sequence, err:{}", source))]
- EncodeSequence { source: bytes_ext::Error },
-
- #[snafu(display("Failed to encode row index, err:{}", source))]
- EncodeIndex { source: bytes_ext::Error },
-
- #[snafu(display("Failed to decode sequence, err:{}", source))]
- DecodeSequence { source: bytes_ext::Error },
-
- #[snafu(display("Failed to decode row index, err:{}", source))]
- DecodeIndex { source: bytes_ext::Error },
-
- #[snafu(display(
- "Insufficient internal key length, len:{}.\nBacktrace:\n{}",
- len,
- backtrace
- ))]
- InternalKeyLen { len: usize, backtrace: Backtrace },
-}
-
-define_result!(Error);
+use crate::memtable::{Error, Result};
// u64 + u32
const KEY_SEQUENCE_BYTES_LEN: usize = 12;
@@ -128,7 +102,9 @@ impl<'a> Encoder<Row> for ComparableInternalKey<'a> {
fn encode<B: BufMut>(&self, buf: &mut B, value: &Row) -> Result<()> {
let encoder = MemComparable;
for idx in self.schema.primary_key_indexes() {
- encoder.encode(buf, &value[*idx]).context(EncodeKeyDatum)?;
+ encoder
+ .encode(buf, &value[*idx])
+ .map_err(anyhow::Error::new)?;
}
SequenceCodec.encode(buf, &self.sequence)?;
@@ -156,7 +132,8 @@ impl Encoder<KeySequence> for SequenceCodec {
// Encode sequence number and index in descend order
encode_sequence_number(buf, value.sequence())?;
let reversed_index = RowIndex::MAX - value.row_index();
- buf.try_put_u32(reversed_index).context(EncodeIndex)?;
+ buf.try_put_u32(reversed_index)
+ .map_err(anyhow::Error::new)?;
Ok(())
}
@@ -170,10 +147,10 @@ impl Decoder<KeySequence> for SequenceCodec {
type Error = Error;
fn decode<B: SafeBuf>(&self, buf: &mut B) -> Result<KeySequence> {
- let sequence = buf.try_get_u64().context(DecodeSequence)?;
+ let sequence = buf.try_get_u64().map_err(anyhow::Error::new)?;
// Reverse sequence
let sequence = SequenceNumber::MAX - sequence;
- let row_index = buf.try_get_u32().context(DecodeIndex)?;
+ let row_index = buf.try_get_u32().map_err(anyhow::Error::new)?;
// Reverse row index
let row_index = RowIndex::MAX - row_index;
@@ -186,7 +163,8 @@ fn encode_sequence_number<B: SafeBufMut>(buf: &mut B, sequence: SequenceNumber)
// The sequence need to encode in descend order
let reversed_sequence = SequenceNumber::MAX - sequence;
// Encode sequence
- buf.try_put_u64(reversed_sequence).context(EncodeSequence)?;
+ buf.try_put_u64(reversed_sequence)
+ .map_err(anyhow::Error::new)?;
Ok(())
}
@@ -219,9 +197,10 @@ pub fn user_key_from_internal_key(internal_key: &[u8]) -> Result<(&[u8], KeySequ
// Empty user key is meaningless
ensure!(
internal_key.len() > KEY_SEQUENCE_BYTES_LEN,
- InternalKeyLen {
- len: internal_key.len(),
- }
+ anyhow::anyhow!(
+ "Insufficient internal key length, len:{}",
+ internal_key.len()
+ )
);
let (left, mut right) = internal_key.split_at(internal_key.len() - KEY_SEQUENCE_BYTES_LEN);
diff --git a/src/analytic_engine/src/table_meta_set_impl.rs b/src/analytic_engine/src/table_meta_set_impl.rs
index a7d196d8..e14f5e4a 100644
--- a/src/analytic_engine/src/table_meta_set_impl.rs
+++ b/src/analytic_engine/src/table_meta_set_impl.rs
@@ -19,18 +19,14 @@
use std::{fmt, num::NonZeroUsize, sync::Arc};
-use generic_error::BoxError;
+use anyhow::Context;
use id_allocator::IdAllocator;
use logger::debug;
-use snafu::{OptionExt, ResultExt};
use table_engine::table::TableId;
use crate::{
manifest::{
- details::{
- ApplySnapshotToTableWithCause, ApplyUpdateToTableNoCause, ApplyUpdateToTableWithCause,
- BuildSnapshotNoCause, TableMetaSet,
- },
+ details::TableMetaSet,
meta_edit::{
self, AddTableMeta, AlterOptionsMeta, AlterSchemaMeta,
DropTableMeta, MetaEditRequest,
MetaUpdate, VersionEditMeta,
@@ -78,32 +74,22 @@ impl TableMetaSetImpl {
space_id: SpaceId,
table_key: TableKey<'_>,
apply_edit: F,
- ) -> crate::manifest::details::Result<TableDataRef>
+ ) -> crate::manifest::Result<TableDataRef>
where
- F: FnOnce(SpaceRef, TableDataRef) -> crate::manifest::details::Result<()>,
+ F: FnOnce(SpaceRef, TableDataRef) -> crate::manifest::Result<()>,
{
let spaces = self.spaces.read().unwrap();
let space = spaces
.get_by_id(space_id)
- .with_context(|| ApplyUpdateToTableNoCause {
- msg: format!("space not found, space_id:{space_id}"),
- })?;
+ .ok_or_else(|| anyhow::anyhow!("space not found, space_id:{space_id}"))?;
let table_data = match table_key {
- TableKey::Name(name) => {
- space
- .find_table(name)
- .with_context(|| ApplyUpdateToTableNoCause {
- msg: format!("table not found, space_id:{space_id}, table_name:{name}"),
- })?
- }
- TableKey::Id(id) => {
- space
- .find_table_by_id(id)
- .with_context(|| ApplyUpdateToTableNoCause {
- msg: format!("table not found, space_id:{space_id}, table_id:{id}"),
- })?
- }
+ TableKey::Name(name) => space.find_table(name).ok_or_else(|| {
+ anyhow::anyhow!("table not found, space_id:{space_id}, table_name:{name}")
+ })?,
+ TableKey::Id(id) => space.find_table_by_id(id).ok_or_else(|| {
+ anyhow::anyhow!("table not found, space_id:{space_id}, table_id:{id}")
+ })?,
};
apply_edit(space.clone(), table_data.clone())?;
@@ -116,7 +102,7 @@ impl TableMetaSetImpl {
meta_update: MetaUpdate,
shard_info: TableShardInfo,
table_catalog_info: TableCatalogInfo,
- ) -> crate::manifest::details::Result<TableDataRef> {
+ ) -> crate::manifest::Result<TableDataRef> {
match meta_update {
MetaUpdate::AddTable(AddTableMeta {
space_id,
@@ -126,12 +112,9 @@ impl TableMetaSetImpl {
opts,
}) => {
let spaces = self.spaces.read().unwrap();
- let space =
- spaces
- .get_by_id(space_id)
- .with_context(|| ApplyUpdateToTableNoCause {
- msg: format!("space not found, space_id:{space_id}"),
- })?;
+ let space = spaces
+ .get_by_id(space_id)
+ .ok_or_else(|| anyhow::anyhow!("space not found, space_id:{space_id}"))?;
let mem_size_options = MemSizeOptions {
collector: space.mem_usage_collector.clone(),
@@ -161,13 +144,10 @@ impl TableMetaSetImpl {
&self.file_purger,
mem_size_options,
)
- .box_err()
- .with_context(|| ApplyUpdateToTableWithCause {
- msg: format!(
- "failed to new table data, space_id:{}, table_id:{}",
- space.id, table_id
- ),
- })?,
+ .context(format!(
+ "failed to new table data, space_id:{}, table_id:{}",
+ space.id, table_id
+ ))?,
);
space.insert_table(table_data.clone());
@@ -253,7 +233,7 @@ impl TableMetaSetImpl {
meta_snapshot: MetaSnapshot,
shard_info: TableShardInfo,
table_catalog_info: TableCatalogInfo,
- ) -> crate::manifest::details::Result<TableDataRef> {
+ ) -> crate::manifest::Result<TableDataRef> {
debug!("TableMetaSet apply snapshot, snapshot :{:?}", meta_snapshot);
let MetaSnapshot {
@@ -265,9 +245,7 @@ impl TableMetaSetImpl {
let spaces = self.spaces.read().unwrap();
let space = spaces
.get_by_id(space_id)
- .with_context(|| ApplyUpdateToTableNoCause {
- msg: format!("space not found, space_id:{space_id}"),
- })?;
+ .ok_or_else(|| anyhow::anyhow!("space not found, space_id:{space_id}"))?;
// Apply max file id to the allocator
let allocator = match version_meta.clone() {
@@ -298,13 +276,10 @@ impl TableMetaSetImpl {
allocator,
table_catalog_info,
)
- .box_err()
- .with_context(|| ApplySnapshotToTableWithCause {
- msg: format!(
- "failed to new table_data, space_id:{}, table_name:{}",
- space.id, table_name
- ),
- })?,
+ .context(format!(
+ "failed to new table_data, space_id:{}, table_name:{}",
+ space.id, table_name
+ ))?,
);
// Apply version meta to the table.
@@ -333,17 +308,19 @@ impl TableMetaSet for TableMetaSetImpl {
&self,
space_id: SpaceId,
table_id: TableId,
- ) -> crate::manifest::details::Result<Option<MetaSnapshot>> {
+ ) -> crate::manifest::Result<Option<MetaSnapshot>> {
let table_data = {
let spaces = self.spaces.read().unwrap();
spaces
.get_by_id(space_id)
- .context(BuildSnapshotNoCause {
- msg: format!("space not exist, space_id:{space_id}, table_id:{table_id}",),
+ .ok_or_else(|| {
+ anyhow::anyhow!("space not exist, space_id:{space_id}, table_id:{table_id}")
})?
.find_table_by_id(table_id)
- .context(BuildSnapshotNoCause {
- msg: format!("table data not exist, space_id:{space_id}, table_id:{table_id}",),
+ .ok_or_else(|| {
+ anyhow::anyhow!(
+ "table data not exist, space_id:{space_id}, table_id:{table_id}"
+ )
})?
};
@@ -383,7 +360,7 @@ impl TableMetaSet for TableMetaSetImpl {
fn apply_edit_to_table(
&self,
request: crate::manifest::meta_edit::MetaEditRequest,
- ) -> crate::manifest::details::Result<TableDataRef> {
+ ) -> crate::manifest::Result<TableDataRef> {
let MetaEditRequest {
shard_info,
meta_edit,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]