This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 0b1c3fb5 refactor(gitextractor): issue #1914 - broke down gitextractor
run() method into smaller ones, and added "Closeable" plugin support (#2050)
0b1c3fb5 is described below
commit 0b1c3fb5b8edb097bd593e5383685ba700838682
Author: Keon Amini <[email protected]>
AuthorDate: Wed Jun 15 00:47:14 2022 -0700
refactor(gitextractor): issue #1914 - broke down gitextractor run() method
into smaller ones, and added "Closeable" plugin support (#2050)
Co-authored-by: Keon Amini <[email protected]>
---
plugins/core/plugin_task.go | 6 +
plugins/gitextractor/gitextractor.go | 38 ++-
plugins/gitextractor/main.go | 27 +-
plugins/gitextractor/parser/clone.go | 73 +++--
plugins/gitextractor/parser/libgit2.go | 281 ------------------
plugins/gitextractor/parser/repo.go | 351 +++++++++++++++++++++++
plugins/gitextractor/parser/repo_creator.go | 60 ++++
plugins/gitextractor/tasks/git_repo_collector.go | 76 +++--
runner/run_pipeline.go | 5 +-
runner/run_task.go | 3 +
10 files changed, 572 insertions(+), 348 deletions(-)
diff --git a/plugins/core/plugin_task.go b/plugins/core/plugin_task.go
index 9ca9aec8..6dd1b095 100644
--- a/plugins/core/plugin_task.go
+++ b/plugins/core/plugin_task.go
@@ -97,3 +97,9 @@ type PluginTask interface {
// based on task context and user input options, return data that
shared among all subtasks
PrepareTaskData(taskCtx TaskContext, options map[string]interface{})
(interface{}, error)
}
+
+// CloneablePluginTask extends PluginTask with a Close hook. RunPluginSubTasks
+// type-asserts for this interface and defers Close, so Close runs when
+// RunPluginSubTasks returns (also when PrepareTaskData fails), letting the
+// plugin release resources it created in PrepareTaskData.
+// NOTE(review): despite the name this is a "closeable" contract, not a
+// "cloneable" one; consider renaming together with all implementers/callers.
+type CloneablePluginTask interface {
+ PluginTask
+ // Close releases task-scoped resources. The runner currently discards the
+ // returned error.
+ Close(taskCtx TaskContext) error
+}
diff --git a/plugins/gitextractor/gitextractor.go
b/plugins/gitextractor/gitextractor.go
index 4cf9a508..95c6c07c 100644
--- a/plugins/gitextractor/gitextractor.go
+++ b/plugins/gitextractor/gitextractor.go
@@ -19,8 +19,12 @@ package main
import (
+ "fmt"
+ "strings"
+
"github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/gitextractor/models"
+ "github.com/apache/incubator-devlake/plugins/gitextractor/parser"
+ "github.com/apache/incubator-devlake/plugins/gitextractor/store"
"github.com/apache/incubator-devlake/plugins/gitextractor/tasks"
"github.com/mitchellh/mapstructure"
)
+// Compile-time check that *GitExtractor implements core.PluginMeta.
var _ core.PluginMeta = (*GitExtractor)(nil)
@@ -35,7 +39,9 @@ func (plugin GitExtractor) Description() string {
// return all available subtasks, framework will run them for you in order
func (plugin GitExtractor) SubTaskMetas() []core.SubTaskMeta {
return []core.SubTaskMeta{
+ // Each subtask reads back (via GetData) the *parser.GitRepo returned by
+ // PrepareTaskData; the repository is released afterwards by Close.
- tasks.CollectGitRepoMeta,
+ tasks.CollectGitCommitMeta,
+ tasks.CollectGitBranchMeta,
+ tasks.CollectGitTagMeta,
}
}
@@ -50,12 +56,40 @@ func (plugin GitExtractor) PrepareTaskData(taskCtx
core.TaskContext, options map
if err != nil {
return nil, err
}
- return op, nil
+ storage := store.NewDatabase(taskCtx, op.Url)
+ repo, err := newGitRepo(taskCtx, storage, op)
+ if err != nil {
+ return nil, err
+ }
+ return repo, nil
+}
+
+// Close implements core.CloneablePluginTask. It releases the *parser.GitRepo
+// held as task data (its store and, for cloned repositories, the temporary
+// clone directory). It is a no-op when the task data is not a *parser.GitRepo
+// (e.g. it was never set) or is a nil *parser.GitRepo.
+func (plugin GitExtractor) Close(taskCtx core.TaskContext) error {
+	repo, ok := taskCtx.GetData().(*parser.GitRepo)
+	// A typed-nil *parser.GitRepo satisfies the assertion above; calling
+	// Close on it would dereference nil while releasing the store.
+	if !ok || repo == nil {
+		return nil
+	}
+	return repo.Close()
+}
+// RootPkgPath returns the Go import path of the gitextractor plugin package.
func (plugin GitExtractor) RootPkgPath() string {
return "github.com/apache/incubator-devlake/plugins/gitextractor"
}
+func newGitRepo(ctx core.TaskContext, storage models.Store, op
tasks.GitExtractorOptions) (*parser.GitRepo, error) {
+ var err error
+ var repo *parser.GitRepo
+ p := parser.NewGitRepoCreator(storage, ctx)
+ if strings.HasPrefix(op.Url, "http") {
+ repo, err = p.CloneOverHTTP(op.RepoId, op.Url, op.User,
op.Password, op.Proxy)
+ } else if url := strings.TrimPrefix(op.Url, "ssh://");
strings.HasPrefix(url, "git@") {
+ repo, err = p.CloneOverSSH(op.RepoId, url, op.PrivateKey,
op.Passphrase)
+ } else if strings.HasPrefix(op.Url, "/") {
+ repo, err = p.LocalRepo(op.Url, op.RepoId)
+ }
+ return repo, err
+}
+
// Export a variable named PluginEntry for Framework to search and load
+// NOTE(review): presumably //nolint silences an "unused" lint because the
+// variable is only referenced by name when the plugin is loaded — confirm.
var PluginEntry GitExtractor //nolint
diff --git a/plugins/gitextractor/main.go b/plugins/gitextractor/main.go
index 337b274e..a64de8ce 100644
--- a/plugins/gitextractor/main.go
+++ b/plugins/gitextractor/main.go
@@ -20,13 +20,11 @@ package main
import (
"context"
"flag"
- "strings"
-
"github.com/apache/incubator-devlake/logger"
+ "github.com/apache/incubator-devlake/plugins/gitextractor/tasks"
"github.com/apache/incubator-devlake/config"
"github.com/apache/incubator-devlake/plugins/gitextractor/models"
- "github.com/apache/incubator-devlake/plugins/gitextractor/parser"
"github.com/apache/incubator-devlake/plugins/gitextractor/store"
"github.com/apache/incubator-devlake/plugins/helper"
"gorm.io/driver/mysql"
@@ -76,17 +74,18 @@ func main() {
"git extractor",
nil,
)
- p := parser.NewLibGit2(storage, subTaskCtx)
- if strings.HasPrefix(*url, "http") {
- err = p.CloneOverHTTP(*id, *url, *user, *password, *proxy)
- if err != nil {
- panic(err)
- }
+ repo, err := newGitRepo(subTaskCtx.TaskContext(), storage,
tasks.GitExtractorOptions{
+ RepoId: *id,
+ Url: *url,
+ User: *user,
+ Password: *password,
+ Proxy: *proxy,
+ })
+ if err != nil {
+ panic(err)
}
- if strings.HasPrefix(*url, "/") {
- err = p.LocalRepo(*url, *id)
- if err != nil {
- panic(err)
- }
+ defer repo.Close()
+ if err = repo.CollectAll(subTaskCtx); err != nil {
+ panic(err)
}
}
diff --git a/plugins/gitextractor/parser/clone.go
b/plugins/gitextractor/parser/clone.go
index d61fcc30..d5b943f2 100644
--- a/plugins/gitextractor/parser/clone.go
+++ b/plugins/gitextractor/parser/clone.go
@@ -52,41 +52,56 @@ func cloneOverSSH(url, dir, passphrase string, pk []byte)
error {
return nil
}
-func (l *LibGit2) CloneOverHTTP(repoId, url, user, password, proxy string)
error {
- cloneOptions := &git.CloneOptions{Bare: true}
- if proxy != "" {
- cloneOptions.FetchOptions.ProxyOptions.Type =
git.ProxyTypeSpecified
- cloneOptions.FetchOptions.ProxyOptions.Url = proxy
- }
- if user != "" {
- auth := fmt.Sprintf("Authorization: Basic %s",
base64.StdEncoding.EncodeToString([]byte(user+":"+password)))
- cloneOptions.FetchOptions.Headers = []string{auth}
- }
- dir, err := ioutil.TempDir("", "gitextractor")
- if err != nil {
- return err
- }
- defer os.RemoveAll(dir)
- repo, err := git.Clone(url, dir, cloneOptions)
- if err != nil {
- return err
- }
- return l.run(repo, repoId)
+// CloneOverHTTP bare-clones url into a fresh temporary directory and wraps the
+// result as repository repoId. A non-empty proxy routes the fetch through that
+// proxy; a non-empty user adds an HTTP Basic "Authorization" header built from
+// user and password. The temporary directory is deleted when the returned
+// GitRepo is closed, or right away if the clone fails.
+func (l *GitRepoCreator) CloneOverHTTP(repoId, url, user, password, proxy string) (*GitRepo, error) {
+	clone := func(dir string) (*GitRepo, error) {
+		opts := &git.CloneOptions{Bare: true}
+		fetch := &opts.FetchOptions
+		if proxy != "" {
+			fetch.ProxyOptions.Type = git.ProxyTypeSpecified
+			fetch.ProxyOptions.Url = proxy
+		}
+		if user != "" {
+			credentials := base64.StdEncoding.EncodeToString([]byte(user + ":" + password))
+			fetch.Headers = []string{fmt.Sprintf("Authorization: Basic %s", credentials)}
+		}
+		cloned, err := git.Clone(url, dir, opts)
+		if err != nil {
+			return nil, err
+		}
+		return l.newGitRepo(repoId, cloned), nil
+	}
+	return withTempDirectory(clone)
+}
+
+// CloneOverSSH decodes the base64 privateKey, clones url over SSH into a fresh
+// temporary directory, and then opens that clone as repository repoId. The
+// directory is deleted when the returned GitRepo is closed, or right away if
+// decoding, cloning or opening fails.
+func (l *GitRepoCreator) CloneOverSSH(repoId, url, privateKey, passphrase string) (*GitRepo, error) {
+	clone := func(dir string) (*GitRepo, error) {
+		keyBytes, decodeErr := base64.StdEncoding.DecodeString(privateKey)
+		if decodeErr != nil {
+			return nil, decodeErr
+		}
+		if cloneErr := cloneOverSSH(url, dir, passphrase, keyBytes); cloneErr != nil {
+			return nil, cloneErr
+		}
+		return l.LocalRepo(dir, repoId)
+	}
+	return withTempDirectory(clone)
}
-func (l *LibGit2) CloneOverSSH(repoId, url, privateKey, passphrase string)
error {
+// withTempDirectory creates a fresh "gitextractor" temporary directory and
+// passes it to f. If f fails, the directory is removed and (nil, err) is
+// returned. On success, the removal is attached to the returned GitRepo as its
+// cleanup hook, so GitRepo.Close deletes the directory.
+// NOTE(review): if f panics, err is still nil when the deferred check runs,
+// so the directory is not removed on that path.
+func withTempDirectory(f func(tempDir string) (*GitRepo, error)) (*GitRepo,
error) {
dir, err := ioutil.TempDir("", "gitextractor")
if err != nil {
- return err
+ return nil, err
}
- defer os.RemoveAll(dir)
- pk, err := base64.StdEncoding.DecodeString(privateKey)
- if err != nil {
- return err
+ cleanup := func() {
+ _ = os.RemoveAll(dir)
}
- err = cloneOverSSH(url, dir, passphrase, pk)
+ // Remove the directory on any failure below; on success, ownership of
+ // the directory moves to the returned GitRepo.
+ defer func() {
+ if err != nil {
+ cleanup()
+ }
+ }()
+ // repo is the only new name here, so ":=" assigns the function-level err
+ // that the deferred closure above inspects.
+ repo, err := f(dir)
if err != nil {
- return err
+ return nil, err
}
- return l.LocalRepo(dir, repoId)
+ repo.cleanup = cleanup
+ return repo, err
}
diff --git a/plugins/gitextractor/parser/libgit2.go
b/plugins/gitextractor/parser/libgit2.go
deleted file mode 100644
index 0e907c48..00000000
--- a/plugins/gitextractor/parser/libgit2.go
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package parser
-
-import (
- "context"
- "fmt"
-
- git "github.com/libgit2/git2go/v33"
-
- "github.com/apache/incubator-devlake/models/domainlayer"
- "github.com/apache/incubator-devlake/models/domainlayer/code"
- "github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/plugins/gitextractor/models"
-)
-
-const (
- BRANCH = "BRANCH"
- TAG = "TAG"
-)
-
-type LibGit2 struct {
- store models.Store
- logger core.Logger
- ctx context.Context // for canceling
- subTaskCtx core.SubTaskContext // for updating progress
-}
-
-func NewLibGit2(store models.Store, subTaskCtx core.SubTaskContext) *LibGit2 {
- return &LibGit2{store: store,
- logger: subTaskCtx.GetLogger(),
- ctx: subTaskCtx.GetContext(),
- subTaskCtx: subTaskCtx}
-}
-
-func (l *LibGit2) LocalRepo(repoPath, repoId string) error {
- repo, err := git.OpenRepository(repoPath)
- if err != nil {
- return err
- }
- return l.run(repo, repoId)
-}
-
-func (l *LibGit2) run(repo *git.Repository, repoId string) error {
- defer l.store.Close()
- l.subTaskCtx.SetProgress(0, -1)
-
- // collect tags
- var err error
- err = repo.Tags.Foreach(func(name string, id *git.Oid) error {
- select {
- case <-l.ctx.Done():
- return l.ctx.Err()
- default:
- break
- }
- var err1 error
- var tag *git.Tag
- var tagCommit string
- tag, _ = repo.LookupTag(id)
- if tag != nil {
- tagCommit = tag.TargetId().String()
- } else {
- tagCommit = id.String()
- }
- l.logger.Info("tagCommit", tagCommit)
- if tagCommit != "" {
- ref := &code.Ref{
- DomainEntity: domainlayer.DomainEntity{Id:
fmt.Sprintf("%s:%s", repoId, name)},
- RepoId: repoId,
- Name: name,
- CommitSha: tagCommit,
- RefType: TAG,
- }
- err1 = l.store.Refs(ref)
- if err1 != nil {
- return err1
- }
- l.subTaskCtx.IncProgress(1)
- }
- return nil
- })
- if err != nil {
- return err
- }
-
- // collect branches
- var repoInter *git.BranchIterator
- repoInter, err = repo.NewBranchIterator(git.BranchAll)
- if err != nil {
- return err
- }
- err = repoInter.ForEach(func(branch *git.Branch, branchType
git.BranchType) error {
- select {
- case <-l.ctx.Done():
- return l.ctx.Err()
- default:
- break
- }
- if branch.IsBranch() || branch.IsRemote() {
- name, err1 := branch.Name()
- if err1 != nil {
- return err1
- }
- var sha string
- if oid := branch.Target(); oid != nil {
- sha = oid.String()
- }
- ref := &code.Ref{
- DomainEntity: domainlayer.DomainEntity{Id:
fmt.Sprintf("%s:%s", repoId, name)},
- RepoId: repoId,
- Name: name,
- CommitSha: sha,
- RefType: BRANCH,
- }
- ref.IsDefault, _ = branch.IsHead()
- err1 = l.store.Refs(ref)
- if err1 != nil {
- return err1
- }
- l.subTaskCtx.IncProgress(1)
- return nil
- }
- return nil
- })
- if err != nil {
- return err
- }
-
- // collect commits
- opts, err := git.DefaultDiffOptions()
- if err != nil {
- return err
- }
- opts.NotifyCallback = func(diffSoFar *git.Diff, delta git.DiffDelta,
matchedPathSpec string) error {
- return nil
- }
-
- odb, err := repo.Odb()
- if err != nil {
- return err
- }
- err = odb.ForEach(func(id *git.Oid) error {
- select {
- case <-l.ctx.Done():
- return l.ctx.Err()
- default:
- break
- }
- commit, _ := repo.LookupCommit(id)
- if commit == nil {
- return nil
- }
- commitSha := commit.Id().String()
- l.logger.Debug("process commit: %s", commitSha)
- c := &code.Commit{
- Sha: commitSha,
- Message: commit.Message(),
- }
- author := commit.Author()
- if author != nil {
- c.AuthorName = author.Name
- c.AuthorEmail = author.Email
- c.AuthorId = author.Email
- c.AuthoredDate = author.When
- }
- committer := commit.Committer()
- if committer != nil {
- c.CommitterName = committer.Name
- c.CommitterEmail = committer.Email
- c.CommitterId = committer.Email
- c.CommittedDate = committer.When
- }
- var commitParents []*code.CommitParent
- for i := uint(0); i < commit.ParentCount(); i++ {
- parent := commit.Parent(i)
- if parent != nil {
- if parentId := parent.Id(); parentId != nil {
- commitParents = append(commitParents,
&code.CommitParent{
- CommitSha: c.Sha,
- ParentCommitSha:
parentId.String(),
- })
- }
- }
- }
- err2 := l.store.CommitParents(commitParents)
- if err2 != nil {
- return err2
- }
- if commit.ParentCount() > 0 {
- parent := commit.Parent(0)
- if parent != nil {
- var parentTree, tree *git.Tree
- parentTree, err2 = parent.Tree()
- if err2 != nil {
- return err2
- }
- tree, err2 = commit.Tree()
- if err2 != nil {
- return err2
- }
- var diff *git.Diff
- diff, err2 = repo.DiffTreeToTree(parentTree,
tree, &opts)
- if err2 != nil {
- return err2
- }
- var commitFile *code.CommitFile
- err2 = diff.ForEach(func(file git.DiffDelta,
progress float64) (
- git.DiffForEachHunkCallback, error) {
- if commitFile != nil {
- err2 =
l.store.CommitFiles(commitFile)
- if err2 != nil {
-
l.logger.Error("CommitFiles error:", err)
- return nil, err2
- }
- }
- commitFile = new(code.CommitFile)
- commitFile.CommitSha = c.Sha
- commitFile.FilePath = file.NewFile.Path
- return func(hunk git.DiffHunk)
(git.DiffForEachLineCallback, error) {
- return func(line git.DiffLine)
error {
- if line.Origin ==
git.DiffLineAddition {
-
commitFile.Additions += line.NumLines
- }
- if line.Origin ==
git.DiffLineDeletion {
-
commitFile.Deletions += line.NumLines
- }
- return nil
- }, nil
- }, nil
- }, git.DiffDetailLines)
- if err2 != nil {
- return err2
- }
- if commitFile != nil {
- err2 = l.store.CommitFiles(commitFile)
- if err2 != nil {
- l.logger.Error("CommitFiles
error:", err)
- }
- }
- var stats *git.DiffStats
- stats, err2 = diff.Stats()
- if err2 != nil {
- return err2
- }
- c.Additions += stats.Insertions()
- c.Deletions += stats.Deletions()
- }
- }
- err2 = l.store.Commits(c)
- if err2 != nil {
- return err2
- }
- repoCommit := &code.RepoCommit{
- RepoId: repoId,
- CommitSha: c.Sha,
- }
- err2 = l.store.RepoCommits(repoCommit)
- if err2 != nil {
- return err2
- }
- l.subTaskCtx.IncProgress(1)
- return nil
- })
- return err
-}
diff --git a/plugins/gitextractor/parser/repo.go
b/plugins/gitextractor/parser/repo.go
new file mode 100644
index 00000000..cebe5f17
--- /dev/null
+++ b/plugins/gitextractor/parser/repo.go
@@ -0,0 +1,351 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package parser
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/incubator-devlake/models/domainlayer"
+ "github.com/apache/incubator-devlake/models/domainlayer/code"
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/gitextractor/models"
+ git "github.com/libgit2/git2go/v33"
+)
+
+// GitRepo wraps an opened libgit2 repository together with the store that
+// collected refs and commits are written to. Instances are built by
+// GitRepoCreator; Close must be called to close the store and run the
+// cleanup hook, if any.
+type GitRepo struct {
+ store models.Store // receives Refs, Commits, CommitParents, CommitFiles and RepoCommits
+ ctx context.Context // checked per item so collection can be canceled
+ logger core.Logger
+ id string // repo id stamped on stored records (RepoId and the Ref Id prefix)
+ repo *git.Repository
+ cleanup func() // optional; set by withTempDirectory to delete a temporary clone
+}
+
+// CollectAll resets progress to (0, -1) and then runs the collectors in a
+// fixed order: tags, branches, commits. The first collector that fails stops
+// the run and its error is returned.
+func (r *GitRepo) CollectAll(subtaskCtx core.SubTaskContext) error {
+	subtaskCtx.SetProgress(0, -1)
+	collectors := []func(core.SubTaskContext) error{
+		r.CollectTags,
+		r.CollectBranches,
+		r.CollectCommits,
+	}
+	for _, collect := range collectors {
+		if err := collect(subtaskCtx); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Close closes the backing store and returns that error. The cleanup hook
+// (present for temporary clones) runs afterwards via defer, so it runs even
+// if closing the store fails.
+func (r *GitRepo) Close() error {
+	if r.cleanup != nil {
+		defer r.cleanup()
+	}
+	return r.store.Close()
+}
+
+func (r *GitRepo) CountTags() (int, error) {
+ tags, err := r.repo.Tags.List()
+ if err != nil {
+ return 0, err
+ }
+ return len(tags), nil
+}
+
+func (r *GitRepo) CountBranches() (int, error) {
+ var branchIter *git.BranchIterator
+ branchIter, err := r.repo.NewBranchIterator(git.BranchAll)
+ if err != nil {
+ return 0, err
+ }
+ count := 0
+ err = branchIter.ForEach(func(branch *git.Branch, branchType
git.BranchType) error {
+ select {
+ case <-r.ctx.Done():
+ return r.ctx.Err()
+ default:
+ break
+ }
+ if branch.IsBranch() || branch.IsRemote() {
+ count++
+ }
+ return nil
+ })
+ return count, err
+}
+
+func (r *GitRepo) CountCommits() (int, error) {
+ odb, err := r.repo.Odb()
+ if err != nil {
+ return 0, err
+ }
+ count := 0
+ err = odb.ForEach(func(id *git.Oid) error {
+ select {
+ case <-r.ctx.Done():
+ return r.ctx.Err()
+ default:
+ break
+ }
+ commit, _ := r.repo.LookupCommit(id)
+ if commit != nil {
+ count++
+ }
+ return nil
+ })
+ return count, err
+}
+
+// CollectTags stores one TAG-typed code.Ref per tag, with Id "<repo id>:<name>".
+// When LookupTag resolves a tag object, the ref records that tag's target id;
+// otherwise it records the tag ref's own target id. Progress is incremented
+// once per stored ref. If the context is canceled, the callback returns the
+// context's error.
+func (r *GitRepo) CollectTags(subtaskCtx core.SubTaskContext) error {
+	return r.repo.Tags.Foreach(func(name string, id *git.Oid) error {
+		select {
+		case <-r.ctx.Done():
+			return r.ctx.Err()
+		default:
+		}
+		var tagCommit string
+		// The lookup error is ignored on purpose: when id is not a tag object,
+		// id itself is recorded.
+		if tag, _ := r.repo.LookupTag(id); tag != nil {
+			tagCommit = tag.TargetId().String()
+		} else {
+			tagCommit = id.String()
+		}
+		// The logger takes a printf-style format, so the value needs a verb.
+		r.logger.Info("tagCommit: %s", tagCommit)
+		if tagCommit == "" {
+			return nil
+		}
+		ref := &code.Ref{
+			DomainEntity: domainlayer.DomainEntity{Id: fmt.Sprintf("%s:%s", r.id, name)},
+			RepoId:       r.id,
+			Name:         name,
+			CommitSha:    tagCommit,
+			RefType:      TAG,
+		}
+		if err := r.store.Refs(ref); err != nil {
+			return err
+		}
+		subtaskCtx.IncProgress(1)
+		return nil
+	})
+}
+
+func (r *GitRepo) CollectBranches(subtaskCtx core.SubTaskContext) error {
+ var repoInter *git.BranchIterator
+ repoInter, err := r.repo.NewBranchIterator(git.BranchAll)
+ if err != nil {
+ return err
+ }
+ return repoInter.ForEach(func(branch *git.Branch, branchType
git.BranchType) error {
+ select {
+ case <-r.ctx.Done():
+ return r.ctx.Err()
+ default:
+ break
+ }
+ if branch.IsBranch() || branch.IsRemote() {
+ name, err1 := branch.Name()
+ if err1 != nil {
+ return err1
+ }
+ var sha string
+ if oid := branch.Target(); oid != nil {
+ sha = oid.String()
+ }
+ ref := &code.Ref{
+ DomainEntity: domainlayer.DomainEntity{Id:
fmt.Sprintf("%s:%s", r.id, name)},
+ RepoId: r.id,
+ Name: name,
+ CommitSha: sha,
+ RefType: BRANCH,
+ }
+ ref.IsDefault, _ = branch.IsHead()
+ err1 = r.store.Refs(ref)
+ if err1 != nil {
+ return err1
+ }
+ subtaskCtx.IncProgress(1)
+ return nil
+ }
+ return nil
+ })
+}
+
+func (r *GitRepo) CollectCommits(subtaskCtx core.SubTaskContext) error {
+ opts, err := getDiffOpts()
+ if err != nil {
+ return err
+ }
+ odb, err := r.repo.Odb()
+ if err != nil {
+ return err
+ }
+ return odb.ForEach(func(id *git.Oid) error {
+ select {
+ case <-r.ctx.Done():
+ return r.ctx.Err()
+ default:
+ break
+ }
+ commit, _ := r.repo.LookupCommit(id)
+ if commit == nil {
+ return nil
+ }
+ commitSha := commit.Id().String()
+ r.logger.Debug("process commit: %s", commitSha)
+ c := &code.Commit{
+ Sha: commitSha,
+ Message: commit.Message(),
+ }
+ author := commit.Author()
+ if author != nil {
+ c.AuthorName = author.Name
+ c.AuthorEmail = author.Email
+ c.AuthorId = author.Email
+ c.AuthoredDate = author.When
+ }
+ committer := commit.Committer()
+ if committer != nil {
+ c.CommitterName = committer.Name
+ c.CommitterEmail = committer.Email
+ c.CommitterId = committer.Email
+ c.CommittedDate = committer.When
+ }
+ if err != r.storeParentCommits(commitSha, commit) {
+ return err
+ }
+ if commit.ParentCount() > 0 {
+ parent := commit.Parent(0)
+ if parent != nil {
+ var stats *git.DiffStats
+ if stats, err =
r.getDiffComparedToParent(c.Sha, commit, parent, opts); err != nil {
+ return err
+ } else {
+ c.Additions += stats.Insertions()
+ c.Deletions += stats.Deletions()
+ }
+ }
+ }
+ err = r.store.Commits(c)
+ if err != nil {
+ return err
+ }
+ repoCommit := &code.RepoCommit{
+ RepoId: r.id,
+ CommitSha: c.Sha,
+ }
+ err = r.store.RepoCommits(repoCommit)
+ if err != nil {
+ return err
+ }
+ subtaskCtx.IncProgress(1)
+ return nil
+ })
+}
+
+// storeParentCommits collects one CommitParent (commitSha -> parent id) per
+// resolvable parent of commit, in parent order, and passes the whole batch to
+// the store in a single call. The batch is a nil slice when there are no
+// parents.
+func (r *GitRepo) storeParentCommits(commitSha string, commit *git.Commit) error {
+	var parents []*code.CommitParent
+	for idx, total := uint(0), commit.ParentCount(); idx < total; idx++ {
+		p := commit.Parent(idx)
+		if p == nil {
+			continue
+		}
+		pid := p.Id()
+		if pid == nil {
+			continue
+		}
+		parents = append(parents, &code.CommitParent{
+			CommitSha:       commitSha,
+			ParentCommitSha: pid.String(),
+		})
+	}
+	return r.store.CommitParents(parents)
+}
+
+func (r *GitRepo) getDiffComparedToParent(commitSha string, commit
*git.Commit, parent *git.Commit, opts *git.DiffOptions) (*git.DiffStats, error)
{
+ var err error
+ var parentTree, tree *git.Tree
+ parentTree, err = parent.Tree()
+ if err != nil {
+ return nil, err
+ }
+ tree, err = commit.Tree()
+ if err != nil {
+ return nil, err
+ }
+ var diff *git.Diff
+ diff, err = r.repo.DiffTreeToTree(parentTree, tree, opts)
+ if err != nil {
+ return nil, err
+ }
+ err = r.storeCommitFilesFromDiff(commitSha, diff)
+ if err != nil {
+ return nil, err
+ }
+ var stats *git.DiffStats
+ stats, err = diff.Stats()
+ if err != nil {
+ return nil, err
+ }
+ return stats, nil
+}
+
+// storeCommitFilesFromDiff stores one CommitFile per delta in diff. Each row
+// is tagged with commitSha and the delta's new-side path, and counts the
+// added and deleted lines reported by the line callback. A file is flushed to
+// the store when the next delta starts; the last one is flushed after the walk
+// completes successfully.
+//
+// An error from the diff walk, including a failed flush inside it, is returned
+// as-is. Store failures are logged before being returned.
+func (r *GitRepo) storeCommitFilesFromDiff(commitSha string, diff *git.Diff) error {
+	var commitFile *code.CommitFile
+	// flush persists the file accumulated so far, if any.
+	flush := func() error {
+		if commitFile == nil {
+			return nil
+		}
+		if err := r.store.CommitFiles(commitFile); err != nil {
+			r.logger.Error("CommitFiles error: %v", err)
+			return err
+		}
+		return nil
+	}
+	err := diff.ForEach(func(file git.DiffDelta, progress float64) (git.DiffForEachHunkCallback, error) {
+		if err := flush(); err != nil {
+			return nil, err
+		}
+		commitFile = new(code.CommitFile)
+		commitFile.CommitSha = commitSha
+		commitFile.FilePath = file.NewFile.Path
+		return func(hunk git.DiffHunk) (git.DiffForEachLineCallback, error) {
+			return func(line git.DiffLine) error {
+				switch line.Origin {
+				case git.DiffLineAddition:
+					commitFile.Additions += line.NumLines
+				case git.DiffLineDeletion:
+					commitFile.Deletions += line.NumLines
+				}
+				return nil
+			}, nil
+		}, nil
+	}, git.DiffDetailLines)
+	if err != nil {
+		// Return the walk's error instead of letting a final flush overwrite
+		// it; the pending file may be incomplete, so it is not stored.
+		return err
+	}
+	return flush()
+}
+
+// getDiffOpts returns libgit2's default diff options with a NotifyCallback
+// that accepts every delta (it always returns nil).
+func getDiffOpts() (*git.DiffOptions, error) {
+	defaults, err := git.DefaultDiffOptions()
+	if err != nil {
+		return nil, err
+	}
+	acceptAll := func(_ *git.Diff, _ git.DiffDelta, _ string) error { return nil }
+	defaults.NotifyCallback = acceptAll
+	return &defaults, nil
+}
diff --git a/plugins/gitextractor/parser/repo_creator.go
b/plugins/gitextractor/parser/repo_creator.go
new file mode 100644
index 00000000..d05f3914
--- /dev/null
+++ b/plugins/gitextractor/parser/repo_creator.go
@@ -0,0 +1,60 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package parser
+
+import (
+ git "github.com/libgit2/git2go/v33"
+
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/gitextractor/models"
+)
+
+// RefType values stored in code.Ref.RefType by CollectBranches and CollectTags.
+const (
+ BRANCH = "BRANCH"
+ TAG = "TAG"
+)
+
+// GitRepoCreator opens or clones repositories and wraps them as *GitRepo.
+// Every GitRepo it builds shares store and takes its context and logger from
+// taskCtx.
+type GitRepoCreator struct {
+ store models.Store
+ taskCtx core.TaskContext
+}
+
+// NewGitRepoCreator returns a GitRepoCreator whose repositories write into
+// store and use taskCtx's context and logger.
+func NewGitRepoCreator(store models.Store, taskCtx core.TaskContext) *GitRepoCreator {
+	creator := new(GitRepoCreator)
+	creator.store = store
+	creator.taskCtx = taskCtx
+	return creator
+}
+
+// LocalRepo opens the existing repository at repoPath and wraps it as
+// repository repoId. LocalRepo itself attaches no cleanup hook.
+func (l *GitRepoCreator) LocalRepo(repoPath, repoId string) (*GitRepo, error) {
+	opened, err := git.OpenRepository(repoPath)
+	if err == nil {
+		return l.newGitRepo(repoId, opened), nil
+	}
+	return nil, err
+}
+
+// newGitRepo wraps repo in a GitRepo identified by id. The GitRepo gets the
+// creator's store plus the context and logger of the creator's task context;
+// no cleanup hook is set here.
+func (l *GitRepoCreator) newGitRepo(id string, repo *git.Repository) *GitRepo {
+	gitRepo := &GitRepo{id: id, repo: repo, store: l.store}
+	gitRepo.ctx = l.taskCtx.GetContext()
+	gitRepo.logger = l.taskCtx.GetLogger()
+	return gitRepo
+}
diff --git a/plugins/gitextractor/tasks/git_repo_collector.go
b/plugins/gitextractor/tasks/git_repo_collector.go
index c5b86bd6..c750de84 100644
--- a/plugins/gitextractor/tasks/git_repo_collector.go
+++ b/plugins/gitextractor/tasks/git_repo_collector.go
@@ -19,11 +19,10 @@ package tasks
import (
"errors"
+ "github.com/apache/incubator-devlake/plugins/gitextractor/parser"
"strings"
"github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/plugins/gitextractor/parser"
- "github.com/apache/incubator-devlake/plugins/gitextractor/store"
)
type GitExtractorOptions struct {
@@ -53,27 +52,64 @@ func (o GitExtractorOptions) Valid() error {
return nil
}
-func CollectGitRepo(subTaskCtx core.SubTaskContext) error {
- var err error
- op := subTaskCtx.GetData().(GitExtractorOptions)
- storage := store.NewDatabase(subTaskCtx, op.Url)
- p := parser.NewLibGit2(storage, subTaskCtx)
- if strings.HasPrefix(op.Url, "http") {
- err = p.CloneOverHTTP(op.RepoId, op.Url, op.User, op.Password,
op.Proxy)
- } else if url := strings.TrimPrefix(op.Url, "ssh://");
strings.HasPrefix(url, "git@") {
- err = p.CloneOverSSH(op.RepoId, url, op.PrivateKey,
op.Passphrase)
- } else if strings.HasPrefix(op.Url, "/") {
- err = p.LocalRepo(op.Url, op.RepoId)
+// CollectGitCommits sets the progress total from GitRepo.CountCommits. If
+// counting fails, it logs the error and falls back to -1. It then stores
+// every commit via GitRepo.CollectCommits.
+// NOTE(review): CountCommits is an extra full walk of the object database
+// (one LookupCommit per object) done only to size the progress bar.
+func CollectGitCommits(subTaskCtx core.SubTaskContext) error {
+ repo := getGitRepo(subTaskCtx)
+ if count, err := repo.CountCommits(); err != nil {
+ subTaskCtx.GetLogger().Error("unable to get commit count: %v",
err)
+ subTaskCtx.SetProgress(0, -1)
+ } else {
+ subTaskCtx.SetProgress(0, count)
}
- if err != nil {
- return err
+ return repo.CollectCommits(subTaskCtx)
+}
+
+// CollectGitBranches sets the progress total from GitRepo.CountBranches,
+// falling back to -1 on error. It then stores every local and remote branch
+// ref via GitRepo.CollectBranches.
+func CollectGitBranches(subTaskCtx core.SubTaskContext) error {
+ repo := getGitRepo(subTaskCtx)
+ if count, err := repo.CountBranches(); err != nil {
+ subTaskCtx.GetLogger().Error("unable to get branch count: %v",
err)
+ subTaskCtx.SetProgress(0, -1)
+ } else {
+ subTaskCtx.SetProgress(0, count)
}
- return nil
+ return repo.CollectBranches(subTaskCtx)
+}
+
+// CollectGitTags sets the progress total from GitRepo.CountTags, falling back
+// to -1 on error. It then stores every tag ref via GitRepo.CollectTags.
+func CollectGitTags(subTaskCtx core.SubTaskContext) error {
+ repo := getGitRepo(subTaskCtx)
+ if count, err := repo.CountTags(); err != nil {
+ subTaskCtx.GetLogger().Error("unable to get tag count: %v", err)
+ subTaskCtx.SetProgress(0, -1)
+ } else {
+ subTaskCtx.SetProgress(0, count)
+ }
+ return repo.CollectTags(subTaskCtx)
+}
+
+// getGitRepo returns the *parser.GitRepo held in the subtask's task data and
+// panics if the data is not of that type.
+// NOTE(review): a typed-nil *parser.GitRepo still passes the assertion, so
+// the panic would happen later, on first use, rather than here.
+func getGitRepo(subTaskCtx core.SubTaskContext) *parser.GitRepo {
+ repo, ok := subTaskCtx.GetData().(*parser.GitRepo)
+ if !ok {
+ panic("git repo reference not found on context")
+ }
+ return repo
+}
+
+// CollectGitCommitMeta registers CollectGitCommits as an enabled-by-default subtask.
+var CollectGitCommitMeta = core.SubTaskMeta{
+ Name: "collectGitCommits",
+ EntryPoint: CollectGitCommits,
+ EnabledByDefault: true,
+ Description: "collect git commits into Domain Layer Tables",
+}
+
+// CollectGitBranchMeta registers CollectGitBranches as an enabled-by-default subtask.
+var CollectGitBranchMeta = core.SubTaskMeta{
+ Name: "collectGitBranches",
+ EntryPoint: CollectGitBranches,
+ EnabledByDefault: true,
+ Description: "collect git branch into Domain Layer Tables",
}
-var CollectGitRepoMeta = core.SubTaskMeta{
- Name: "collectGitRepo",
- EntryPoint: CollectGitRepo,
+// CollectGitTagMeta registers CollectGitTags as an enabled-by-default subtask.
+var CollectGitTagMeta = core.SubTaskMeta{
+ Name: "collectGitTags",
+ EntryPoint: CollectGitTags,
EnabledByDefault: true,
- Description: "collect git commits/branches/tags int Domain Layer
Tables",
+ Description: "collect git tag into Domain Layer Tables",
}
diff --git a/runner/run_pipeline.go b/runner/run_pipeline.go
index 724a1909..d777fc66 100644
--- a/runner/run_pipeline.go
+++ b/runner/run_pipeline.go
@@ -33,6 +33,7 @@ func RunPipeline(
pipelineId uint64,
runTasks func([]uint64) error,
) error {
+ startTime := time.Now()
// load pipeline from db
pipeline := &models.Pipeline{}
err := db.Find(pipeline, pipelineId).Error
@@ -93,7 +94,7 @@ func RunPipeline(
return err
}
}
-
- log.Info("pipeline finished: %w", err)
+ endTime := time.Now()
+ log.Info("pipeline finished in %d ms: %w",
endTime.UnixMilli()-startTime.UnixMilli(), err)
return err
}
diff --git a/runner/run_task.go b/runner/run_task.go
index 24c8fd24..a2739bd8 100644
--- a/runner/run_task.go
+++ b/runner/run_task.go
@@ -217,6 +217,9 @@ func RunPluginSubTasks(
}
taskCtx := helper.NewDefaultTaskContext(cfg, logger, db, ctx, name,
subtasksFlag, progress)
+ if closeablePlugin, ok := pluginTask.(core.CloseablePluginTask); ok {
+ defer closeablePlugin.Close(taskCtx)
+ }
taskData, err := pluginTask.PrepareTaskData(taskCtx, options)
if err != nil {
return err