This is an automated email from the ASF dual-hosted git repository.

xikai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb-meta.git


The following commit(s) were added to refs/heads/main by this push:
     new d880dd2  refactor: make create table idempotent (#286)
d880dd2 is described below

commit d880dd24df1d37599fb9e565733d0442540cb6cf
Author: CooooolFrog <[email protected]>
AuthorDate: Tue Jan 23 15:35:47 2024 +0800

    refactor: make create table idempotent (#286)
    
    ## Rationale
    Store the intermediate data for table creation through procedures to
    support idempotent table creation.
    
    ## Detailed Changes
    * Store the intermediate data for table creation through the procedure.
    * Support retrying from the upper layer to complete failed table
    creation procedures.
    
    ## Test Plan
    Pass all unit tests and integration tests.
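
    A minimal sketch of the pick-or-reuse flow this change introduces, built on
    the new ClusterMetadata.GetTableAssignedShard / AssignTableToShard APIs; the
    helper pickOrReuseShard and its package are hypothetical, not code from this
    commit:

```go
package example

import (
    "context"

    "github.com/apache/incubator-horaedb-meta/server/cluster/metadata"
    "github.com/apache/incubator-horaedb-meta/server/coordinator"
    "github.com/apache/incubator-horaedb-meta/server/storage"

    "github.com/pkg/errors"
)

// pickOrReuseShard reuses a previously persisted shard assignment if one
// exists; otherwise it picks a shard and persists the choice, so that a
// retried create-table procedure lands on the same shard.
func pickOrReuseShard(
    ctx context.Context,
    c *metadata.ClusterMetadata,
    picker coordinator.ShardPicker,
    snapshot metadata.Snapshot,
    schemaName, tableName string,
) (storage.ShardID, error) {
    // Reuse the assignment persisted by a previous (possibly failed) attempt.
    shardID, exists, err := c.GetTableAssignedShard(ctx, schemaName, tableName)
    if err != nil {
        return 0, err
    }
    if exists {
        return shardID, nil
    }

    // No assignment yet: pick one shard and persist the decision before the
    // table is created, so later retries observe the same choice.
    shardNodes, err := picker.PickShards(ctx, snapshot, 1)
    if err != nil {
        return 0, err
    }
    if len(shardNodes) != 1 {
        return 0, errors.Errorf("expected exactly one picked shard, got %d", len(shardNodes))
    }
    if err := c.AssignTableToShard(ctx, schemaName, tableName, shardNodes[0].ID); err != nil {
        return 0, err
    }
    return shardNodes[0].ID, nil
}
```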
---
 server/cluster/cluster.go                          |  2 +-
 server/cluster/metadata/cluster_metadata.go        | 33 +++++++-
 server/cluster/metadata/topology_manager.go        | 95 +++++++++++++++++++++-
 server/coordinator/factory.go                      | 32 +++++---
 server/coordinator/factory_test.go                 |  2 +-
 server/coordinator/persist_shard_picker.go         | 80 ++++++++++++++++++
 server/coordinator/persist_shard_picker_test.go    | 87 ++++++++++++++++++++
 .../procedure/ddl/createtable/create_table.go      | 77 ++++++++++++++----
 server/coordinator/procedure/error.go              |  1 +
 .../scheduler/manager/scheduler_manager_test.go    |  2 +-
 .../scheduler/rebalanced/scheduler_test.go         | 10 ++-
 .../coordinator/scheduler/reopen/scheduler_test.go |  4 +-
 .../coordinator/scheduler/static/scheduler_test.go | 10 ++-
 server/etcdutil/util.go                            | 29 ++++++-
 server/etcdutil/util_test.go                       | 39 +++++++++
 server/storage/key_path.go                         | 14 ++++
 server/storage/meta.go                             |  7 ++
 server/storage/storage_impl.go                     | 67 +++++++++++++++
 server/storage/types.go                            | 27 ++++++
 19 files changed, 576 insertions(+), 42 deletions(-)

diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 6af9b2a..9513617 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -58,7 +58,7 @@ func NewCluster(logger *zap.Logger, metadata 
*metadata.ClusterMetadata, client *
        dispatch := eventdispatch.NewDispatchImpl()
 
        procedureIDRootPath := strings.Join([]string{rootPath, metadata.Name(), 
defaultProcedurePrefixKey}, "/")
-       procedureFactory := coordinator.NewFactory(logger, 
id.NewAllocatorImpl(logger, client, procedureIDRootPath, defaultAllocStep), 
dispatch, procedureStorage)
+       procedureFactory := coordinator.NewFactory(logger, 
id.NewAllocatorImpl(logger, client, procedureIDRootPath, defaultAllocStep), 
dispatch, procedureStorage, metadata)
 
        schedulerManager := manager.NewManager(logger, procedureManager, 
procedureFactory, metadata, client, rootPath, metadata.GetTopologyType(), 
metadata.GetProcedureExecutingBatchSize())
 
diff --git a/server/cluster/metadata/cluster_metadata.go 
b/server/cluster/metadata/cluster_metadata.go
index ab9b293..0ae03b6 100644
--- a/server/cluster/metadata/cluster_metadata.go
+++ b/server/cluster/metadata/cluster_metadata.go
@@ -115,7 +115,8 @@ func (c *ClusterMetadata) Load(ctx context.Context) error {
                return errors.WithMessage(err, "load table manager")
        }
 
-       if err := c.topologyManager.Load(ctx); err != nil {
+       schemas := c.tableManager.GetSchemas()
+       if err := c.topologyManager.Load(ctx, schemas); err != nil {
                return errors.WithMessage(err, "load topology manager")
        }
 
@@ -279,6 +280,11 @@ func (c *ClusterMetadata) GetTable(schemaName, tableName 
string) (storage.Table,
        return c.tableManager.GetTable(schemaName, tableName)
 }
 
+// GetTableShard get the shard where the table actually exists.
+func (c *ClusterMetadata) GetTableShard(ctx context.Context, table 
storage.Table) (storage.ShardID, bool) {
+       return c.topologyManager.GetTableShardID(ctx, table)
+}
+
 func (c *ClusterMetadata) CreateTableMetadata(ctx context.Context, request 
CreateTableMetadataRequest) (CreateTableMetadataResult, error) {
        c.logger.Info("create table start", zap.String("cluster", c.Name()), 
zap.String("schemaName", request.SchemaName), zap.String("tableName", 
request.TableName))
 
@@ -392,6 +398,31 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context, 
request CreateTableRe
        return ret, nil
 }
 
+func (c *ClusterMetadata) GetTableAssignedShard(ctx context.Context, 
schemaName string, tableName string) (storage.ShardID, bool, error) {
+       schema, exists := c.tableManager.GetSchema(schemaName)
+       if !exists {
+               return 0, false, errors.WithMessagef(ErrSchemaNotFound, "schema 
%s not found", schemaName)
+       }
+       shardIDs, exists := c.topologyManager.GetTableAssignedShard(ctx, 
schema.ID, tableName)
+       return shardIDs, exists, nil
+}
+
+func (c *ClusterMetadata) AssignTableToShard(ctx context.Context, schemaName 
string, tableName string, shardID storage.ShardID) error {
+       schema, exists := c.tableManager.GetSchema(schemaName)
+       if !exists {
+               return errors.WithMessagef(ErrSchemaNotFound, "schema %s not 
found", schemaName)
+       }
+       return c.topologyManager.AssignTableToShard(ctx, schema.ID, tableName, 
shardID)
+}
+
+func (c *ClusterMetadata) DeleteTableAssignedShard(ctx context.Context, 
schemaName string, tableName string) error {
+       schema, exists := c.tableManager.GetSchema(schemaName)
+       if !exists {
+               return errors.WithMessagef(ErrSchemaNotFound, "schema %s not 
found", schemaName)
+       }
+       return c.topologyManager.DeleteTableAssignedShard(ctx, schema.ID, 
tableName)
+}
+
 func (c *ClusterMetadata) GetShards() []storage.ShardID {
        return c.topologyManager.GetShards()
 }
diff --git a/server/cluster/metadata/topology_manager.go 
b/server/cluster/metadata/topology_manager.go
index 2426bb0..f744a5d 100644
--- a/server/cluster/metadata/topology_manager.go
+++ b/server/cluster/metadata/topology_manager.go
@@ -33,7 +33,7 @@ import (
 // TopologyManager manages the cluster topology, including the mapping 
relationship between shards, nodes, and tables.
 type TopologyManager interface {
        // Load load cluster topology from storage.
-       Load(ctx context.Context) error
+       Load(ctx context.Context, schemas []storage.Schema) error
        // GetVersion get cluster view version.
        GetVersion() uint64
        // GetClusterState get cluster view state.
@@ -44,6 +44,14 @@ type TopologyManager interface {
        AddTable(ctx context.Context, shardID storage.ShardID, latestVersion 
uint64, tables []storage.Table) error
        // RemoveTable remove table on target shards from cluster topology.
        RemoveTable(ctx context.Context, shardID storage.ShardID, latestVersion 
uint64, tableIDs []storage.TableID) error
+       // GetTableShardID get the shardID of the shard where the table is 
located.
+       GetTableShardID(ctx context.Context, table storage.Table) 
(storage.ShardID, bool)
+       // AssignTableToShard persists the table-to-shard mapping; it is used to
store assignment results and make table creation idempotent.
+       AssignTableToShard(ctx context.Context, schemaID storage.SchemaID, 
tableName string, shardID storage.ShardID) error
+       // GetTableAssignedShard get table assign result.
+       GetTableAssignedShard(ctx context.Context, schemaID storage.SchemaID, 
tableName string) (storage.ShardID, bool)
+       // DeleteTableAssignedShard delete table assign result.
+       DeleteTableAssignedShard(ctx context.Context, schemaID 
storage.SchemaID, tableName string) error
        // GetShards get all shards in cluster topology.
        GetShards() []storage.ShardID
        // GetShardNodesByID get shardNodes with shardID.
@@ -133,6 +141,8 @@ type TopologyManagerImpl struct {
        // ShardView in memory.
        shardTablesMapping map[storage.ShardID]*storage.ShardView // ShardID -> 
shardTopology
        tableShardMapping  map[storage.TableID][]storage.ShardID  // tableID -> 
ShardID
+       // Table assign result in memory.
+       tableAssignMapping map[storage.SchemaID]map[string]storage.ShardID // 
schemaID -> (tableName -> shardID)
 
        nodes map[string]storage.Node // NodeName in memory.
 }
@@ -150,11 +160,12 @@ func NewTopologyManagerImpl(logger *zap.Logger, storage 
storage.Storage, cluster
                nodeShardsMapping:  nil,
                shardTablesMapping: nil,
                tableShardMapping:  nil,
+               tableAssignMapping: nil,
                nodes:              nil,
        }
 }
 
-func (m *TopologyManagerImpl) Load(ctx context.Context) error {
+func (m *TopologyManagerImpl) Load(ctx context.Context, schemas 
[]storage.Schema) error {
        m.lock.Lock()
        defer m.lock.Unlock()
 
@@ -169,6 +180,11 @@ func (m *TopologyManagerImpl) Load(ctx context.Context) 
error {
        if err := m.loadNodes(ctx); err != nil {
                return errors.WithMessage(err, "load nodes")
        }
+
+       if err := m.loadAssignTable(ctx, schemas); err != nil {
+               return errors.WithMessage(err, "load assign table")
+       }
+
        return nil
 }
 
@@ -294,6 +310,64 @@ func (m *TopologyManagerImpl) RemoveTable(ctx 
context.Context, shardID storage.S
        return nil
 }
 
+func (m *TopologyManagerImpl) GetTableShardID(_ context.Context, table 
storage.Table) (storage.ShardID, bool) {
+       m.lock.RLock()
+       defer m.lock.RUnlock()
+
+       shardIDs, exists := m.tableShardMapping[table.ID]
+       if exists {
+               return shardIDs[0], true
+       }
+
+       return 0, false
+}
+
+func (m *TopologyManagerImpl) AssignTableToShard(ctx context.Context, schemaID 
storage.SchemaID, tableName string, shardID storage.ShardID) error {
+       m.lock.Lock()
+       defer m.lock.Unlock()
+
+       if err := m.storage.AssignTableToShard(ctx, 
storage.AssignTableToShardRequest{
+               ClusterID: m.clusterID,
+               SchemaID:  schemaID,
+               TableName: tableName,
+               ShardID:   shardID,
+       }); err != nil {
+               return errors.WithMessage(err, "storage assign table")
+       }
+
+       // Update cache in memory.
+       if _, exists := m.tableAssignMapping[schemaID]; !exists {
+               m.tableAssignMapping[schemaID] = 
make(map[string]storage.ShardID, 0)
+       }
+
+       m.tableAssignMapping[schemaID][tableName] = shardID
+
+       return nil
+}
+
+func (m *TopologyManagerImpl) GetTableAssignedShard(_ context.Context, 
schemaID storage.SchemaID, tableName string) (storage.ShardID, bool) {
+       assignResult, exists := m.tableAssignMapping[schemaID][tableName]
+       return assignResult, exists
+}
+
+func (m *TopologyManagerImpl) DeleteTableAssignedShard(ctx context.Context, 
schemaID storage.SchemaID, tableName string) error {
+       m.lock.Lock()
+       defer m.lock.Unlock()
+
+       if err := m.storage.DeleteTableAssignedShard(ctx, 
storage.DeleteTableAssignedRequest{
+               ClusterID: m.clusterID,
+               SchemaID:  schemaID,
+               TableName: tableName,
+       }); err != nil {
+               return errors.WithMessage(err, "storage delete assign table")
+       }
+
+       // Update cache in memory.
+       delete(m.tableAssignMapping[schemaID], tableName)
+
+       return nil
+}
+
 func (m *TopologyManagerImpl) GetShards() []storage.ShardID {
        m.lock.RLock()
        defer m.lock.RUnlock()
@@ -584,6 +658,23 @@ func (m *TopologyManagerImpl) loadShardViews(ctx 
context.Context) error {
        return nil
 }
 
+func (m *TopologyManagerImpl) loadAssignTable(ctx context.Context, schemas 
[]storage.Schema) error {
+       m.tableAssignMapping = 
make(map[storage.SchemaID]map[string]storage.ShardID, len(schemas))
+       for _, schema := range schemas {
+               m.tableAssignMapping[schema.ID] = 
make(map[string]storage.ShardID, 0)
+
+               listAssignTableResult, err := 
m.storage.ListTableAssignedShard(ctx, storage.ListAssignTableRequest{ClusterID: 
m.clusterID, SchemaID: schema.ID})
+               if err != nil {
+                       return errors.WithMessage(err, "storage list assign 
table")
+               }
+               for _, assignTable := range listAssignTableResult.TableAssigns {
+                       m.tableAssignMapping[schema.ID][assignTable.TableName] 
= assignTable.ShardID
+               }
+       }
+
+       return nil
+}
+
 func (m *TopologyManagerImpl) loadNodes(ctx context.Context) error {
        nodesResult, err := m.storage.ListNodes(ctx, 
storage.ListNodesRequest{ClusterID: m.clusterID})
        if err != nil {
diff --git a/server/coordinator/factory.go b/server/coordinator/factory.go
index a830fe3..9e12c7f 100644
--- a/server/coordinator/factory.go
+++ b/server/coordinator/factory.go
@@ -43,7 +43,7 @@ type Factory struct {
        idAllocator id.Allocator
        dispatch    eventdispatch.Dispatch
        storage     procedure.Storage
-       shardPicker ShardPicker
+       shardPicker *PersistShardPicker
 }
 
 type CreateTableRequest struct {
@@ -101,13 +101,13 @@ type BatchRequest struct {
        BatchType procedure.Kind
 }
 
-func NewFactory(logger *zap.Logger, allocator id.Allocator, dispatch 
eventdispatch.Dispatch, storage procedure.Storage) *Factory {
+func NewFactory(logger *zap.Logger, allocator id.Allocator, dispatch 
eventdispatch.Dispatch, storage procedure.Storage, clusterMetadata 
*metadata.ClusterMetadata) *Factory {
        return &Factory{
                idAllocator: allocator,
                dispatch:    dispatch,
                storage:     storage,
                logger:      logger,
-               shardPicker: NewLeastTableShardPicker(),
+               shardPicker: NewPersistShardPicker(clusterMetadata, 
NewLeastTableShardPicker()),
        }
 }
 
@@ -129,14 +129,24 @@ func (f *Factory) makeCreateTableProcedure(ctx 
context.Context, request CreateTa
        }
        snapshot := request.ClusterMetadata.GetClusterSnapshot()
 
-       shards, err := f.shardPicker.PickShards(ctx, snapshot, 1)
+       var targetShardID storage.ShardID
+       shardID, exists, err := 
request.ClusterMetadata.GetTableAssignedShard(ctx, 
request.SourceReq.SchemaName, request.SourceReq.Name)
        if err != nil {
-               f.logger.Error("pick table shard", zap.Error(err))
-               return nil, errors.WithMessage(err, "pick table shard")
+               return nil, err
        }
-       if len(shards) != 1 {
-               f.logger.Error("pick table shards length not equal 1", 
zap.Int("shards", len(shards)))
-               return nil, errors.WithMessagef(procedure.ErrPickShard, "pick 
table shard, shards length:%d", len(shards))
+       if exists {
+               targetShardID = shardID
+       } else {
+               shards, err := f.shardPicker.PickShards(ctx, snapshot, 
request.SourceReq.GetSchemaName(), []string{request.SourceReq.GetName()})
+               if err != nil {
+                       f.logger.Error("pick table shard", zap.Error(err))
+                       return nil, errors.WithMessage(err, "pick table shard")
+               }
+               if len(shards) != 1 {
+                       f.logger.Error("pick table shards length not equal 1", 
zap.Int("shards", len(shards)))
+                       return nil, errors.WithMessagef(procedure.ErrPickShard, 
"pick table shard, shards length:%d", len(shards))
+               }
+               targetShardID = shards[request.SourceReq.GetName()].ID
        }
 
        return createtable.NewProcedure(createtable.ProcedureParams{
@@ -144,7 +154,7 @@ func (f *Factory) makeCreateTableProcedure(ctx 
context.Context, request CreateTa
                ClusterMetadata: request.ClusterMetadata,
                ClusterSnapshot: snapshot,
                ID:              id,
-               ShardID:         shards[0].ID,
+               ShardID:         targetShardID,
                SourceReq:       request.SourceReq,
                OnSucceeded:     request.OnSucceeded,
                OnFailed:        request.OnFailed,
@@ -164,7 +174,7 @@ func (f *Factory) makeCreatePartitionTableProcedure(ctx 
context.Context, request
                nodeNames[shardNode.NodeName] = 1
        }
 
-       subTableShards, err := f.shardPicker.PickShards(ctx, snapshot, 
len(request.SourceReq.PartitionTableInfo.SubTableNames))
+       subTableShards, err := f.shardPicker.PickShards(ctx, snapshot, 
request.SourceReq.GetSchemaName(), 
request.SourceReq.PartitionTableInfo.SubTableNames)
        if err != nil {
                return nil, errors.WithMessage(err, "pick sub table shards")
        }
diff --git a/server/coordinator/factory_test.go 
b/server/coordinator/factory_test.go
index 336255a..04c83c6 100644
--- a/server/coordinator/factory_test.go
+++ b/server/coordinator/factory_test.go
@@ -39,7 +39,7 @@ func setupFactory(t *testing.T) (*coordinator.Factory, 
*metadata.ClusterMetadata
        dispatch := test.MockDispatch{}
        allocator := test.MockIDAllocator{}
        storage := test.NewTestStorage(t)
-       f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, storage)
+       f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, storage, 
c.GetMetadata())
 
        return f, c.GetMetadata()
 }
diff --git a/server/coordinator/persist_shard_picker.go 
b/server/coordinator/persist_shard_picker.go
new file mode 100644
index 0000000..b368422
--- /dev/null
+++ b/server/coordinator/persist_shard_picker.go
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package coordinator
+
+import (
+       "context"
+
+       "github.com/apache/incubator-horaedb-meta/server/cluster/metadata"
+       "github.com/apache/incubator-horaedb-meta/server/storage"
+)
+
+type PersistShardPicker struct {
+       cluster  *metadata.ClusterMetadata
+       internal ShardPicker
+}
+
+func NewPersistShardPicker(cluster *metadata.ClusterMetadata, internal 
ShardPicker) *PersistShardPicker {
+       return &PersistShardPicker{cluster: cluster, internal: internal}
+}
+
+func (p *PersistShardPicker) PickShards(ctx context.Context, snapshot 
metadata.Snapshot, schemaName string, tableNames []string) 
(map[string]storage.ShardNode, error) {
+       result := map[string]storage.ShardNode{}
+
+       shardNodeMap := make(map[storage.ShardID]storage.ShardNode, 
len(tableNames))
+       for _, shardNode := range snapshot.Topology.ClusterView.ShardNodes {
+               shardNodeMap[shardNode.ID] = shardNode
+       }
+
+       var missingTables []string
+       // If a table assignment has already been created, just reuse it.
+       for i := 0; i < len(tableNames); i++ {
+               shardID, exists, err := p.cluster.GetTableAssignedShard(ctx, 
schemaName, tableNames[i])
+               if err != nil {
+                       return map[string]storage.ShardNode{}, err
+               }
+               if exists {
+                       result[tableNames[i]] = shardNodeMap[shardID]
+               } else {
+                       missingTables = append(missingTables, tableNames[i])
+               }
+       }
+
+       // All tables have been assigned to shards.
+       if len(missingTables) == 0 {
+               return result, nil
+       }
+
+       // Some tables have no assignment yet: pick shards for them and save the
assignments.
+       shardNodes, err := p.internal.PickShards(ctx, snapshot, 
len(missingTables))
+       if err != nil {
+               return map[string]storage.ShardNode{}, err
+       }
+
+       for i, shardNode := range shardNodes {
+               result[missingTables[i]] = shardNode
+               err = p.cluster.AssignTableToShard(ctx, schemaName, 
missingTables[i], shardNode.ID)
+               if err != nil {
+                       return map[string]storage.ShardNode{}, err
+               }
+       }
+
+       return result, nil
+}
diff --git a/server/coordinator/persist_shard_picker_test.go 
b/server/coordinator/persist_shard_picker_test.go
new file mode 100644
index 0000000..67e3aa7
--- /dev/null
+++ b/server/coordinator/persist_shard_picker_test.go
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package coordinator_test
+
+import (
+       "context"
+       "testing"
+
+       "github.com/apache/incubator-horaedb-meta/server/cluster/metadata"
+       "github.com/apache/incubator-horaedb-meta/server/coordinator"
+       
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/test"
+       "github.com/apache/incubator-horaedb-meta/server/storage"
+       "github.com/stretchr/testify/require"
+)
+
+func TestPersistShardPicker(t *testing.T) {
+       re := require.New(t)
+       ctx := context.Background()
+
+       c := test.InitStableCluster(ctx, t)
+
+       persistShardPicker := 
coordinator.NewPersistShardPicker(c.GetMetadata(), 
coordinator.NewLeastTableShardPicker())
+       pickResult, err := persistShardPicker.PickShards(ctx, 
c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName, 
[]string{test.TestTableName0})
+       re.NoError(err)
+       re.Equal(len(pickResult), 1)
+
+       createResult, err := c.GetMetadata().CreateTable(ctx, 
metadata.CreateTableRequest{
+               ShardID:       pickResult[test.TestTableName0].ID,
+               LatestVersion: 0,
+               SchemaName:    test.TestSchemaName,
+               TableName:     test.TestTableName0,
+               PartitionInfo: storage.PartitionInfo{Info: nil},
+       })
+       re.NoError(err)
+       re.Equal(test.TestTableName0, createResult.Table.Name)
+
+       // Try to pick a shard for the same table after the table is created.
+       newPickResult, err := persistShardPicker.PickShards(ctx, 
c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName, 
[]string{test.TestTableName0})
+       re.NoError(err)
+       re.Equal(len(newPickResult), 1)
+       re.Equal(newPickResult[test.TestTableName0], 
pickResult[test.TestTableName0])
+
+       // Try to pick shard for another table.
+       pickResult, err = persistShardPicker.PickShards(ctx, 
c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName, 
[]string{test.TestTableName1})
+       re.NoError(err)
+       re.Equal(len(pickResult), 1)
+
+       err = c.GetMetadata().DropTable(ctx, metadata.DropTableRequest{
+               SchemaName:    test.TestSchemaName,
+               TableName:     test.TestTableName0,
+               ShardID:       pickResult[test.TestTableName0].ID,
+               LatestVersion: 0,
+       })
+       re.NoError(err)
+
+       // Try to pick a shard for table1 after dropping table0.
+       newPickResult, err = persistShardPicker.PickShards(ctx, 
c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName, 
[]string{test.TestTableName1})
+       re.NoError(err)
+       re.Equal(len(pickResult), 1)
+       re.Equal(newPickResult[test.TestTableName1], 
pickResult[test.TestTableName1])
+
+       err = c.GetMetadata().DeleteTableAssignedShard(ctx, 
test.TestSchemaName, test.TestTableName1)
+       re.NoError(err)
+
+       // Pick another shard for table1 after deleting its assign result.
+       newPickResult, err = persistShardPicker.PickShards(ctx, 
c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName, 
[]string{test.TestTableName1})
+       re.NoError(err)
+       re.Equal(len(pickResult), 1)
+       re.NotEqual(newPickResult[test.TestTableName1], 
pickResult[test.TestTableName1])
+}
diff --git a/server/coordinator/procedure/ddl/createtable/create_table.go 
b/server/coordinator/procedure/ddl/createtable/create_table.go
index 49138a4..292f090 100644
--- a/server/coordinator/procedure/ddl/createtable/create_table.go
+++ b/server/coordinator/procedure/ddl/createtable/create_table.go
@@ -37,30 +37,34 @@ import (
 )
 
 const (
-       eventCreateMetadata = "EventCreateMetadata"
-       eventCreateOnShard  = "EventCreateOnShard"
-       eventFinish         = "EventFinish"
-
-       stateBegin          = "StateBegin"
-       stateCreateMetadata = "StateCreateMetadata"
-       stateCreateOnShard  = "StateCreateOnShard"
-       stateFinish         = "StateFinish"
+       eventCheckTableExists = "EventCheckTableExists"
+       eventCreateMetadata   = "EventCreateMetadata"
+       eventCreateOnShard    = "EventCreateOnShard"
+       eventFinish           = "EventFinish"
+
+       stateBegin            = "StateBegin"
+       stateCheckTableExists = "StateCheckTableExists"
+       stateCreateMetadata   = "StateCreateMetadata"
+       stateCreateOnShard    = "StateCreateOnShard"
+       stateFinish           = "StateFinish"
 )
 
 var (
        createTableEvents = fsm.Events{
-               {Name: eventCreateMetadata, Src: []string{stateBegin}, Dst: 
stateCreateMetadata},
+               {Name: eventCheckTableExists, Src: []string{stateBegin}, Dst: 
stateCheckTableExists},
+               {Name: eventCreateMetadata, Src: 
[]string{stateCheckTableExists}, Dst: stateCreateMetadata},
                {Name: eventCreateOnShard, Src: []string{stateCreateMetadata}, 
Dst: stateCreateOnShard},
                {Name: eventFinish, Src: []string{stateCreateOnShard}, Dst: 
stateFinish},
        }
        createTableCallbacks = fsm.Callbacks{
-               eventCreateMetadata: createMetadataCallback,
-               eventCreateOnShard:  createOnShard,
-               eventFinish:         createFinish,
+               eventCheckTableExists: checkTableExists,
+               eventCreateMetadata:   createMetadata,
+               eventCreateOnShard:    createOnShard,
+               eventFinish:           createFinish,
        }
 )
 
-func createMetadataCallback(event *fsm.Event) {
+func checkTableExists(event *fsm.Event) {
        req, err := procedure.GetRequestFromEvent[*callbackRequest](event)
        if err != nil {
                procedure.CancelEventWithLog(event, err, "get request from 
event")
@@ -68,6 +72,42 @@ func createMetadataCallback(event *fsm.Event) {
        }
        params := req.p.params
 
+       // Check whether the table metadata already exists.
+       table, exists, err := 
params.ClusterMetadata.GetTable(params.SourceReq.GetSchemaName(), 
params.SourceReq.GetName())
+       if err != nil {
+               procedure.CancelEventWithLog(event, err, "get table metadata")
+               return
+       }
+       if !exists {
+               return
+       }
+
+       // Check whether the table shard mapping already exists.
+       _, exists = params.ClusterMetadata.GetTableShard(req.ctx, table)
+       if exists {
+               procedure.CancelEventWithLog(event, 
metadata.ErrTableAlreadyExists, "table shard already exists")
+               return
+       }
+}
+
+func createMetadata(event *fsm.Event) {
+       req, err := procedure.GetRequestFromEvent[*callbackRequest](event)
+       if err != nil {
+               procedure.CancelEventWithLog(event, err, "get request from 
event")
+               return
+       }
+       params := req.p.params
+
+       _, exists, err := 
params.ClusterMetadata.GetTable(params.SourceReq.GetSchemaName(), 
params.SourceReq.GetName())
+       if err != nil {
+               procedure.CancelEventWithLog(event, err, "get table metadata")
+               return
+       }
+       if exists {
+               log.Info("table metadata already exists", 
zap.String("schemaName", params.SourceReq.GetSchemaName()), 
zap.String("tableName", params.SourceReq.GetName()))
+               return
+       }
+
        createTableMetadataRequest := metadata.CreateTableMetadataRequest{
                SchemaName:    params.SourceReq.GetSchemaName(),
                TableName:     params.SourceReq.GetName(),
@@ -139,6 +179,11 @@ func createFinish(event *fsm.Event) {
                procedure.CancelEventWithLog(event, err, "get request from 
event")
                return
        }
+       params := req.p.params
+
+       if err := 
req.p.params.ClusterMetadata.DeleteTableAssignedShard(req.ctx, 
params.SourceReq.GetSchemaName(), params.SourceReq.GetName()); err != nil {
+               log.Warn("delete assign table failed", zap.String("schemaName", 
params.SourceReq.GetSchemaName()), zap.String("tableName", 
params.SourceReq.GetName()))
+       }
 
        assert.Assert(req.createTableResult != nil)
        if err := req.p.params.OnSucceeded(*req.createTableResult); err != nil {
@@ -229,7 +274,6 @@ func (p *Procedure) Kind() procedure.Kind {
 func (p *Procedure) Start(ctx context.Context) error {
        p.updateState(procedure.StateRunning)
 
-       // Try to load persist data.
        req := &callbackRequest{
                ctx:               ctx,
                p:                 p,
@@ -239,6 +283,11 @@ func (p *Procedure) Start(ctx context.Context) error {
        for {
                switch p.fsm.Current() {
                case stateBegin:
+                       if err := p.fsm.Event(eventCheckTableExists, req); err 
!= nil {
+                               _ = p.params.OnFailed(err)
+                               return err
+                       }
+               case stateCheckTableExists:
                        if err := p.fsm.Event(eventCreateMetadata, req); err != 
nil {
                                _ = p.params.OnFailed(err)
                                return err
diff --git a/server/coordinator/procedure/error.go 
b/server/coordinator/procedure/error.go
index 165cb2c..c59dc8f 100644
--- a/server/coordinator/procedure/error.go
+++ b/server/coordinator/procedure/error.go
@@ -23,6 +23,7 @@ import "github.com/apache/incubator-horaedb-meta/pkg/coderr"
 
 var (
        ErrShardLeaderNotFound     = coderr.NewCodeError(coderr.Internal, 
"shard leader not found")
+       ErrShardNotMatch           = coderr.NewCodeError(coderr.Internal, 
"target shard not match to persis data")
        ErrProcedureNotFound       = coderr.NewCodeError(coderr.Internal, 
"procedure not found")
        ErrClusterConfigChanged    = coderr.NewCodeError(coderr.Internal, 
"cluster config changed")
        ErrTableNotExists          = coderr.NewCodeError(coderr.Internal, 
"table not exists")
diff --git a/server/coordinator/scheduler/manager/scheduler_manager_test.go 
b/server/coordinator/scheduler/manager/scheduler_manager_test.go
index 15d2bd6..7edbf5e 100644
--- a/server/coordinator/scheduler/manager/scheduler_manager_test.go
+++ b/server/coordinator/scheduler/manager/scheduler_manager_test.go
@@ -44,7 +44,7 @@ func TestSchedulerManager(t *testing.T) {
        dispatch := test.MockDispatch{}
        allocator := test.MockIDAllocator{}
        s := test.NewTestStorage(t)
-       f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, s)
+       f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, s, 
c.GetMetadata())
        _, client, _ := etcdutil.PrepareEtcdServerAndClient(t)
 
        // Create scheduler manager with enableScheduler equal to false.
diff --git a/server/coordinator/scheduler/rebalanced/scheduler_test.go 
b/server/coordinator/scheduler/rebalanced/scheduler_test.go
index e451b53..8c93157 100644
--- a/server/coordinator/scheduler/rebalanced/scheduler_test.go
+++ b/server/coordinator/scheduler/rebalanced/scheduler_test.go
@@ -35,23 +35,25 @@ func TestRebalancedScheduler(t *testing.T) {
        re := require.New(t)
        ctx := context.Background()
 
-       procedureFactory := coordinator.NewFactory(zap.NewNop(), 
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t))
-
-       s := rebalanced.NewShardScheduler(zap.NewNop(), procedureFactory, 
nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
-
        // EmptyCluster would be scheduled an empty procedure.
        emptyCluster := test.InitEmptyCluster(ctx, t)
+       procedureFactory := coordinator.NewFactory(zap.NewNop(), 
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), 
emptyCluster.GetMetadata())
+       s := rebalanced.NewShardScheduler(zap.NewNop(), procedureFactory, 
nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
        result, err := s.Schedule(ctx, 
emptyCluster.GetMetadata().GetClusterSnapshot())
        re.NoError(err)
        re.Empty(result)
 
        // PrepareCluster would be scheduled an empty procedure.
        prepareCluster := test.InitPrepareCluster(ctx, t)
+       procedureFactory = coordinator.NewFactory(zap.NewNop(), 
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), 
prepareCluster.GetMetadata())
+       s = rebalanced.NewShardScheduler(zap.NewNop(), procedureFactory, 
nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
        _, err = s.Schedule(ctx, 
prepareCluster.GetMetadata().GetClusterSnapshot())
        re.NoError(err)
 
        // StableCluster with all shards assigned would be scheduled a load 
balance procedure.
        stableCluster := test.InitStableCluster(ctx, t)
+       procedureFactory = coordinator.NewFactory(zap.NewNop(), 
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), 
stableCluster.GetMetadata())
+       s = rebalanced.NewShardScheduler(zap.NewNop(), procedureFactory, 
nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
        _, err = s.Schedule(ctx, 
stableCluster.GetMetadata().GetClusterSnapshot())
        re.NoError(err)
 }
diff --git a/server/coordinator/scheduler/reopen/scheduler_test.go 
b/server/coordinator/scheduler/reopen/scheduler_test.go
index 8060e2d..2b23051 100644
--- a/server/coordinator/scheduler/reopen/scheduler_test.go
+++ b/server/coordinator/scheduler/reopen/scheduler_test.go
@@ -35,12 +35,12 @@ import (
 func TestReopenShardScheduler(t *testing.T) {
        re := require.New(t)
        ctx := context.Background()
+       emptyCluster := test.InitEmptyCluster(ctx, t)
 
-       procedureFactory := coordinator.NewFactory(zap.NewNop(), 
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t))
+       procedureFactory := coordinator.NewFactory(zap.NewNop(), 
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), 
emptyCluster.GetMetadata())
 
        s := reopen.NewShardScheduler(procedureFactory, 1)
 
-       emptyCluster := test.InitEmptyCluster(ctx, t)
        // ReopenShardScheduler should not schedule when cluster is not stable.
        result, err := s.Schedule(ctx, 
emptyCluster.GetMetadata().GetClusterSnapshot())
        re.NoError(err)
diff --git a/server/coordinator/scheduler/static/scheduler_test.go 
b/server/coordinator/scheduler/static/scheduler_test.go
index 929a2f0..d1c9b83 100644
--- a/server/coordinator/scheduler/static/scheduler_test.go
+++ b/server/coordinator/scheduler/static/scheduler_test.go
@@ -35,24 +35,26 @@ func TestStaticTopologyScheduler(t *testing.T) {
        re := require.New(t)
        ctx := context.Background()
 
-       procedureFactory := coordinator.NewFactory(zap.NewNop(), 
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t))
-
-       s := static.NewShardScheduler(procedureFactory, 
nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
-
        // EmptyCluster would be scheduled an empty procedure.
        emptyCluster := test.InitEmptyCluster(ctx, t)
+       procedureFactory := coordinator.NewFactory(zap.NewNop(), 
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), 
emptyCluster.GetMetadata())
+       s := static.NewShardScheduler(procedureFactory, 
nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
        result, err := s.Schedule(ctx, 
emptyCluster.GetMetadata().GetClusterSnapshot())
        re.NoError(err)
        re.Empty(result)
 
        // PrepareCluster would be scheduled a transfer leader procedure.
        prepareCluster := test.InitPrepareCluster(ctx, t)
+       procedureFactory = coordinator.NewFactory(zap.NewNop(), 
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), 
prepareCluster.GetMetadata())
+       s = static.NewShardScheduler(procedureFactory, 
nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
        result, err = s.Schedule(ctx, 
prepareCluster.GetMetadata().GetClusterSnapshot())
        re.NoError(err)
        re.NotEmpty(result)
 
        // StableCluster with all shards assigned would be scheduled a transfer 
leader procedure by hash rule.
        stableCluster := test.InitStableCluster(ctx, t)
+       procedureFactory = coordinator.NewFactory(zap.NewNop(), 
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t), 
stableCluster.GetMetadata())
+       s = static.NewShardScheduler(procedureFactory, 
nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
        result, err = s.Schedule(ctx, 
stableCluster.GetMetadata().GetClusterSnapshot())
        re.NoError(err)
        re.NotEmpty(result)
diff --git a/server/etcdutil/util.go b/server/etcdutil/util.go
index 5d29d80..62ef077 100644
--- a/server/etcdutil/util.go
+++ b/server/etcdutil/util.go
@@ -21,6 +21,7 @@ package etcdutil
 
 import (
        "context"
+       "path"
 
        "github.com/apache/incubator-horaedb-meta/pkg/log"
        clientv3 "go.etcd.io/etcd/client/v3"
@@ -118,7 +119,7 @@ func Scan(ctx context.Context, client *clientv3.Client, 
startKey, endKey string,
                        }
                }
 
-               // Check whether the keys is exhausted.
+               // Check whether the keys are exhausted.
                if len(resp.Kvs) < batchSize {
                        return nil
                }
@@ -126,3 +127,29 @@ func Scan(ctx context.Context, client *clientv3.Client, 
startKey, endKey string,
                lastKeyInPrevBatch = string(resp.Kvs[len(resp.Kvs)-1].Key)
        }
 }
+
+func ScanWithPrefix(ctx context.Context, client *clientv3.Client, prefix 
string, do func(key string, val []byte) error) error {
+       rangeEnd := clientv3.GetPrefixRangeEnd(prefix)
+       resp, err := client.Get(ctx, prefix, clientv3.WithRange(rangeEnd))
+       if err != nil {
+               return ErrEtcdKVGet.WithCause(err)
+       }
+       // Check whether the keys are exhausted.
+       if len(resp.Kvs) == 0 {
+               return nil
+       }
+
+       for _, item := range resp.Kvs {
+               err := do(string(item.Key), item.Value)
+               if err != nil {
+                       return err
+               }
+       }
+
+       return nil
+}
+
+// GetLastPathSegment gets the last path segment of completePath, which is
split by '/'.
+func GetLastPathSegment(completePath string) string {
+       return path.Base(path.Clean(completePath))
+}
diff --git a/server/etcdutil/util_test.go b/server/etcdutil/util_test.go
index fa2532d..6b816a4 100644
--- a/server/etcdutil/util_test.go
+++ b/server/etcdutil/util_test.go
@@ -105,3 +105,42 @@ func TestScanFailed(t *testing.T) {
        err := Scan(ctx, client, startKey, endKey, 10, do)
        r.Equal(fakeErr, err)
 }
+
+func TestScanWithPrefix(t *testing.T) {
+       r := require.New(t)
+
+       _, client, closeSrv := PrepareEtcdServerAndClient(t)
+       defer closeSrv()
+       ctx := context.Background()
+
+       // Build keys with different prefix.
+       keys := []string{}
+       keys = append(keys, "/prefix/0")
+       keys = append(keys, "/prefix/1")
+       keys = append(keys, "/diff/0")
+
+       // Put the keys.
+       for _, key := range keys {
+               // Let the value equal key for simplicity.
+               val := key
+               _, err := client.Put(ctx, key, val)
+               r.NoError(err)
+       }
+
+       var scanResult []string
+       do := func(key string, value []byte) error {
+               scanResult = append(scanResult, key)
+               return nil
+       }
+       err := ScanWithPrefix(ctx, client, "/prefix", do)
+       r.NoError(err)
+       r.Equal(len(scanResult), 2)
+}
+
+func TestGetLastPathSegment(t *testing.T) {
+       r := require.New(t)
+
+       path := "/prefix/a/b/c"
+       lastPathSegment := GetLastPathSegment(path)
+       r.Equal("c", lastPathSegment)
+}
diff --git a/server/storage/key_path.go b/server/storage/key_path.go
index 6036e59..4627bc0 100644
--- a/server/storage/key_path.go
+++ b/server/storage/key_path.go
@@ -36,6 +36,7 @@ const (
        shardView     = "shard_view"
        latestVersion = "latest_version"
        info          = "info"
+       tableAssign   = "table_assign"
 )
 
 // makeSchemaKey returns the key path to the schema meta info.
@@ -128,6 +129,19 @@ func makeNameToIDKey(rootPath string, clusterID uint32, 
schemaID uint32, tableNa
        return path.Join(rootPath, version, cluster, fmtID(uint64(clusterID)), 
schema, fmtID(uint64(schemaID)), tableNameToID, tableName)
 }
 
+// makeTableAssignKey returns the tableAssign key path.
+func makeTableAssignKey(rootPath string, clusterID uint32, schemaID uint32, 
tableName string) string {
+       // Example:
+       //      v1/cluster/1/schema/1/table_assign/tableName1 -> shardID1
+       //      v1/cluster/1/schema/1/table_assign/tableName2 -> shardID2
+       return path.Join(rootPath, version, cluster, fmtID(uint64(clusterID)), 
schema, fmtID(uint64(schemaID)), tableAssign, tableName)
+}
+
+// makeTableAssignPrefixKey returns the tableAssign prefix key path.
+func makeTableAssignPrefixKey(rootPath string, clusterID uint32, schemaID 
uint32) string {
+       return path.Join(rootPath, version, cluster, fmtID(uint64(clusterID)), 
schema, fmtID(uint64(schemaID)), tableAssign)
+}
+
 func fmtID(id uint64) string {
        return fmt.Sprintf("%020d", id)
 }
diff --git a/server/storage/meta.go b/server/storage/meta.go
index 842995d..b56da68 100644
--- a/server/storage/meta.go
+++ b/server/storage/meta.go
@@ -57,6 +57,13 @@ type Storage interface {
        // DeleteTable delete table by table name in specified cluster and 
schema.
        DeleteTable(ctx context.Context, req DeleteTableRequest) error
 
+       // AssignTableToShard save table assign result.
+       AssignTableToShard(ctx context.Context, req AssignTableToShardRequest) 
error
+       // DeleteTableAssignedShard delete table assign result.
+       DeleteTableAssignedShard(ctx context.Context, req 
DeleteTableAssignedRequest) error
+       // ListTableAssignedShard list table assign result.
+       ListTableAssignedShard(ctx context.Context, req ListAssignTableRequest) 
(ListTableAssignedShardResult, error)
+
        // CreateShardViews create shard views in specified cluster.
        CreateShardViews(ctx context.Context, req CreateShardViewsRequest) error
        // ListShardViews list all shard views in specified cluster.
diff --git a/server/storage/storage_impl.go b/server/storage/storage_impl.go
index 8aaafdd..e6b130e 100644
--- a/server/storage/storage_impl.go
+++ b/server/storage/storage_impl.go
@@ -418,6 +418,73 @@ func (s *metaStorageImpl) DeleteTable(ctx context.Context, 
req DeleteTableReques
        return nil
 }
 
+func (s *metaStorageImpl) AssignTableToShard(ctx context.Context, req 
AssignTableToShardRequest) error {
+       key := makeTableAssignKey(s.rootPath, uint32(req.ClusterID), 
uint32(req.SchemaID), req.TableName)
+
+       // Check whether the key exists; if not, save the table assign result;
otherwise, the table assign result already exists and an error is returned.
+       keyMissing := clientv3util.KeyMissing(key)
+       opCreateAssignTable := clientv3.OpPut(key, 
strconv.Itoa(int(req.ShardID)))
+
+       resp, err := s.client.Txn(ctx).
+               If(keyMissing).
+               Then(opCreateAssignTable).
+               Commit()
+       if err != nil {
+               return errors.WithMessagef(err, "create assign table, 
clusterID:%d, schemaID:%d, key:%s", req.ClusterID, req.SchemaID, key)
+       }
+       if !resp.Succeeded {
+               return ErrCreateSchemaAgain.WithCausef("assign table may 
already exist, clusterID:%d, schemaID:%d, key:%s, resp:%v", req.ClusterID, 
req.SchemaID, key, resp)
+       }
+
+       return nil
+}
+
+func (s *metaStorageImpl) DeleteTableAssignedShard(ctx context.Context, req 
DeleteTableAssignedRequest) error {
+       key := makeTableAssignKey(s.rootPath, uint32(req.ClusterID), 
uint32(req.SchemaID), req.TableName)
+
+       keyExists := clientv3util.KeyExists(key)
+       opDeleteAssignTable := clientv3.OpDelete(key)
+
+       resp, err := s.client.Txn(ctx).
+               If(keyExists).
+               Then(opDeleteAssignTable).
+               Commit()
+       if err != nil {
+               return errors.WithMessagef(err, "delete assign table, 
clusterID:%d, schemaID:%d, tableName:%s", req.ClusterID, req.SchemaID, 
req.TableName)
+       }
+       if !resp.Succeeded {
+               return ErrDeleteTableAgain.WithCausef("assign table may have 
been deleted, clusterID:%d, schemaID:%d, tableName:%s", req.ClusterID, 
req.SchemaID, req.TableName)
+       }
+
+       return nil
+}
+
+func (s *metaStorageImpl) ListTableAssignedShard(ctx context.Context, req 
ListAssignTableRequest) (ListTableAssignedShardResult, error) {
+       key := makeTableAssignPrefixKey(s.rootPath, uint32(req.ClusterID), 
uint32(req.SchemaID))
+       rangeLimit := s.opts.MaxScanLimit
+
+       var tableAssigns []TableAssign
+       do := func(key string, value []byte) error {
+               tableName := etcdutil.GetLastPathSegment(key)
+               shardIDStr := string(value)
+               shardID, err := strconv.ParseUint(shardIDStr, 10, 32)
+               if err != nil {
+                       return err
+               }
+               tableAssigns = append(tableAssigns, TableAssign{
+                       TableName: tableName,
+                       ShardID:   ShardID(shardID),
+               })
+               return nil
+       }
+
+       if err := etcdutil.ScanWithPrefix(ctx, s.client, key, do); err != nil {
+               return ListTableAssignedShardResult{}, errors.WithMessagef(err, 
"scan tables, clusterID:%d, schemaID:%d, prefix key:%s, range limit:%d", 
req.ClusterID, req.SchemaID, key, rangeLimit)
+       }
+
+       return ListTableAssignedShardResult{TableAssigns: tableAssigns}, nil
+}
+
 func (s *metaStorageImpl) createNShardViews(ctx context.Context, clusterID 
ClusterID, shardViews []ShardView, ifConds []clientv3.Cmp, opCreates 
[]clientv3.Op) error {
        for _, shardView := range shardViews {
                shardViewPB := convertShardViewToPB(shardView)
diff --git a/server/storage/types.go b/server/storage/types.go
index baff5e2..5711283 100644
--- a/server/storage/types.go
+++ b/server/storage/types.go
@@ -141,6 +141,28 @@ type DeleteTableRequest struct {
        TableName string
 }
 
+type AssignTableToShardRequest struct {
+       ClusterID ClusterID
+       SchemaID  SchemaID
+       TableName string
+       ShardID   ShardID
+}
+
+type DeleteTableAssignedRequest struct {
+       ClusterID ClusterID
+       SchemaID  SchemaID
+       TableName string
+}
+
+type ListAssignTableRequest struct {
+       ClusterID ClusterID
+       SchemaID  SchemaID
+}
+
+type ListTableAssignedShardResult struct {
+       TableAssigns []TableAssign
+}
+
 type CreateShardViewsRequest struct {
        ClusterID  ClusterID
        ShardViews []ShardView
@@ -232,6 +254,11 @@ func (t Table) IsPartitioned() bool {
        return t.PartitionInfo.Info != nil
 }
 
+type TableAssign struct {
+       TableName string
+       ShardID   ShardID
+}
+
 type ShardView struct {
        ShardID   ShardID
        Version   uint64


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
