ShiKaiWi commented on code in PR #286:
URL:
https://github.com/apache/incubator-horaedb-meta/pull/286#discussion_r1446960764
##########
server/cluster/metadata/topology_manager.go:
##########
@@ -291,6 +307,64 @@ func (m *TopologyManagerImpl) RemoveTable(ctx
context.Context, shardID storage.S
return nil
}
+func (m *TopologyManagerImpl) GetTableShard(_ context.Context, table
storage.Table) (storage.ShardID, bool) {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
+
+ shardIDs, exists := m.tableShardMapping[table.ID]
+ if exists {
+ return shardIDs[0], true
+ }
+
+ return 0, false
+}
+
+func (m *TopologyManagerImpl) AssignTable(ctx context.Context, schemaID
storage.SchemaID, tableName string, shardID storage.ShardID) error {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ if err := m.storage.AssignTable(ctx, storage.AssignTableRequest{
+ ClusterID: m.clusterID,
+ SchemaID: schemaID,
+ TableName: tableName,
+ ShardID: shardID,
+ }); err != nil {
+ return errors.WithMessage(err, "storage assign table")
+ }
+
+ // Update cache im memory.
+ if _, exists := m.tableAssignMapping[schemaID]; !exists {
+ m.tableAssignMapping[schemaID] =
make(map[string]storage.ShardID, 0)
+ }
+
+ m.tableAssignMapping[schemaID][tableName] = shardID
+
+ return nil
+}
+
+func (m *TopologyManagerImpl) GetAssignTableResult(_ context.Context, schemaID
storage.SchemaID, tableName string) (storage.ShardID, bool) {
+ assignResult, exists := m.tableAssignMapping[schemaID][tableName]
+ return assignResult, exists
+}
+
+func (m *TopologyManagerImpl) DeleteAssignTable(ctx context.Context, schemaID
storage.SchemaID, tableName string) error {
Review Comment:
```suggestion
func (m *TopologyManagerImpl) DeleteTableAssignedShard(ctx context.Context,
schemaID storage.SchemaID, tableName string) error {
```
##########
server/cluster/metadata/topology_manager.go:
##########
@@ -291,6 +307,64 @@ func (m *TopologyManagerImpl) RemoveTable(ctx
context.Context, shardID storage.S
return nil
}
+func (m *TopologyManagerImpl) GetTableShard(_ context.Context, table
storage.Table) (storage.ShardID, bool) {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
+
+ shardIDs, exists := m.tableShardMapping[table.ID]
+ if exists {
+ return shardIDs[0], true
+ }
+
+ return 0, false
+}
+
+func (m *TopologyManagerImpl) AssignTable(ctx context.Context, schemaID
storage.SchemaID, tableName string, shardID storage.ShardID) error {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ if err := m.storage.AssignTable(ctx, storage.AssignTableRequest{
+ ClusterID: m.clusterID,
+ SchemaID: schemaID,
+ TableName: tableName,
+ ShardID: shardID,
+ }); err != nil {
+ return errors.WithMessage(err, "storage assign table")
+ }
+
+ // Update cache im memory.
+ if _, exists := m.tableAssignMapping[schemaID]; !exists {
+ m.tableAssignMapping[schemaID] =
make(map[string]storage.ShardID, 0)
+ }
+
+ m.tableAssignMapping[schemaID][tableName] = shardID
+
+ return nil
+}
+
+func (m *TopologyManagerImpl) GetAssignTableResult(_ context.Context, schemaID
storage.SchemaID, tableName string) (storage.ShardID, bool) {
Review Comment:
```suggestion
func (m *TopologyManagerImpl) GetTableAssignedShard(_ context.Context,
schemaID storage.SchemaID, tableName string) (storage.ShardID, bool) {
```
##########
server/coordinator/factory.go:
##########
@@ -126,22 +126,32 @@ func (f *Factory) makeCreateTableProcedure(ctx
context.Context, request CreateTa
}
snapshot := request.ClusterMetadata.GetClusterSnapshot()
- shards, err := f.shardPicker.PickShards(ctx, snapshot, 1)
+ var targetShardID storage.ShardID
+ shardID, exists, err := request.ClusterMetadata.GetAssignTable(ctx,
request.SourceReq.SchemaName, request.SourceReq.Name)
if err != nil {
- f.logger.Error("pick table shard", zap.Error(err))
- return nil, errors.WithMessage(err, "pick table shard")
+ return nil, err
}
- if len(shards) != 1 {
- f.logger.Error("pick table shards length not equal 1",
zap.Int("shards", len(shards)))
- return nil, errors.WithMessagef(procedure.ErrPickShard, "pick
table shard, shards length:%d", len(shards))
+ if exists {
+ targetShardID = shardID
+ } else {
+ shards, err := f.shardPicker.PickShards(ctx, snapshot, 1)
+ if err != nil {
+ f.logger.Error("pick table shard", zap.Error(err))
+ return nil, errors.WithMessage(err, "pick table shard")
+ }
+ if len(shards) != 1 {
+ f.logger.Error("pick table shards length not equal 1",
zap.Int("shards", len(shards)))
+ return nil, errors.WithMessagef(procedure.ErrPickShard,
"pick table shard, shards length:%d", len(shards))
+ }
+ targetShardID = shards[0].ID
Review Comment:
I guess we can encapsulate the table assignment in the `ShardPicker`: just
provide a new `ShardPicker` wrapper around the original one, and do the table
assignment query/creation in the wrapper.
##########
server/cluster/metadata/topology_manager.go:
##########
@@ -291,6 +307,64 @@ func (m *TopologyManagerImpl) RemoveTable(ctx
context.Context, shardID storage.S
return nil
}
+func (m *TopologyManagerImpl) GetTableShard(_ context.Context, table
storage.Table) (storage.ShardID, bool) {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
+
+ shardIDs, exists := m.tableShardMapping[table.ID]
+ if exists {
+ return shardIDs[0], true
+ }
+
+ return 0, false
+}
+
+func (m *TopologyManagerImpl) AssignTable(ctx context.Context, schemaID
storage.SchemaID, tableName string, shardID storage.ShardID) error {
Review Comment:
This method actually assigns the table to a picked shard, so how about
making the name more explicit:
```suggestion
func (m *TopologyManagerImpl) AssignTableToShard(ctx context.Context,
schemaID storage.SchemaID, tableName string, shardID storage.ShardID) error {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]