ShiKaiWi commented on code in PR #286:
URL: 
https://github.com/apache/incubator-horaedb-meta/pull/286#discussion_r1446960764


##########
server/cluster/metadata/topology_manager.go:
##########
@@ -291,6 +307,64 @@ func (m *TopologyManagerImpl) RemoveTable(ctx 
context.Context, shardID storage.S
        return nil
 }
 
+func (m *TopologyManagerImpl) GetTableShard(_ context.Context, table 
storage.Table) (storage.ShardID, bool) {
+       m.lock.RLock()
+       defer m.lock.RUnlock()
+
+       shardIDs, exists := m.tableShardMapping[table.ID]
+       if exists {
+               return shardIDs[0], true
+       }
+
+       return 0, false
+}
+
+func (m *TopologyManagerImpl) AssignTable(ctx context.Context, schemaID 
storage.SchemaID, tableName string, shardID storage.ShardID) error {
+       m.lock.Lock()
+       defer m.lock.Unlock()
+
+       if err := m.storage.AssignTable(ctx, storage.AssignTableRequest{
+               ClusterID: m.clusterID,
+               SchemaID:  schemaID,
+               TableName: tableName,
+               ShardID:   shardID,
+       }); err != nil {
+               return errors.WithMessage(err, "storage assign table")
+       }
+
+       // Update cache im memory.
+       if _, exists := m.tableAssignMapping[schemaID]; !exists {
+               m.tableAssignMapping[schemaID] = 
make(map[string]storage.ShardID, 0)
+       }
+
+       m.tableAssignMapping[schemaID][tableName] = shardID
+
+       return nil
+}
+
+func (m *TopologyManagerImpl) GetAssignTableResult(_ context.Context, schemaID 
storage.SchemaID, tableName string) (storage.ShardID, bool) {
+       assignResult, exists := m.tableAssignMapping[schemaID][tableName]
+       return assignResult, exists
+}
+
+func (m *TopologyManagerImpl) DeleteAssignTable(ctx context.Context, schemaID 
storage.SchemaID, tableName string) error {

Review Comment:
   ```suggestion
   func (m *TopologyManagerImpl) DeleteTableAssignedShard(ctx context.Context, 
schemaID storage.SchemaID, tableName string) error {
   ```



##########
server/cluster/metadata/topology_manager.go:
##########
@@ -291,6 +307,64 @@ func (m *TopologyManagerImpl) RemoveTable(ctx 
context.Context, shardID storage.S
        return nil
 }
 
+func (m *TopologyManagerImpl) GetTableShard(_ context.Context, table 
storage.Table) (storage.ShardID, bool) {
+       m.lock.RLock()
+       defer m.lock.RUnlock()
+
+       shardIDs, exists := m.tableShardMapping[table.ID]
+       if exists {
+               return shardIDs[0], true
+       }
+
+       return 0, false
+}
+
+func (m *TopologyManagerImpl) AssignTable(ctx context.Context, schemaID 
storage.SchemaID, tableName string, shardID storage.ShardID) error {
+       m.lock.Lock()
+       defer m.lock.Unlock()
+
+       if err := m.storage.AssignTable(ctx, storage.AssignTableRequest{
+               ClusterID: m.clusterID,
+               SchemaID:  schemaID,
+               TableName: tableName,
+               ShardID:   shardID,
+       }); err != nil {
+               return errors.WithMessage(err, "storage assign table")
+       }
+
+       // Update cache im memory.
+       if _, exists := m.tableAssignMapping[schemaID]; !exists {
+               m.tableAssignMapping[schemaID] = 
make(map[string]storage.ShardID, 0)
+       }
+
+       m.tableAssignMapping[schemaID][tableName] = shardID
+
+       return nil
+}
+
+func (m *TopologyManagerImpl) GetAssignTableResult(_ context.Context, schemaID 
storage.SchemaID, tableName string) (storage.ShardID, bool) {

Review Comment:
   ```suggestion
   func (m *TopologyManagerImpl) GetTableAssignedShard(_ context.Context, 
schemaID storage.SchemaID, tableName string) (storage.ShardID, bool) {
   ```



##########
server/coordinator/factory.go:
##########
@@ -126,22 +126,32 @@ func (f *Factory) makeCreateTableProcedure(ctx 
context.Context, request CreateTa
        }
        snapshot := request.ClusterMetadata.GetClusterSnapshot()
 
-       shards, err := f.shardPicker.PickShards(ctx, snapshot, 1)
+       var targetShardID storage.ShardID
+       shardID, exists, err := request.ClusterMetadata.GetAssignTable(ctx, 
request.SourceReq.SchemaName, request.SourceReq.Name)
        if err != nil {
-               f.logger.Error("pick table shard", zap.Error(err))
-               return nil, errors.WithMessage(err, "pick table shard")
+               return nil, err
        }
-       if len(shards) != 1 {
-               f.logger.Error("pick table shards length not equal 1", 
zap.Int("shards", len(shards)))
-               return nil, errors.WithMessagef(procedure.ErrPickShard, "pick 
table shard, shards length:%d", len(shards))
+       if exists {
+               targetShardID = shardID
+       } else {
+               shards, err := f.shardPicker.PickShards(ctx, snapshot, 1)
+               if err != nil {
+                       f.logger.Error("pick table shard", zap.Error(err))
+                       return nil, errors.WithMessage(err, "pick table shard")
+               }
+               if len(shards) != 1 {
+                       f.logger.Error("pick table shards length not equal 1", 
zap.Int("shards", len(shards)))
+                       return nil, errors.WithMessagef(procedure.ErrPickShard, 
"pick table shard, shards length:%d", len(shards))
+               }
+               targetShardID = shards[0].ID

Review Comment:
   I guess we can encapsulate the table assignment to the `ShardPicker`. Just 
provide a new `ShardPicker` wrapper around the original one, and do table 
assignment query/creation in the wrapper.



##########
server/cluster/metadata/topology_manager.go:
##########
@@ -291,6 +307,64 @@ func (m *TopologyManagerImpl) RemoveTable(ctx 
context.Context, shardID storage.S
        return nil
 }
 
+func (m *TopologyManagerImpl) GetTableShard(_ context.Context, table 
storage.Table) (storage.ShardID, bool) {
+       m.lock.RLock()
+       defer m.lock.RUnlock()
+
+       shardIDs, exists := m.tableShardMapping[table.ID]
+       if exists {
+               return shardIDs[0], true
+       }
+
+       return 0, false
+}
+
+func (m *TopologyManagerImpl) AssignTable(ctx context.Context, schemaID 
storage.SchemaID, tableName string, shardID storage.ShardID) error {

Review Comment:
   This method actually assigns the table to a picked shard, so how about 
making the name more explicit:
   ```suggestion
   func (m *TopologyManagerImpl) AssignTableToShard(ctx context.Context, 
schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to