This is an automated email from the ASF dual-hosted git repository. xikai pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-horaedb-meta.git
commit 475647fce2a7876cdf133b894a93dc10c05b0027 Author: CooooolFrog <[email protected]> AuthorDate: Thu Dec 7 17:09:13 2023 +0800 refactor: refactor create table procedure (#288) ## Rationale Because of a flaw in the implementation of the create table procedure, table creation error messages from ceresmeta cannot currently be returned to ceresdb. The procedure needs to be restructured. ## Detailed Changes * Refactor the create table procedure so that it returns error information correctly. * Modify the error message returned in the service. ## Test Plan Pass CI. --- .../procedure/ddl/createtable/create_table.go | 120 ++++++++++++--------- server/service/grpc/service.go | 10 +- 2 files changed, 74 insertions(+), 56 deletions(-) diff --git a/server/coordinator/procedure/ddl/createtable/create_table.go b/server/coordinator/procedure/ddl/createtable/create_table.go index 414caaf..49138a4 100644 --- a/server/coordinator/procedure/ddl/createtable/create_table.go +++ b/server/coordinator/procedure/ddl/createtable/create_table.go @@ -37,30 +37,30 @@ import ( ) const ( - eventPrepare = "EventPrepare" - eventFailed = "EventFailed" - eventSuccess = "EventSuccess" - - stateBegin = "StateBegin" - stateWaiting = "StateWaiting" - stateFinish = "StateFinish" - stateFailed = "StateFailed" + eventCreateMetadata = "EventCreateMetadata" + eventCreateOnShard = "EventCreateOnShard" + eventFinish = "EventFinish" + + stateBegin = "StateBegin" + stateCreateMetadata = "StateCreateMetadata" + stateCreateOnShard = "StateCreateOnShard" + stateFinish = "StateFinish" ) var ( createTableEvents = fsm.Events{ - {Name: eventPrepare, Src: []string{stateBegin}, Dst: stateWaiting}, - {Name: eventSuccess, Src: []string{stateWaiting}, Dst: stateFinish}, - {Name: eventFailed, Src: []string{stateWaiting}, Dst: stateFailed}, + {Name: eventCreateMetadata, Src: []string{stateBegin}, Dst: stateCreateMetadata}, + {Name: eventCreateOnShard, Src: []string{stateCreateMetadata}, Dst: stateCreateOnShard}, + {Name: eventFinish, Src: 
[]string{stateCreateOnShard}, Dst: stateFinish}, } createTableCallbacks = fsm.Callbacks{ - eventPrepare: prepareCallback, - eventFailed: failedCallback, - eventSuccess: successCallback, + eventCreateMetadata: createMetadataCallback, + eventCreateOnShard: createOnShard, + eventFinish: createFinish, } ) -func prepareCallback(event *fsm.Event) { +func createMetadataCallback(event *fsm.Event) { req, err := procedure.GetRequestFromEvent[*callbackRequest](event) if err != nil { procedure.CancelEventWithLog(event, err, "get request from event") @@ -73,44 +73,67 @@ func prepareCallback(event *fsm.Event) { TableName: params.SourceReq.GetName(), PartitionInfo: storage.PartitionInfo{Info: params.SourceReq.PartitionTableInfo.GetPartitionInfo()}, } - result, err := params.ClusterMetadata.CreateTableMetadata(req.ctx, createTableMetadataRequest) + _, err = params.ClusterMetadata.CreateTableMetadata(req.ctx, createTableMetadataRequest) if err != nil { procedure.CancelEventWithLog(event, err, "create table metadata") return } log.Debug("create table metadata finish", zap.String("tableName", createTableMetadataRequest.TableName)) +} + +func createOnShard(event *fsm.Event) { + req, err := procedure.GetRequestFromEvent[*callbackRequest](event) + if err != nil { + procedure.CancelEventWithLog(event, err, "get request from event") + return + } + params := req.p.params + + table, ok, err := params.ClusterMetadata.GetTable(params.SourceReq.GetSchemaName(), params.SourceReq.GetName()) + if err != nil { + procedure.CancelEventWithLog(event, err, "get table metadata failed", zap.String("schemaName", params.SourceReq.GetSchemaName()), zap.String("tableName", params.SourceReq.GetName())) + return + } + if !ok { + procedure.CancelEventWithLog(event, err, "table metadata not found", zap.String("schemaName", params.SourceReq.GetSchemaName()), zap.String("tableName", params.SourceReq.GetName())) + return + } shardVersionUpdate := metadata.ShardVersionUpdate{ ShardID: params.ShardID, LatestVersion: 
req.p.relatedVersionInfo.ShardWithVersion[params.ShardID], } - createTableRequest := ddl.BuildCreateTableRequest(result.Table, shardVersionUpdate, params.SourceReq) + createTableRequest := ddl.BuildCreateTableRequest(table, shardVersionUpdate, params.SourceReq) latestShardVersion, err := ddl.CreateTableOnShard(req.ctx, params.ClusterMetadata, params.Dispatch, params.ShardID, createTableRequest) if err != nil { procedure.CancelEventWithLog(event, err, "dispatch create table on shard") return } - log.Debug("dispatch createTableOnShard finish", zap.String("tableName", createTableMetadataRequest.TableName)) + log.Debug("dispatch createTableOnShard finish", zap.String("tableName", table.Name)) + + shardVersionUpdate = metadata.ShardVersionUpdate{ + ShardID: params.ShardID, + LatestVersion: latestShardVersion, + } - shardVersionUpdate.LatestVersion = latestShardVersion - err = params.ClusterMetadata.AddTableTopology(req.ctx, shardVersionUpdate, result.Table) + err = params.ClusterMetadata.AddTableTopology(req.ctx, shardVersionUpdate, table) if err != nil { procedure.CancelEventWithLog(event, err, "add table topology") return } - log.Debug("add table topology finish", zap.String("tableName", createTableMetadataRequest.TableName)) - req.createTableResult = &metadata.CreateTableResult{ - Table: result.Table, + Table: table, ShardVersionUpdate: shardVersionUpdate, } + + log.Debug("add table topology finish", zap.String("tableName", table.Name)) } -func successCallback(event *fsm.Event) { +func createFinish(event *fsm.Event) { req, err := procedure.GetRequestFromEvent[*callbackRequest](event) if err != nil { procedure.CancelEventWithLog(event, err, "get request from event") @@ -123,18 +146,6 @@ func successCallback(event *fsm.Event) { } } -func failedCallback(event *fsm.Event) { - req, err := procedure.GetRequestFromEvent[*callbackRequest](event) - if err != nil { - procedure.CancelEventWithLog(event, err, "get request from event") - return - } - - if err := 
req.p.params.OnFailed(event.Err); err != nil { - log.Error("exec failed callback failed") - } -} - // callbackRequest is fsm callbacks param. type callbackRequest struct { ctx context.Context @@ -179,6 +190,7 @@ type Procedure struct { fsm *fsm.FSM params ProcedureParams relatedVersionInfo procedure.RelatedVersionInfo + // Protect the state. lock sync.RWMutex state procedure.State @@ -217,29 +229,35 @@ func (p *Procedure) Kind() procedure.Kind { func (p *Procedure) Start(ctx context.Context) error { p.updateState(procedure.StateRunning) + // Try to load persist data. req := &callbackRequest{ ctx: ctx, p: p, createTableResult: nil, } - if err := p.fsm.Event(eventPrepare, req); err != nil { - err1 := p.fsm.Event(eventFailed, req) - p.updateState(procedure.StateFailed) - if err1 != nil { - err = errors.WithMessagef(err, "send eventFailed, err:%v", err1) + for { + switch p.fsm.Current() { + case stateBegin: + if err := p.fsm.Event(eventCreateMetadata, req); err != nil { + _ = p.params.OnFailed(err) + return err + } + case stateCreateMetadata: + if err := p.fsm.Event(eventCreateOnShard, req); err != nil { + _ = p.params.OnFailed(err) + return err + } + case stateCreateOnShard: + if err := p.fsm.Event(eventFinish, req); err != nil { + _ = p.params.OnFailed(err) + return err + } + case stateFinish: + p.updateState(procedure.StateFinished) + return nil } - _ = p.params.OnFailed(err) - return errors.WithMessage(err, "send eventPrepare") } - - if err := p.fsm.Event(eventSuccess, req); err != nil { - _ = p.params.OnFailed(err) - return errors.WithMessage(err, "send eventSuccess") - } - - p.updateState(procedure.StateFinished) - return nil } func (p *Procedure) Cancel(_ context.Context) error { diff --git a/server/service/grpc/service.go b/server/service/grpc/service.go index b5f64f4..d4693de 100644 --- a/server/service/grpc/service.go +++ b/server/service/grpc/service.go @@ -174,7 +174,7 @@ func (s *Service) CreateTable(ctx context.Context, req *metaservicepb.CreateTabl 
metaClient, err := s.getForwardedMetaClient(ctx) if err != nil { - return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table")}, nil + return &metaservicepb.CreateTableResponse{Header: responseHeader(err, err.Error())}, nil } // Forward request to the leader. @@ -188,7 +188,7 @@ func (s *Service) CreateTable(ctx context.Context, req *metaservicepb.CreateTabl c, err := clusterManager.GetCluster(ctx, req.GetHeader().GetClusterName()) if err != nil { log.Error("fail to create table", zap.Error(err)) - return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table")}, nil + return &metaservicepb.CreateTableResponse{Header: responseHeader(err, err.Error())}, nil } errorCh := make(chan error, 1) @@ -211,13 +211,13 @@ func (s *Service) CreateTable(ctx context.Context, req *metaservicepb.CreateTabl }) if err != nil { log.Error("fail to create table, factory create procedure", zap.Error(err)) - return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table")}, nil + return &metaservicepb.CreateTableResponse{Header: responseHeader(err, err.Error())}, nil } err = c.GetProcedureManager().Submit(ctx, p) if err != nil { log.Error("fail to create table, manager submit procedure", zap.Error(err)) - return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table")}, nil + return &metaservicepb.CreateTableResponse{Header: responseHeader(err, err.Error())}, nil } select { @@ -239,7 +239,7 @@ func (s *Service) CreateTable(ctx context.Context, req *metaservicepb.CreateTabl }, nil case err = <-errorCh: log.Warn("create table failed", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()), zap.Error(err)) - return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table")}, nil + return &metaservicepb.CreateTableResponse{Header: responseHeader(err, err.Error())}, nil } } --------------------------------------------------------------------- To 
unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
