This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/dev by this push:
     new dc10253f feat: add table status check (#1418)
dc10253f is described below

commit dc10253f31d7bac3ab46c44424584ebd2fb99b8f
Author: CooooolFrog <[email protected]>
AuthorDate: Mon Jan 8 09:57:15 2024 +0800

    feat: add table status check (#1418)
    
    ## Rationale
    Refer to this issue
    https://github.com/apache/incubator-horaedb/issues/1386, currently, if
    the status of the shard is abnormal, we cannot get any valid exception
    information from the error message `table not found`.
    
    ## Detailed Changes
    * Add `TableStatus` in `cluster`, you can use it to get the status of
    the table in the current cluster..
    * Add `SchemaWithCluster`, It wraps the schema inside the cluster,
    through which the state of the cluster and schema can be combined.
    
    ## Test Plan
    Pass CI.
---
 catalog/src/schema.rs              |   3 +
 catalog_impls/src/cluster_based.rs | 115 +++++++++++++++++++++++++++++++++++++
 catalog_impls/src/lib.rs           |   1 +
 catalog_impls/src/volatile.rs      |  20 +++++--
 cluster/src/cluster_impl.rs        |  21 ++++++-
 cluster/src/lib.rs                 |  24 +++++++-
 cluster/src/shard_set.rs           |  10 ++++
 meta_client/src/types.rs           |   5 ++
 router/src/cluster_based.rs        |   5 ++
 src/ceresdb/src/setup.rs           |   7 ++-
 10 files changed, 202 insertions(+), 9 deletions(-)

diff --git a/catalog/src/schema.rs b/catalog/src/schema.rs
index 51fb7f82..c3997ca4 100644
--- a/catalog/src/schema.rs
+++ b/catalog/src/schema.rs
@@ -181,6 +181,9 @@ pub enum Error {
         table: String,
         backtrace: Backtrace,
     },
+
+    #[snafu(display("Table is not ready, err:{}", source))]
+    TableNotReady { source: GenericError },
 }
 
 define_result!(Error);
diff --git a/catalog_impls/src/cluster_based.rs 
b/catalog_impls/src/cluster_based.rs
new file mode 100644
index 00000000..650d2019
--- /dev/null
+++ b/catalog_impls/src/cluster_based.rs
@@ -0,0 +1,115 @@
+// Copyright 2023 The HoraeDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use async_trait::async_trait;
+use catalog::{
+    schema,
+    schema::{
+        CreateOptions, CreateTableRequest, DropOptions, DropTableRequest, 
NameRef, Schema,
+        SchemaRef, TableNotReady,
+    },
+};
+use cluster::{ClusterRef, TableStatus};
+use generic_error::BoxError;
+use snafu::{ResultExt, Snafu};
+use table_engine::table::{SchemaId, TableRef};
+
+#[derive(Debug, Snafu)]
+#[snafu(visibility(pub))]
+pub enum Error {
+    #[snafu(display("Invalid table status, status:{:?}", status))]
+    InvalidTableStatus { status: TableStatus },
+}
+
+/// A cluster-based implementation for [`schema`].
+
+/// Schema with cluster.
+/// It binds cluster and schema and will detect the health status of the 
cluster
+/// when calling the schema interface.
+pub(crate) struct SchemaWithCluster {
+    internal: SchemaRef,
+
+    cluster: ClusterRef,
+}
+
+impl SchemaWithCluster {
+    pub(crate) fn new(internal: SchemaRef, cluster: ClusterRef) -> 
SchemaWithCluster {
+        SchemaWithCluster { internal, cluster }
+    }
+
+    // Get table status, return None when table not found in shard.
+    fn table_status(&self, table_name: NameRef) -> Option<TableStatus> {
+        self.cluster.get_table_status(self.name(), table_name)
+    }
+}
+
+#[async_trait]
+impl Schema for SchemaWithCluster {
+    fn name(&self) -> NameRef {
+        self.internal.name()
+    }
+
+    fn id(&self) -> SchemaId {
+        self.internal.id()
+    }
+
+    fn table_by_name(&self, name: NameRef) -> schema::Result<Option<TableRef>> 
{
+        let find_table_result = self.internal.table_by_name(name)?;
+
+        if find_table_result.is_none() {
+            return match self.table_status(name) {
+                // Table not found in schema and shard not contains this table.
+                None => Ok(None),
+                // Table not found in schema but shard contains this table.
+                // Check the status of the shard.
+                Some(table_status) => InvalidTableStatus {
+                    status: table_status,
+                }
+                .fail()
+                .box_err()
+                .with_context(|| TableNotReady {})?,
+            };
+        }
+
+        Ok(find_table_result)
+    }
+
+    async fn create_table(
+        &self,
+        request: CreateTableRequest,
+        opts: CreateOptions,
+    ) -> schema::Result<TableRef> {
+        self.internal.create_table(request, opts).await
+    }
+
+    async fn drop_table(
+        &self,
+        request: DropTableRequest,
+        opts: DropOptions,
+    ) -> schema::Result<bool> {
+        self.internal.drop_table(request, opts).await
+    }
+
+    fn all_tables(&self) -> schema::Result<Vec<TableRef>> {
+        self.internal.all_tables()
+    }
+
+    fn register_table(&self, table: TableRef) {
+        self.internal.register_table(table)
+    }
+
+    fn unregister_table(&self, table_name: &str) {
+        self.internal.unregister_table(table_name)
+    }
+}
diff --git a/catalog_impls/src/lib.rs b/catalog_impls/src/lib.rs
index 2abbda95..90edc1b1 100644
--- a/catalog_impls/src/lib.rs
+++ b/catalog_impls/src/lib.rs
@@ -24,6 +24,7 @@ use system_catalog::{tables::Tables, SystemTableAdapter};
 
 use crate::system_tables::{SystemTables, SystemTablesBuilder};
 
+mod cluster_based;
 mod system_tables;
 pub mod table_based;
 pub mod volatile;
diff --git a/catalog_impls/src/volatile.rs b/catalog_impls/src/volatile.rs
index e217d157..a2aa7b99 100644
--- a/catalog_impls/src/volatile.rs
+++ b/catalog_impls/src/volatile.rs
@@ -32,7 +32,7 @@ use catalog::{
     },
     Catalog, CatalogRef, CreateSchemaWithCause,
 };
