This is an automated email from the ASF dual-hosted git repository. abeizn pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit 7d7cbbeeff958465d106e6c8157a1e3f89b104ed Author: abeizn <[email protected]> AuthorDate: Fri Aug 5 17:04:24 2022 +0800 feat: github cicd convertor --- plugins/github/impl/impl.go | 2 + plugins/github/tasks/cicd_job_convertor.go | 124 ++++++++++++++++++++++++ plugins/github/tasks/cicd_pipeline_convertor.go | 105 ++++++++++++++++++++ plugins/github/tasks/cicd_run_enricher.go | 20 +++- 4 files changed, 249 insertions(+), 2 deletions(-) diff --git a/plugins/github/impl/impl.go b/plugins/github/impl/impl.go index bdb99223..94dc18de 100644 --- a/plugins/github/impl/impl.go +++ b/plugins/github/impl/impl.go @@ -83,6 +83,8 @@ func (plugin Github) SubTaskMetas() []core.SubTaskMeta { tasks.CollectJobsMeta, tasks.ExtractJobsMeta, tasks.EnrichPipelinesMeta, + tasks.ConvertPipelinesMeta, + tasks.ConvertTasksMeta, tasks.EnrichPullRequestIssuesMeta, tasks.ConvertRepoMeta, tasks.ConvertIssuesMeta, diff --git a/plugins/github/tasks/cicd_job_convertor.go b/plugins/github/tasks/cicd_job_convertor.go new file mode 100644 index 00000000..b5318959 --- /dev/null +++ b/plugins/github/tasks/cicd_job_convertor.go @@ -0,0 +1,124 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package tasks
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/apache/incubator-devlake/plugins/core/dal"
+
+	"github.com/apache/incubator-devlake/plugins/core"
+	"github.com/apache/incubator-devlake/plugins/helper"
+
+	"github.com/apache/incubator-devlake/models/domainlayer"
+	"github.com/apache/incubator-devlake/models/domainlayer/devops"
+	"github.com/apache/incubator-devlake/models/domainlayer/didgen"
+	githubModels "github.com/apache/incubator-devlake/plugins/github/models"
+)
+
+// ConvertTasksMeta describes the subtask whose entry point is ConvertTasks.
+var ConvertTasksMeta = core.SubTaskMeta{
+	Name:             "convertTasks",
+	EntryPoint:       ConvertTasks,
+	EnabledByDefault: true,
+	Description:      "Convert tool layer table github_jobs into domain layer table cicd_tasks",
+	DomainTypes:      []string{core.DOMAIN_TYPE_CICD},
+}
+
+// SimpleBranch receives the head_branch column selected from _tool_github_runs.
+type SimpleBranch struct {
+	HeadBranch string `json:"head_branch" gorm:"type:varchar(255)"`
+}
+
+// ConvertTasks iterates over the GithubJob rows that match the current repo and
+// connection and hands one devops.CICDTask per job to the data converter.
+//
+// It returns any error raised while opening the cursor, building the converter,
+// or executing it.
+func ConvertTasks(taskCtx core.SubTaskContext) error {
+	db := taskCtx.GetDal()
+	data := taskCtx.GetData().(*GithubTaskData)
+	repoId := data.Repo.GithubId
+
+	job := &githubModels.GithubJob{}
+	cursor, err := db.Cursor(
+		dal.From(job),
+		dal.Where("repo_id = ? and connection_id=?", repoId, data.Options.ConnectionId),
+	)
+	if err != nil {
+		return err
+	}
+	defer cursor.Close()
+
+	jobIdGen := didgen.NewDomainIdGenerator(&githubModels.GithubJob{})
+	converter, err := helper.NewDataConverter(helper.DataConverterArgs{
+		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+			Ctx: taskCtx,
+			Params: GithubApiParams{
+				ConnectionId: data.Options.ConnectionId,
+				Owner:        data.Options.Owner,
+				Repo:         data.Options.Repo,
+			},
+			Table: RAW_JOB_TABLE,
+		},
+		InputRowType: reflect.TypeOf(githubModels.GithubJob{}),
+		Input:        cursor,
+		Convert: func(inputRow interface{}) ([]interface{}, error) {
+			line := inputRow.(*githubModels.GithubJob)
+
+			// Fetch the branch of the run this job belongs to; it is part of
+			// the pipeline id built below.
+			// NOTE(review): this issues one query per job and filters on the
+			// run id only - confirm whether connection/repo should also be
+			// part of the filter.
+			tmp := make([]*SimpleBranch, 0)
+			clauses := []dal.Clause{
+				dal.Select("head_branch"),
+				dal.From("_tool_github_runs"),
+				dal.Where("id = ?", line.RunID),
+			}
+			// Keep the error local to the callback instead of overwriting the
+			// err variable of the enclosing function.
+			if err := db.All(&tmp, clauses...); err != nil {
+				return nil, err
+			}
+
+			domainjob := &devops.CICDTask{
+				DomainEntity: domainlayer.DomainEntity{Id: jobIdGen.Generate(data.Options.ConnectionId, repoId, line.ID)},
+				Name:         line.Name,
+				Status:       line.Status,
+				Type:         line.Type,
+				FinishedDate: line.CompletedAt,
+			}
+			// StartedAt is a pointer; dereferencing it unconditionally panics
+			// on a nil value (presumably jobs that have not started yet), so
+			// the start date keeps its zero value in that case.
+			if line.StartedAt != nil {
+				domainjob.StatedDate = *line.StartedAt
+			}
+			if len(tmp) > 0 {
+				// NOTE(review): hand-built id; it looks like it has to equal
+				// the id generated for GithubPipeline by the pipeline
+				// converter - confirm against didgen.
+				domainjob.PipelineId = fmt.Sprintf("%s:%s:%d:%d:%s:%s", "github", "GithubPipeline", data.Options.ConnectionId, repoId, tmp[0].HeadBranch, line.HeadSha)
+			}
+			// A duration needs both timestamps; a negative span is skipped
+			// rather than converted to an unsigned value.
+			if line.Status == "completed" && line.StartedAt != nil && line.CompletedAt != nil {
+				if elapsed := line.CompletedAt.Sub(*line.StartedAt); elapsed > 0 {
+					domainjob.DurationSec = uint64(elapsed.Seconds())
+				}
+			}
+			// Anything not completed is in progress; otherwise the conclusion
+			// decides the result.
+			switch {
+			case line.Status != "completed":
+				domainjob.Result = devops.IN_PROGRESS
+			case line.Conclusion == "success":
+				domainjob.Result = devops.SUCCESS
+			case line.Conclusion == "cancelled":
+				domainjob.Result = devops.ABORT
+			default:
+				domainjob.Result = devops.FAILURE
+			}
+
+			return []interface{}{
+				domainjob,
+			}, nil
+		},
+	})
+	if err != nil {
+		return err
+	}
+
+	return converter.Execute()
+}
diff --git a/plugins/github/tasks/cicd_pipeline_convertor.go b/plugins/github/tasks/cicd_pipeline_convertor.go
new file mode 100644
index 00000000..eb28abed
--- /dev/null
+++ b/plugins/github/tasks/cicd_pipeline_convertor.go
@@ -0,0 +1,105 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+	"reflect"
+	"strconv"
+
+	"github.com/apache/incubator-devlake/plugins/core/dal"
+
+	"github.com/apache/incubator-devlake/plugins/core"
+	"github.com/apache/incubator-devlake/plugins/helper"
+
+	"github.com/apache/incubator-devlake/models/domainlayer"
+	"github.com/apache/incubator-devlake/models/domainlayer/devops"
+	"github.com/apache/incubator-devlake/models/domainlayer/didgen"
+	githubModels "github.com/apache/incubator-devlake/plugins/github/models"
+)
+
+// ConvertPipelinesMeta describes the subtask whose entry point is ConvertPipelines.
+var ConvertPipelinesMeta = core.SubTaskMeta{
+	Name:             "convertPipelines",
+	EntryPoint:       ConvertPipelines,
+	EnabledByDefault: true,
+	Description:      "Convert tool layer table github_pipelines into domain layer table cicd_pipeline",
+	DomainTypes:      []string{core.DOMAIN_TYPE_CICD},
+}
+
+// ConvertPipelines iterates over the GithubPipeline rows that match the current
+// repo and connection and hands one devops.CICDPipeline per row to the data
+// converter.
+//
+// It returns any error raised while opening the cursor, building the converter,
+// or executing it.
+func ConvertPipelines(taskCtx core.SubTaskContext) error {
+	db := taskCtx.GetDal()
+	data := taskCtx.GetData().(*GithubTaskData)
+	repoId := data.Repo.GithubId
+
+	pipeline := &githubModels.GithubPipeline{}
+	cursor, err := db.Cursor(
+		dal.From(pipeline),
+		dal.Where("repo_id = ? and connection_id=?", repoId, data.Options.ConnectionId),
+	)
+	if err != nil {
+		return err
+	}
+	defer cursor.Close()
+
+	pipelineIdGen := didgen.NewDomainIdGenerator(&githubModels.GithubPipeline{})
+	converter, err := helper.NewDataConverter(helper.DataConverterArgs{
+		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+			Ctx: taskCtx,
+			Params: GithubApiParams{
+				ConnectionId: data.Options.ConnectionId,
+				Owner:        data.Options.Owner,
+				Repo:         data.Options.Repo,
+			},
+			Table: RAW_RUN_TABLE,
+		},
+		InputRowType: reflect.TypeOf(githubModels.GithubPipeline{}),
+		Input:        cursor,
+		Convert: func(inputRow interface{}) ([]interface{}, error) {
+			line := inputRow.(*githubModels.GithubPipeline)
+			domainPipeline := &devops.CICDPipeline{
+				DomainEntity: domainlayer.DomainEntity{Id: pipelineIdGen.Generate(data.Options.ConnectionId, repoId, line.Branch, line.Commit)},
+				CommitSha:    line.Commit,
+				Branch:       line.Branch,
+				Repo:         strconv.Itoa(repoId),
+				Status:       line.Status,
+				Type:         line.Type,
+				DurationSec:  uint64(line.Duration),
+				FinishedDate: line.FinishedDate,
+			}
+			// StartedDate is a pointer; dereferencing it unconditionally
+			// panics on a nil value, so the created date keeps its zero value
+			// when no start time is recorded.
+			if line.StartedDate != nil {
+				domainPipeline.CreatedDate = *line.StartedDate
+			}
+			// Anything not completed is in progress; otherwise the stored
+			// result decides the outcome.
+			switch {
+			case line.Status != "completed":
+				domainPipeline.Result = devops.IN_PROGRESS
+			case line.Result == "success":
+				domainPipeline.Result = devops.SUCCESS
+			case line.Result == "cancelled":
+				domainPipeline.Result = devops.ABORT
+			default:
+				domainPipeline.Result = devops.FAILURE
+			}
+
+			return []interface{}{
+				domainPipeline,
+			}, nil
+		},
+	})
+	if err != nil {
+		return err
+	}
+
+	return converter.Execute()
+}
diff --git a/plugins/github/tasks/cicd_run_enricher.go b/plugins/github/tasks/cicd_run_enricher.go
index ee90e7d6..ccd330f6 100644
--- a/plugins/github/tasks/cicd_run_enricher.go
+++ b/plugins/github/tasks/cicd_run_enricher.go
@@ -18,11 +18,15 @@ limitations under the License.
package tasks import ( + "encoding/json" + "github.com/apache/incubator-devlake/plugins/core" "github.com/apache/incubator-devlake/plugins/core/dal" githubModels "github.com/apache/incubator-devlake/plugins/github/models" ) +const TOOL_PIPELINE_TABLE = "_tool_github_pipelines" + var EnrichPipelinesMeta = core.SubTaskMeta{ Name: "enrichPipelines", EntryPoint: EnrichPipelines, @@ -37,7 +41,7 @@ func EnrichPipelines(taskCtx core.SubTaskContext) (err error) { repoId := data.Repo.GithubId cursor, err := db.Cursor( - dal.Select("head_sha, head_branch, status, conclusion, github_created_at, github_updated_at, run_attempt, run_started_at"), + dal.Select("head_sha, head_branch, status, conclusion, github_created_at, github_updated_at, run_attempt, run_started_at, _raw_data_id"), dal.From(&githubModels.GithubRun{}), dal.Orderby("head_sha, github_created_at"), ) @@ -46,6 +50,15 @@ func EnrichPipelines(taskCtx core.SubTaskContext) (err error) { } defer cursor.Close() + apiParamsJson, err := json.Marshal(GithubApiParams{ + ConnectionId: data.Options.ConnectionId, + Owner: data.Options.Owner, + Repo: data.Options.Repo, + }) + if err != nil { + return err + } + for cursor.Next() { entity := &githubModels.GithubPipeline{} var item githubModels.GithubRun @@ -55,6 +68,9 @@ func EnrichPipelines(taskCtx core.SubTaskContext) (err error) { } if item.HeadSha != entity.Commit { + entity.NoPKModel.RawDataId = item.NoPKModel.RawDataId + entity.NoPKModel.RawDataTable = RAW_RUN_TABLE + entity.NoPKModel.RawDataParams = string(apiParamsJson) entity.ConnectionId = data.Options.ConnectionId entity.RepoId = repoId entity.Commit = item.HeadSha @@ -84,7 +100,7 @@ func EnrichPipelines(taskCtx core.SubTaskContext) (err error) { } } - err := db.CreateOrUpdate(entity) + err = db.CreateOrUpdate(entity) if err != nil { return err }