-use cluster::shard_set::ShardSet;
+use cluster::{shard_set::ShardSet, ClusterRef};
 use common_types::schema::SchemaName;
 use generic_error::BoxError;
 use logger::{debug, info};
@@ -41,19 +41,23 @@ use snafu::{ensure, OptionExt, ResultExt};
 use table_engine::table::{SchemaId, TableRef};
 use tokio::sync::Mutex;
 
+use crate::cluster_based::SchemaWithCluster;
+
 /// ManagerImpl manages multiple volatile catalogs.
 pub struct ManagerImpl {
     catalogs: HashMap<String, Arc<CatalogImpl>>,
     shard_set: ShardSet,
     meta_client: MetaClientRef,
+    cluster: ClusterRef,
 }
 
 impl ManagerImpl {
-    pub fn new(shard_set: ShardSet, meta_client: MetaClientRef) -> Self {
+    pub fn new(shard_set: ShardSet, meta_client: MetaClientRef, cluster: 
ClusterRef) -> Self {
         let mut manager = ManagerImpl {
             catalogs: HashMap::new(),
             shard_set,
             meta_client,
+            cluster,
         };
 
         manager.maybe_create_default_catalog();
@@ -101,6 +105,7 @@ impl ManagerImpl {
             schemas: RwLock::new(HashMap::new()),
             shard_set: self.shard_set.clone(),
             meta_client: self.meta_client.clone(),
+            cluster: self.cluster.clone(),
         });
 
         self.catalogs.insert(catalog_name, catalog.clone());
@@ -121,6 +126,7 @@ struct CatalogImpl {
     schemas: RwLock<HashMap<SchemaName, SchemaRef>>,
     shard_set: ShardSet,
     meta_client: MetaClientRef,
+    cluster: ClusterRef,
 }
 
 #[async_trait]
@@ -171,7 +177,10 @@ impl Catalog for CatalogImpl {
             self.shard_set.clone(),
         ));
 
-        schemas.insert(name.to_string(), schema);
+        let cluster_based: SchemaRef =
+            Arc::new(SchemaWithCluster::new(schema, self.cluster.clone()));
+
+        schemas.insert(name.to_string(), cluster_based);
 
         info!(
             "create schema success, catalog:{}, schema:{}",
@@ -282,7 +291,10 @@ impl Schema for SchemaImpl {
     }
 
     fn table_by_name(&self, name: NameRef) -> schema::Result<Option<TableRef>> 
{
-        let table = self.tables.read().unwrap().get(name).cloned();
+        let table = self
+            .get_table(self.catalog_name.as_str(), self.schema_name.as_str(), 
name)
+            .unwrap()
+            .clone();
         Ok(table)
     }
 
diff --git a/cluster/src/cluster_impl.rs b/cluster/src/cluster_impl.rs
index 6804081a..3ec73a00 100644
--- a/cluster/src/cluster_impl.rs
+++ b/cluster/src/cluster_impl.rs
@@ -44,7 +44,7 @@ use crate::{
     topology::ClusterTopology,
     Cluster, ClusterNodesNotFound, ClusterNodesResp, 
EtcdClientFailureWithCause,
     InitEtcdClientConfig, InvalidArguments, MetaClientFailure, OpenShard, 
OpenShardWithCause,
-    Result, ShardNotFound,
+    Result, ShardNotFound, TableStatus,
 };
 
 /// ClusterImpl is an implementation of [`Cluster`] based [`MetaClient`].
@@ -311,6 +311,19 @@ impl Inner {
         self.shard_set.get(shard_id)
     }
 
+    /// Get shard by table name.
+    ///
+    /// This method is similar to `route_tables`, but it will not send request
+    /// to meta server, it only load data from local cache.
+    /// If target table is not found in any shards in this cluster, return 
None.
+    /// Otherwise, return the shard where this table is exists.
+    fn get_shard_by_table_name(&self, schema_name: &str, table_name: &str) -> 
Option<ShardRef> {
+        let shards = self.shard_set.all_shards();
+        shards
+            .into_iter()
+            .find(|shard| shard.find_table(schema_name, table_name).is_some())
+    }
+
     fn close_shard(&self, shard_id: ShardId) -> Result<ShardRef> {
         info!("Remove shard from shard_set, id:{shard_id}");
         self.shard_set
@@ -368,6 +381,12 @@ impl Cluster for ClusterImpl {
         self.inner.shard(shard_id)
     }
 
+    fn get_table_status(&self, schema_name: &str, table_name: &str) -> 
Option<TableStatus> {
+        self.inner
+            .get_shard_by_table_name(schema_name, table_name)
+            .map(|shard| TableStatus::from(shard.get_status()))
+    }
+
     async fn close_shard(&self, shard_id: ShardId) -> Result<ShardRef> {
         self.inner.close_shard(shard_id)
     }
diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs
index be7374d7..f1326529 100644
--- a/cluster/src/lib.rs
+++ b/cluster/src/lib.rs
@@ -29,7 +29,8 @@ use common_types::schema::SchemaName;
 use generic_error::GenericError;
 use macros::define_result;
 use meta_client::types::{
-    ClusterNodesRef, RouteTablesRequest, RouteTablesResponse, ShardId, 
ShardInfo, ShardVersion,
+    ClusterNodesRef, RouteTablesRequest, RouteTablesResponse, ShardId, 
ShardInfo, ShardStatus,
+    ShardVersion,
 };
 use shard_lock_manager::ShardLockManagerRef;
 use snafu::{Backtrace, Snafu};
@@ -161,6 +162,23 @@ pub enum Error {
 
 define_result!(Error);
 
+#[derive(Debug)]
+pub enum TableStatus {
+    Ready,
+    Recovering,
+    Frozen,
+}
+
+impl From<ShardStatus> for TableStatus {
+    fn from(value: ShardStatus) -> Self {
+        match value {
+            ShardStatus::Init | ShardStatus::Opening => 
TableStatus::Recovering,
+            ShardStatus::Ready => TableStatus::Ready,
+            ShardStatus::Frozen => TableStatus::Frozen,
+        }
+    }
+}
+
 pub type ClusterRef = Arc<dyn Cluster + Send + Sync>;
 
 #[derive(Clone, Debug)]
@@ -184,12 +202,14 @@ pub trait Cluster {
     /// None.
     fn shard(&self, shard_id: ShardId) -> Option<ShardRef>;
 
+    fn get_table_status(&self, schema_name: &str, table_name: &str) -> 
Option<TableStatus>;
+
     /// Close shard.
     ///
     /// Return error if the shard is not found.
     async fn close_shard(&self, shard_id: ShardId) -> Result<ShardRef>;
 
-    /// list shards
+    /// list loaded shards in current node.
     fn list_shards(&self) -> Vec<ShardInfo>;
 
     async fn route_tables(&self, req: &RouteTablesRequest) -> 
Result<RouteTablesResponse>;
diff --git a/cluster/src/shard_set.rs b/cluster/src/shard_set.rs
index b815c604..00bbb9ef 100644
--- a/cluster/src/shard_set.rs
+++ b/cluster/src/shard_set.rs
@@ -132,11 +132,21 @@ impl Shard {
         ret
     }
 
+    pub fn get_status(&self) -> ShardStatus {
+        let data = self.data.read().unwrap();
+        data.shard_info.status.clone()
+    }
+
     pub fn is_opened(&self) -> bool {
         let data = self.data.read().unwrap();
         data.is_opened()
     }
 
+    pub fn is_frozen(&self) -> bool {
+        let data = self.data.read().unwrap();
+        data.is_frozen()
+    }
+
     pub async fn close(&self, ctx: CloseContext) -> Result<()> {
         let operator = self.operator.lock().await;
         operator.close(ctx).await
diff --git a/meta_client/src/types.rs b/meta_client/src/types.rs
index f822428e..f23e8c54 100644
--- a/meta_client/src/types.rs
+++ b/meta_client/src/types.rs
@@ -226,6 +226,11 @@ impl ShardInfo {
     pub fn is_opened(&self) -> bool {
         matches!(self.status, ShardStatus::Ready | ShardStatus::Frozen)
     }
+
+    #[inline]
+    pub fn is_ready(&self) -> bool {
+        matches!(self.status, ShardStatus::Ready)
+    }
 }
 
 #[derive(Debug, Default, Copy, Clone, Eq, PartialEq, Serialize)]
diff --git a/router/src/cluster_based.rs b/router/src/cluster_based.rs
index 83d2b266..04f30c84 100644
--- a/router/src/cluster_based.rs
+++ b/router/src/cluster_based.rs
@@ -201,6 +201,7 @@ mod tests {
     use ceresdbproto::storage::{RequestContext, RouteRequest as 
RouteRequestPb};
     use cluster::{
         shard_lock_manager::ShardLockManagerRef, shard_set::ShardRef, Cluster, 
ClusterNodesResp,
+        TableStatus,
     };
     use common_types::table::ShardId;
     use meta_client::types::{
@@ -230,6 +231,10 @@ mod tests {
             unimplemented!();
         }
 
+        fn get_table_status(&self, _: &str, _: &str) -> Option<TableStatus> {
+            unimplemented!()
+        }
+
         async fn close_shard(&self, _: ShardId) -> cluster::Result<ShardRef> {
             unimplemented!();
         }
diff --git a/src/ceresdb/src/setup.rs b/src/ceresdb/src/setup.rs
index a7ae6a31..d6d4e770 100644
--- a/src/ceresdb/src/setup.rs
+++ b/src/ceresdb/src/setup.rs
@@ -334,8 +334,11 @@ async fn build_with_meta<T: WalsOpener>(
     };
     let engine_proxy = build_table_engine_proxy(engine_builder).await;
 
-    let meta_based_manager_ref =
-        Arc::new(volatile::ManagerImpl::new(shard_set, meta_client.clone()));
+    let meta_based_manager_ref = Arc::new(volatile::ManagerImpl::new(
+        shard_set,
+        meta_client.clone(),
+        cluster.clone(),
+    ));
 
     // Build catalog manager.
     let catalog_manager = 
Arc::new(CatalogManagerImpl::new(meta_based_manager_ref));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to