This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b1c3fb5 refactor(gitextractor): issue #1914 - broke down gitextractor 
run() method into smaller ones, and added "Closeable" plugin support (#2050)
0b1c3fb5 is described below

commit 0b1c3fb5b8edb097bd593e5383685ba700838682
Author: Keon Amini <[email protected]>
AuthorDate: Wed Jun 15 00:47:14 2022 -0700

    refactor(gitextractor): issue #1914 - broke down gitextractor run() method 
into smaller ones, and added "Closeable" plugin support (#2050)
    
    Co-authored-by: Keon Amini <[email protected]>
---
 plugins/core/plugin_task.go                      |   6 +
 plugins/gitextractor/gitextractor.go             |  38 ++-
 plugins/gitextractor/main.go                     |  27 +-
 plugins/gitextractor/parser/clone.go             |  73 +++--
 plugins/gitextractor/parser/libgit2.go           | 281 ------------------
 plugins/gitextractor/parser/repo.go              | 351 +++++++++++++++++++++++
 plugins/gitextractor/parser/repo_creator.go      |  60 ++++
 plugins/gitextractor/tasks/git_repo_collector.go |  76 +++--
 runner/run_pipeline.go                           |   5 +-
 runner/run_task.go                               |   3 +
 10 files changed, 572 insertions(+), 348 deletions(-)

diff --git a/plugins/core/plugin_task.go b/plugins/core/plugin_task.go
index 9ca9aec8..6dd1b095 100644
--- a/plugins/core/plugin_task.go
+++ b/plugins/core/plugin_task.go
@@ -97,3 +97,9 @@ type PluginTask interface {
        // based on task context and user input options, return data that 
shared among all subtasks
        PrepareTaskData(taskCtx TaskContext, options map[string]interface{}) 
(interface{}, error)
 }
+
+// CloseablePluginTask is a PluginTask that additionally exposes a Close hook.
+// Close is meant to be invoked once all subtasks are done, so the plugin can
+// release whatever its task data is holding on to.
+type CloseablePluginTask interface {
+       PluginTask
+       Close(taskCtx TaskContext) error
+}
diff --git a/plugins/gitextractor/gitextractor.go 
b/plugins/gitextractor/gitextractor.go
index 4cf9a508..95c6c07c 100644
--- a/plugins/gitextractor/gitextractor.go
+++ b/plugins/gitextractor/gitextractor.go
@@ -19,8 +19,12 @@ package main
 
 import (
        "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/gitextractor/models"
+       "github.com/apache/incubator-devlake/plugins/gitextractor/parser"
+       "github.com/apache/incubator-devlake/plugins/gitextractor/store"
        "github.com/apache/incubator-devlake/plugins/gitextractor/tasks"
        "github.com/mitchellh/mapstructure"
+       "fmt"
+       "strings"
 )
 
 var _ core.PluginMeta = (*GitExtractor)(nil)
@@ -35,7 +39,9 @@ func (plugin GitExtractor) Description() string {
 // return all available subtasks, framework will run them for you in order
 func (plugin GitExtractor) SubTaskMetas() []core.SubTaskMeta {
        return []core.SubTaskMeta{
-               tasks.CollectGitRepoMeta,
+               tasks.CollectGitCommitMeta,
+               tasks.CollectGitBranchMeta,
+               tasks.CollectGitTagMeta,
        }
 }
 
@@ -50,12 +56,40 @@ func (plugin GitExtractor) PrepareTaskData(taskCtx 
core.TaskContext, options map
        if err != nil {
                return nil, err
        }
-       return op, nil
+       storage := store.NewDatabase(taskCtx, op.Url)
+       repo, err := newGitRepo(taskCtx, storage, op)
+       if err != nil {
+               return nil, err
+       }
+       return repo, nil
+}
+
+// Close releases the repository stored as task data. When the task data is
+// not a *parser.GitRepo there is nothing to release and nil is returned;
+// otherwise the result of the repository's own Close is returned.
+func (plugin GitExtractor) Close(taskCtx core.TaskContext) error {
+       repo, ok := taskCtx.GetData().(*parser.GitRepo)
+       if !ok {
+               return nil
+       }
+       return repo.Close()
 }
 
 func (plugin GitExtractor) RootPkgPath() string {
        return "github.com/apache/incubator-devlake/plugins/gitextractor"
 }
 
+func newGitRepo(ctx core.TaskContext, storage models.Store, op 
tasks.GitExtractorOptions) (*parser.GitRepo, error) {
+       var err error
+       var repo *parser.GitRepo
+       p := parser.NewGitRepoCreator(storage, ctx)
+       if strings.HasPrefix(op.Url, "http") {
+               repo, err = p.CloneOverHTTP(op.RepoId, op.Url, op.User, 
op.Password, op.Proxy)
+       } else if url := strings.TrimPrefix(op.Url, "ssh://"); 
strings.HasPrefix(url, "git@") {
+               repo, err = p.CloneOverSSH(op.RepoId, url, op.PrivateKey, 
op.Passphrase)
+       } else if strings.HasPrefix(op.Url, "/") {
+               repo, err = p.LocalRepo(op.Url, op.RepoId)
+       }
+       return repo, err
+}
+
 // Export a variable named PluginEntry for Framework to search and load
 var PluginEntry GitExtractor //nolint
diff --git a/plugins/gitextractor/main.go b/plugins/gitextractor/main.go
index 337b274e..a64de8ce 100644
--- a/plugins/gitextractor/main.go
+++ b/plugins/gitextractor/main.go
@@ -20,13 +20,11 @@ package main
 import (
        "context"
        "flag"
-       "strings"
-
        "github.com/apache/incubator-devlake/logger"
+       "github.com/apache/incubator-devlake/plugins/gitextractor/tasks"
 
        "github.com/apache/incubator-devlake/config"
        "github.com/apache/incubator-devlake/plugins/gitextractor/models"
-       "github.com/apache/incubator-devlake/plugins/gitextractor/parser"
        "github.com/apache/incubator-devlake/plugins/gitextractor/store"
        "github.com/apache/incubator-devlake/plugins/helper"
        "gorm.io/driver/mysql"
@@ -76,17 +74,18 @@ func main() {
                "git extractor",
                nil,
        )
-       p := parser.NewLibGit2(storage, subTaskCtx)
-       if strings.HasPrefix(*url, "http") {
-               err = p.CloneOverHTTP(*id, *url, *user, *password, *proxy)
-               if err != nil {
-                       panic(err)
-               }
+       repo, err := newGitRepo(subTaskCtx.TaskContext(), storage, 
tasks.GitExtractorOptions{
+               RepoId:   *id,
+               Url:      *url,
+               User:     *user,
+               Password: *password,
+               Proxy:    *proxy,
+       })
+       if err != nil {
+               panic(err)
        }
-       if strings.HasPrefix(*url, "/") {
-               err = p.LocalRepo(*url, *id)
-               if err != nil {
-                       panic(err)
-               }
+       defer repo.Close()
+       if err = repo.CollectAll(subTaskCtx); err != nil {
+               panic(err)
        }
 }
diff --git a/plugins/gitextractor/parser/clone.go 
b/plugins/gitextractor/parser/clone.go
index d61fcc30..d5b943f2 100644
--- a/plugins/gitextractor/parser/clone.go
+++ b/plugins/gitextractor/parser/clone.go
@@ -52,41 +52,56 @@ func cloneOverSSH(url, dir, passphrase string, pk []byte) 
error {
        return nil
 }
 
-func (l *LibGit2) CloneOverHTTP(repoId, url, user, password, proxy string) 
error {
-       cloneOptions := &git.CloneOptions{Bare: true}
-       if proxy != "" {
-               cloneOptions.FetchOptions.ProxyOptions.Type = 
git.ProxyTypeSpecified
-               cloneOptions.FetchOptions.ProxyOptions.Url = proxy
-       }
-       if user != "" {
-               auth := fmt.Sprintf("Authorization: Basic %s", 
base64.StdEncoding.EncodeToString([]byte(user+":"+password)))
-               cloneOptions.FetchOptions.Headers = []string{auth}
-       }
-       dir, err := ioutil.TempDir("", "gitextractor")
-       if err != nil {
-               return err
-       }
-       defer os.RemoveAll(dir)
-       repo, err := git.Clone(url, dir, cloneOptions)
-       if err != nil {
-               return err
-       }
-       return l.run(repo, repoId)
+// CloneOverHTTP makes a bare clone of url into a fresh temporary directory and
+// wraps the result in a GitRepo identified by repoId. A non-empty proxy is
+// passed to libgit2 as an explicitly specified proxy; a non-empty user makes
+// the fetch send a Basic Authorization header built from user and password.
+func (l *GitRepoCreator) CloneOverHTTP(repoId, url, user, password, proxy string) (*GitRepo, error) {
+       cloneInto := func(dir string) (*GitRepo, error) {
+               opts := &git.CloneOptions{Bare: true}
+               if proxy != "" {
+                       opts.FetchOptions.ProxyOptions.Type = git.ProxyTypeSpecified
+                       opts.FetchOptions.ProxyOptions.Url = proxy
+               }
+               if user != "" {
+                       credentials := base64.StdEncoding.EncodeToString([]byte(user + ":" + password))
+                       header := fmt.Sprintf("Authorization: Basic %s", credentials)
+                       opts.FetchOptions.Headers = []string{header}
+               }
+               cloned, err := git.Clone(url, dir, opts)
+               if err != nil {
+                       return nil, err
+               }
+               return l.newGitRepo(repoId, cloned), nil
+       }
+       return withTempDirectory(cloneInto)
+}
+
+// CloneOverSSH clones url over SSH into a fresh temporary directory and opens
+// the clone as a GitRepo identified by repoId. privateKey is expected to be
+// base64-encoded; it is decoded before being handed, together with
+// passphrase, to cloneOverSSH.
+func (l *GitRepoCreator) CloneOverSSH(repoId, url, privateKey, passphrase string) (*GitRepo, error) {
+       cloneInto := func(dir string) (*GitRepo, error) {
+               key, decodeErr := base64.StdEncoding.DecodeString(privateKey)
+               if decodeErr != nil {
+                       return nil, decodeErr
+               }
+               if cloneErr := cloneOverSSH(url, dir, passphrase, key); cloneErr != nil {
+                       return nil, cloneErr
+               }
+               return l.LocalRepo(dir, repoId)
+       }
+       return withTempDirectory(cloneInto)
 }
 
-func (l *LibGit2) CloneOverSSH(repoId, url, privateKey, passphrase string) 
error {
+func withTempDirectory(f func(tempDir string) (*GitRepo, error)) (*GitRepo, 
error) {
        dir, err := ioutil.TempDir("", "gitextractor")
        if err != nil {
-               return err
+               return nil, err
        }
-       defer os.RemoveAll(dir)
-       pk, err := base64.StdEncoding.DecodeString(privateKey)
-       if err != nil {
-               return err
+       cleanup := func() {
+               _ = os.RemoveAll(dir)
        }
-       err = cloneOverSSH(url, dir, passphrase, pk)
+       defer func() {
+               if err != nil {
+                       cleanup()
+               }
+       }()
+       repo, err := f(dir)
        if err != nil {
-               return err
+               return nil, err
        }
-       return l.LocalRepo(dir, repoId)
+       repo.cleanup = cleanup
+       return repo, err
 }
diff --git a/plugins/gitextractor/parser/libgit2.go 
b/plugins/gitextractor/parser/libgit2.go
deleted file mode 100644
index 0e907c48..00000000
--- a/plugins/gitextractor/parser/libgit2.go
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package parser
-
-import (
-       "context"
-       "fmt"
-
-       git "github.com/libgit2/git2go/v33"
-
-       "github.com/apache/incubator-devlake/models/domainlayer"
-       "github.com/apache/incubator-devlake/models/domainlayer/code"
-       "github.com/apache/incubator-devlake/plugins/core"
-       "github.com/apache/incubator-devlake/plugins/gitextractor/models"
-)
-
-const (
-       BRANCH = "BRANCH"
-       TAG    = "TAG"
-)
-
-type LibGit2 struct {
-       store      models.Store
-       logger     core.Logger
-       ctx        context.Context     // for canceling
-       subTaskCtx core.SubTaskContext // for updating progress
-}
-
-func NewLibGit2(store models.Store, subTaskCtx core.SubTaskContext) *LibGit2 {
-       return &LibGit2{store: store,
-               logger:     subTaskCtx.GetLogger(),
-               ctx:        subTaskCtx.GetContext(),
-               subTaskCtx: subTaskCtx}
-}
-
-func (l *LibGit2) LocalRepo(repoPath, repoId string) error {
-       repo, err := git.OpenRepository(repoPath)
-       if err != nil {
-               return err
-       }
-       return l.run(repo, repoId)
-}
-
-func (l *LibGit2) run(repo *git.Repository, repoId string) error {
-       defer l.store.Close()
-       l.subTaskCtx.SetProgress(0, -1)
-
-       // collect tags
-       var err error
-       err = repo.Tags.Foreach(func(name string, id *git.Oid) error {
-               select {
-               case <-l.ctx.Done():
-                       return l.ctx.Err()
-               default:
-                       break
-               }
-               var err1 error
-               var tag *git.Tag
-               var tagCommit string
-               tag, _ = repo.LookupTag(id)
-               if tag != nil {
-                       tagCommit = tag.TargetId().String()
-               } else {
-                       tagCommit = id.String()
-               }
-               l.logger.Info("tagCommit", tagCommit)
-               if tagCommit != "" {
-                       ref := &code.Ref{
-                               DomainEntity: domainlayer.DomainEntity{Id: 
fmt.Sprintf("%s:%s", repoId, name)},
-                               RepoId:       repoId,
-                               Name:         name,
-                               CommitSha:    tagCommit,
-                               RefType:      TAG,
-                       }
-                       err1 = l.store.Refs(ref)
-                       if err1 != nil {
-                               return err1
-                       }
-                       l.subTaskCtx.IncProgress(1)
-               }
-               return nil
-       })
-       if err != nil {
-               return err
-       }
-
-       // collect branches
-       var repoInter *git.BranchIterator
-       repoInter, err = repo.NewBranchIterator(git.BranchAll)
-       if err != nil {
-               return err
-       }
-       err = repoInter.ForEach(func(branch *git.Branch, branchType 
git.BranchType) error {
-               select {
-               case <-l.ctx.Done():
-                       return l.ctx.Err()
-               default:
-                       break
-               }
-               if branch.IsBranch() || branch.IsRemote() {
-                       name, err1 := branch.Name()
-                       if err1 != nil {
-                               return err1
-                       }
-                       var sha string
-                       if oid := branch.Target(); oid != nil {
-                               sha = oid.String()
-                       }
-                       ref := &code.Ref{
-                               DomainEntity: domainlayer.DomainEntity{Id: 
fmt.Sprintf("%s:%s", repoId, name)},
-                               RepoId:       repoId,
-                               Name:         name,
-                               CommitSha:    sha,
-                               RefType:      BRANCH,
-                       }
-                       ref.IsDefault, _ = branch.IsHead()
-                       err1 = l.store.Refs(ref)
-                       if err1 != nil {
-                               return err1
-                       }
-                       l.subTaskCtx.IncProgress(1)
-                       return nil
-               }
-               return nil
-       })
-       if err != nil {
-               return err
-       }
-
-       // collect commits
-       opts, err := git.DefaultDiffOptions()
-       if err != nil {
-               return err
-       }
-       opts.NotifyCallback = func(diffSoFar *git.Diff, delta git.DiffDelta, 
matchedPathSpec string) error {
-               return nil
-       }
-
-       odb, err := repo.Odb()
-       if err != nil {
-               return err
-       }
-       err = odb.ForEach(func(id *git.Oid) error {
-               select {
-               case <-l.ctx.Done():
-                       return l.ctx.Err()
-               default:
-                       break
-               }
-               commit, _ := repo.LookupCommit(id)
-               if commit == nil {
-                       return nil
-               }
-               commitSha := commit.Id().String()
-               l.logger.Debug("process commit: %s", commitSha)
-               c := &code.Commit{
-                       Sha:     commitSha,
-                       Message: commit.Message(),
-               }
-               author := commit.Author()
-               if author != nil {
-                       c.AuthorName = author.Name
-                       c.AuthorEmail = author.Email
-                       c.AuthorId = author.Email
-                       c.AuthoredDate = author.When
-               }
-               committer := commit.Committer()
-               if committer != nil {
-                       c.CommitterName = committer.Name
-                       c.CommitterEmail = committer.Email
-                       c.CommitterId = committer.Email
-                       c.CommittedDate = committer.When
-               }
-               var commitParents []*code.CommitParent
-               for i := uint(0); i < commit.ParentCount(); i++ {
-                       parent := commit.Parent(i)
-                       if parent != nil {
-                               if parentId := parent.Id(); parentId != nil {
-                                       commitParents = append(commitParents, 
&code.CommitParent{
-                                               CommitSha:       c.Sha,
-                                               ParentCommitSha: 
parentId.String(),
-                                       })
-                               }
-                       }
-               }
-               err2 := l.store.CommitParents(commitParents)
-               if err2 != nil {
-                       return err2
-               }
-               if commit.ParentCount() > 0 {
-                       parent := commit.Parent(0)
-                       if parent != nil {
-                               var parentTree, tree *git.Tree
-                               parentTree, err2 = parent.Tree()
-                               if err2 != nil {
-                                       return err2
-                               }
-                               tree, err2 = commit.Tree()
-                               if err2 != nil {
-                                       return err2
-                               }
-                               var diff *git.Diff
-                               diff, err2 = repo.DiffTreeToTree(parentTree, 
tree, &opts)
-                               if err2 != nil {
-                                       return err2
-                               }
-                               var commitFile *code.CommitFile
-                               err2 = diff.ForEach(func(file git.DiffDelta, 
progress float64) (
-                                       git.DiffForEachHunkCallback, error) {
-                                       if commitFile != nil {
-                                               err2 = 
l.store.CommitFiles(commitFile)
-                                               if err2 != nil {
-                                                       
l.logger.Error("CommitFiles error:", err)
-                                                       return nil, err2
-                                               }
-                                       }
-                                       commitFile = new(code.CommitFile)
-                                       commitFile.CommitSha = c.Sha
-                                       commitFile.FilePath = file.NewFile.Path
-                                       return func(hunk git.DiffHunk) 
(git.DiffForEachLineCallback, error) {
-                                               return func(line git.DiffLine) 
error {
-                                                       if line.Origin == 
git.DiffLineAddition {
-                                                               
commitFile.Additions += line.NumLines
-                                                       }
-                                                       if line.Origin == 
git.DiffLineDeletion {
-                                                               
commitFile.Deletions += line.NumLines
-                                                       }
-                                                       return nil
-                                               }, nil
-                                       }, nil
-                               }, git.DiffDetailLines)
-                               if err2 != nil {
-                                       return err2
-                               }
-                               if commitFile != nil {
-                                       err2 = l.store.CommitFiles(commitFile)
-                                       if err2 != nil {
-                                               l.logger.Error("CommitFiles 
error:", err)
-                                       }
-                               }
-                               var stats *git.DiffStats
-                               stats, err2 = diff.Stats()
-                               if err2 != nil {
-                                       return err2
-                               }
-                               c.Additions += stats.Insertions()
-                               c.Deletions += stats.Deletions()
-                       }
-               }
-               err2 = l.store.Commits(c)
-               if err2 != nil {
-                       return err2
-               }
-               repoCommit := &code.RepoCommit{
-                       RepoId:    repoId,
-                       CommitSha: c.Sha,
-               }
-               err2 = l.store.RepoCommits(repoCommit)
-               if err2 != nil {
-                       return err2
-               }
-               l.subTaskCtx.IncProgress(1)
-               return nil
-       })
-       return err
-}
diff --git a/plugins/gitextractor/parser/repo.go 
b/plugins/gitextractor/parser/repo.go
new file mode 100644
index 00000000..cebe5f17
--- /dev/null
+++ b/plugins/gitextractor/parser/repo.go
@@ -0,0 +1,351 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package parser
+
+import (
+       "context"
+       "fmt"
+       "github.com/apache/incubator-devlake/models/domainlayer"
+       "github.com/apache/incubator-devlake/models/domainlayer/code"
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/gitextractor/models"
+       git "github.com/libgit2/git2go/v33"
+)
+
+// GitRepo couples an opened libgit2 repository with the store its extracted
+// data is written to.
+type GitRepo struct {
+       // store receives the refs, commits, commit parents and commit files.
+       store models.Store
+       // ctx is polled inside every iteration callback so work can be cancelled.
+       ctx context.Context
+       // logger is used for per-tag / per-commit messages and store errors.
+       logger core.Logger
+       // id is the repository id written into every stored record.
+       id string
+       // repo is the underlying libgit2 repository handle.
+       repo *git.Repository
+       // cleanup, when non-nil, is run by Close (set for temporary clones).
+       cleanup func()
+}
+
+// CollectAll resets the subtask progress to (0, -1) and then collects tags,
+// branches and commits, in that order, stopping at the first error.
+func (r *GitRepo) CollectAll(subtaskCtx core.SubTaskContext) error {
+       subtaskCtx.SetProgress(0, -1)
+       if err := r.CollectTags(subtaskCtx); err != nil {
+               return err
+       }
+       if err := r.CollectBranches(subtaskCtx); err != nil {
+               return err
+       }
+       return r.CollectCommits(subtaskCtx)
+}
+
+// Close closes the store and returns its error. If a cleanup function was
+// attached to the repo it runs afterwards, whatever the store returned.
+func (r *GitRepo) Close() error {
+       if r.cleanup != nil {
+               // Deferred so that it still runs after (and regardless of) store.Close.
+               defer r.cleanup()
+       }
+       return r.store.Close()
+}
+
+// CountTags returns the number of tag names libgit2 lists for the repository.
+func (r *GitRepo) CountTags() (int, error) {
+       names, err := r.repo.Tags.List()
+       if err != nil {
+               return 0, err
+       }
+       total := len(names)
+       return total, nil
+}
+
+func (r *GitRepo) CountBranches() (int, error) {
+       var branchIter *git.BranchIterator
+       branchIter, err := r.repo.NewBranchIterator(git.BranchAll)
+       if err != nil {
+               return 0, err
+       }
+       count := 0
+       err = branchIter.ForEach(func(branch *git.Branch, branchType 
git.BranchType) error {
+               select {
+               case <-r.ctx.Done():
+                       return r.ctx.Err()
+               default:
+                       break
+               }
+               if branch.IsBranch() || branch.IsRemote() {
+                       count++
+               }
+               return nil
+       })
+       return count, err
+}
+
+// CountCommits walks every object id in the object database and counts the
+// ones that can be looked up as a commit. If iteration fails or the context is
+// cancelled, the count reached so far is returned together with the error.
+func (r *GitRepo) CountCommits() (int, error) {
+       odb, err := r.repo.Odb()
+       if err != nil {
+               return 0, err
+       }
+       total := 0
+       visit := func(id *git.Oid) error {
+               select {
+               case <-r.ctx.Done():
+                       return r.ctx.Err()
+               default:
+               }
+               // Lookup errors are ignored on purpose: ids that are not commits
+               // simply do not count.
+               if commit, _ := r.repo.LookupCommit(id); commit != nil {
+                       total++
+               }
+               return nil
+       }
+       err = odb.ForEach(visit)
+       return total, err
+}
+
+// CollectTags stores one code.Ref of type TAG per tag in the repository and
+// increments the subtask progress for each stored ref. When the tag id
+// resolves to a tag object its target id is recorded as the commit sha;
+// otherwise the id itself is used. Iteration stops on context cancellation or
+// on the first store error.
+func (r *GitRepo) CollectTags(subtaskCtx core.SubTaskContext) error {
+       visit := func(name string, id *git.Oid) error {
+               select {
+               case <-r.ctx.Done():
+                       return r.ctx.Err()
+               default:
+               }
+               var sha string
+               if tagObject, _ := r.repo.LookupTag(id); tagObject != nil {
+                       sha = tagObject.TargetId().String()
+               } else {
+                       sha = id.String()
+               }
+               r.logger.Info("tagCommit", sha)
+               if sha == "" {
+                       return nil
+               }
+               ref := &code.Ref{
+                       DomainEntity: domainlayer.DomainEntity{Id: fmt.Sprintf("%s:%s", r.id, name)},
+                       RepoId:       r.id,
+                       Name:         name,
+                       CommitSha:    sha,
+                       RefType:      TAG,
+               }
+               if storeErr := r.store.Refs(ref); storeErr != nil {
+                       return storeErr
+               }
+               subtaskCtx.IncProgress(1)
+               return nil
+       }
+       return r.repo.Tags.Foreach(visit)
+}
+
+// CollectBranches stores one code.Ref of type BRANCH per local or remote
+// branch and increments the subtask progress for each stored ref. A branch
+// without a target is stored with an empty commit sha; IsDefault reflects
+// whether the branch is HEAD. Iteration stops on context cancellation or on
+// the first error.
+func (r *GitRepo) CollectBranches(subtaskCtx core.SubTaskContext) error {
+       iter, err := r.repo.NewBranchIterator(git.BranchAll)
+       if err != nil {
+               return err
+       }
+       visit := func(branch *git.Branch, branchType git.BranchType) error {
+               select {
+               case <-r.ctx.Done():
+                       return r.ctx.Err()
+               default:
+               }
+               if !branch.IsBranch() && !branch.IsRemote() {
+                       return nil
+               }
+               name, nameErr := branch.Name()
+               if nameErr != nil {
+                       return nameErr
+               }
+               var sha string
+               if target := branch.Target(); target != nil {
+                       sha = target.String()
+               }
+               ref := &code.Ref{
+                       DomainEntity: domainlayer.DomainEntity{Id: fmt.Sprintf("%s:%s", r.id, name)},
+                       RepoId:       r.id,
+                       Name:         name,
+                       CommitSha:    sha,
+                       RefType:      BRANCH,
+               }
+               ref.IsDefault, _ = branch.IsHead()
+               if storeErr := r.store.Refs(ref); storeErr != nil {
+                       return storeErr
+               }
+               subtaskCtx.IncProgress(1)
+               return nil
+       }
+       return iter.ForEach(visit)
+}
+
+func (r *GitRepo) CollectCommits(subtaskCtx core.SubTaskContext) error {
+       opts, err := getDiffOpts()
+       if err != nil {
+               return err
+       }
+       odb, err := r.repo.Odb()
+       if err != nil {
+               return err
+       }
+       return odb.ForEach(func(id *git.Oid) error {
+               select {
+               case <-r.ctx.Done():
+                       return r.ctx.Err()
+               default:
+                       break
+               }
+               commit, _ := r.repo.LookupCommit(id)
+               if commit == nil {
+                       return nil
+               }
+               commitSha := commit.Id().String()
+               r.logger.Debug("process commit: %s", commitSha)
+               c := &code.Commit{
+                       Sha:     commitSha,
+                       Message: commit.Message(),
+               }
+               author := commit.Author()
+               if author != nil {
+                       c.AuthorName = author.Name
+                       c.AuthorEmail = author.Email
+                       c.AuthorId = author.Email
+                       c.AuthoredDate = author.When
+               }
+               committer := commit.Committer()
+               if committer != nil {
+                       c.CommitterName = committer.Name
+                       c.CommitterEmail = committer.Email
+                       c.CommitterId = committer.Email
+                       c.CommittedDate = committer.When
+               }
+               if err != r.storeParentCommits(commitSha, commit) {
+                       return err
+               }
+               if commit.ParentCount() > 0 {
+                       parent := commit.Parent(0)
+                       if parent != nil {
+                               var stats *git.DiffStats
+                               if stats, err = 
r.getDiffComparedToParent(c.Sha, commit, parent, opts); err != nil {
+                                       return err
+                               } else {
+                                       c.Additions += stats.Insertions()
+                                       c.Deletions += stats.Deletions()
+                               }
+                       }
+               }
+               err = r.store.Commits(c)
+               if err != nil {
+                       return err
+               }
+               repoCommit := &code.RepoCommit{
+                       RepoId:    r.id,
+                       CommitSha: c.Sha,
+               }
+               err = r.store.RepoCommits(repoCommit)
+               if err != nil {
+                       return err
+               }
+               subtaskCtx.IncProgress(1)
+               return nil
+       })
+}
+
+// storeParentCommits builds one code.CommitParent link per parent of commit
+// (parents that are nil or have no id are skipped) and hands the whole list
+// to the store in a single call, even when the list is empty.
+func (r *GitRepo) storeParentCommits(commitSha string, commit *git.Commit) error {
+       var links []*code.CommitParent
+       for idx := uint(0); idx < commit.ParentCount(); idx++ {
+               parent := commit.Parent(idx)
+               if parent == nil {
+                       continue
+               }
+               parentId := parent.Id()
+               if parentId == nil {
+                       continue
+               }
+               link := &code.CommitParent{
+                       CommitSha:       commitSha,
+                       ParentCommitSha: parentId.String(),
+               }
+               links = append(links, link)
+       }
+       return r.store.CommitParents(links)
+}
+
+func (r *GitRepo) getDiffComparedToParent(commitSha string, commit 
*git.Commit, parent *git.Commit, opts *git.DiffOptions) (*git.DiffStats, error) 
{
+       var err error
+       var parentTree, tree *git.Tree
+       parentTree, err = parent.Tree()
+       if err != nil {
+               return nil, err
+       }
+       tree, err = commit.Tree()
+       if err != nil {
+               return nil, err
+       }
+       var diff *git.Diff
+       diff, err = r.repo.DiffTreeToTree(parentTree, tree, opts)
+       if err != nil {
+               return nil, err
+       }
+       err = r.storeCommitFilesFromDiff(commitSha, diff)
+       if err != nil {
+               return nil, err
+       }
+       var stats *git.DiffStats
+       stats, err = diff.Stats()
+       if err != nil {
+               return nil, err
+       }
+       return stats, nil
+}
+
+// storeCommitFilesFromDiff stores one code.CommitFile per file delta in diff,
+// each carrying commitSha, the delta's new file path and the number of added
+// and deleted lines.
+//
+// A file's counters are only complete once the next delta starts (or the
+// iteration ends), so each record is written one step late: at the beginning
+// of the following delta, and once more after the loop for the last file.
+//
+// An error from the diff iteration is returned as-is and is no longer
+// overwritten by the result of the final write. A failing write is logged and
+// returned.
+func (r *GitRepo) storeCommitFilesFromDiff(commitSha string, diff *git.Diff) error {
+       var commitFile *code.CommitFile
+       err := diff.ForEach(func(file git.DiffDelta, progress float64) (git.DiffForEachHunkCallback, error) {
+               // A new delta begins: the previous file is complete, persist it.
+               if commitFile != nil {
+                       if err := r.store.CommitFiles(commitFile); err != nil {
+                               r.logger.Error("CommitFiles error:", err)
+                               return nil, err
+                       }
+               }
+               commitFile = new(code.CommitFile)
+               commitFile.CommitSha = commitSha
+               commitFile.FilePath = file.NewFile.Path
+               return func(hunk git.DiffHunk) (git.DiffForEachLineCallback, error) {
+                       return func(line git.DiffLine) error {
+                               if line.Origin == git.DiffLineAddition {
+                                       commitFile.Additions += line.NumLines
+                               }
+                               if line.Origin == git.DiffLineDeletion {
+                                       commitFile.Deletions += line.NumLines
+                               }
+                               return nil
+                       }, nil
+               }, nil
+       }, git.DiffDetailLines)
+       if err != nil {
+               // Do not flush a half-counted file, and do not let a successful
+               // flush mask the iteration failure.
+               return err
+       }
+       if commitFile != nil {
+               if err = r.store.CommitFiles(commitFile); err != nil {
+                       r.logger.Error("CommitFiles error:", err)
+                       return err
+               }
+       }
+       return nil
+}
+
+// getDiffOpts returns libgit2's default diff options with NotifyCallback set
+// to a callback that does nothing and returns nil.
+func getDiffOpts() (*git.DiffOptions, error) {
+       diffOpts, err := git.DefaultDiffOptions()
+       if err != nil {
+               return nil, err
+       }
+       noop := func(diffSoFar *git.Diff, delta git.DiffDelta, matchedPathSpec string) error {
+               return nil
+       }
+       diffOpts.NotifyCallback = noop
+       return &diffOpts, nil
+}
diff --git a/plugins/gitextractor/parser/repo_creator.go 
b/plugins/gitextractor/parser/repo_creator.go
new file mode 100644
index 00000000..d05f3914
--- /dev/null
+++ b/plugins/gitextractor/parser/repo_creator.go
@@ -0,0 +1,60 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package parser
+
+import (
+       git "github.com/libgit2/git2go/v33"
+
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/gitextractor/models"
+)
+
+// String labels exported by the parser package.
+// NOTE(review): presumably these tag a collected git reference as a branch
+// or a tag — confirm against the code that consumes them.
+const (
+       BRANCH = "BRANCH"
+       TAG    = "TAG"
+)
+
+// GitRepoCreator builds GitRepo values, handing each one the store and the
+// context/logger of the task context it was constructed with.
+type GitRepoCreator struct {
+       // store is passed through to every GitRepo this creator builds.
+       store   models.Store
+       // taskCtx supplies the context and logger given to each GitRepo.
+       taskCtx core.TaskContext
+}
+
+// NewGitRepoCreator returns a GitRepoCreator that remembers the given store
+// and task context for use by the repositories it later creates.
+func NewGitRepoCreator(store models.Store, taskCtx core.TaskContext) *GitRepoCreator {
+       creator := new(GitRepoCreator)
+       creator.store = store
+       creator.taskCtx = taskCtx
+       return creator
+}
+
+// LocalRepo opens the git repository located at repoPath and wraps it in a
+// GitRepo identified by repoId. If the repository cannot be opened, the error
+// from git.OpenRepository is returned as-is together with a nil GitRepo.
+func (l *GitRepoCreator) LocalRepo(repoPath, repoId string) (*GitRepo, error) {
+       opened, openErr := git.OpenRepository(repoPath)
+       if openErr == nil {
+               return l.newGitRepo(repoId, opened), nil
+       }
+       return nil, openErr
+}
+
+// newGitRepo assembles a GitRepo around an already opened repository, using
+// the creator's store plus the context and logger taken from its task context.
+func (l *GitRepoCreator) newGitRepo(id string, repo *git.Repository) *GitRepo {
+       gitRepo := new(GitRepo)
+       gitRepo.store = l.store
+       gitRepo.ctx = l.taskCtx.GetContext()
+       gitRepo.logger = l.taskCtx.GetLogger()
+       gitRepo.id = id
+       gitRepo.repo = repo
+       return gitRepo
+}
diff --git a/plugins/gitextractor/tasks/git_repo_collector.go 
b/plugins/gitextractor/tasks/git_repo_collector.go
index c5b86bd6..c750de84 100644
--- a/plugins/gitextractor/tasks/git_repo_collector.go
+++ b/plugins/gitextractor/tasks/git_repo_collector.go
@@ -19,11 +19,10 @@ package tasks
 
 import (
        "errors"
+       "github.com/apache/incubator-devlake/plugins/gitextractor/parser"
        "strings"
 
        "github.com/apache/incubator-devlake/plugins/core"
-       "github.com/apache/incubator-devlake/plugins/gitextractor/parser"
-       "github.com/apache/incubator-devlake/plugins/gitextractor/store"
 )
 
 type GitExtractorOptions struct {
@@ -53,27 +52,64 @@ func (o GitExtractorOptions) Valid() error {
        return nil
 }
 
-func CollectGitRepo(subTaskCtx core.SubTaskContext) error {
-       var err error
-       op := subTaskCtx.GetData().(GitExtractorOptions)
-       storage := store.NewDatabase(subTaskCtx, op.Url)
-       p := parser.NewLibGit2(storage, subTaskCtx)
-       if strings.HasPrefix(op.Url, "http") {
-               err = p.CloneOverHTTP(op.RepoId, op.Url, op.User, op.Password, 
op.Proxy)
-       } else if url := strings.TrimPrefix(op.Url, "ssh://"); 
strings.HasPrefix(url, "git@") {
-               err = p.CloneOverSSH(op.RepoId, url, op.PrivateKey, 
op.Passphrase)
-       } else if strings.HasPrefix(op.Url, "/") {
-               err = p.LocalRepo(op.Url, op.RepoId)
+// CollectGitCommits is the subtask entry point for commits. It fetches the
+// GitRepo stored on the subtask context, sets the progress total to the
+// commit count (or to -1, after logging, when the count cannot be obtained)
+// and then returns the result of repo.CollectCommits.
+func CollectGitCommits(subTaskCtx core.SubTaskContext) error {
+       repo := getGitRepo(subTaskCtx)
+       if count, err := repo.CountCommits(); err != nil {
+               subTaskCtx.GetLogger().Error("unable to get commit count: %v", 
err)
+               subTaskCtx.SetProgress(0, -1)
+       } else {
+               subTaskCtx.SetProgress(0, count)
        }
-       if err != nil {
-               return err
+       return repo.CollectCommits(subTaskCtx)
+}
+
+// CollectGitBranches is the subtask entry point for branches. It fetches the
+// GitRepo stored on the subtask context, sets the progress total to the
+// branch count (or to -1, after logging, when the count cannot be obtained)
+// and then returns the result of repo.CollectBranches.
+func CollectGitBranches(subTaskCtx core.SubTaskContext) error {
+       repo := getGitRepo(subTaskCtx)
+       if count, err := repo.CountBranches(); err != nil {
+               subTaskCtx.GetLogger().Error("unable to get branch count: %v", 
err)
+               subTaskCtx.SetProgress(0, -1)
+       } else {
+               subTaskCtx.SetProgress(0, count)
        }
-       return nil
+       return repo.CollectBranches(subTaskCtx)
+}
+
+// CollectGitTags is the subtask entry point for tags. It fetches the GitRepo
+// stored on the subtask context, sets the progress total to the tag count (or
+// to -1, after logging, when the count cannot be obtained) and then returns
+// the result of repo.CollectTags.
+func CollectGitTags(subTaskCtx core.SubTaskContext) error {
+       repo := getGitRepo(subTaskCtx)
+       count, err := repo.CountTags()
+       if err == nil {
+               subTaskCtx.SetProgress(0, count)
+       } else {
+               // A failed count is not fatal: log it and report an unknown total.
+               subTaskCtx.GetLogger().Error("unable to get tag count: %v", err)
+               subTaskCtx.SetProgress(0, -1)
+       }
+       return repo.CollectTags(subTaskCtx)
+}
+
+// getGitRepo returns the *parser.GitRepo carried as the subtask's data.
+// It panics when the data is not a *parser.GitRepo.
+func getGitRepo(subTaskCtx core.SubTaskContext) *parser.GitRepo {
+       if gitRepo, ok := subTaskCtx.GetData().(*parser.GitRepo); ok {
+               return gitRepo
+       }
+       panic("git repo reference not found on context")
+}
+
+// CollectGitCommitMeta describes the "collectGitCommits" subtask: it runs
+// CollectGitCommits and is enabled by default.
+var CollectGitCommitMeta = core.SubTaskMeta{
+       Name:             "collectGitCommits",
+       EntryPoint:       CollectGitCommits,
+       EnabledByDefault: true,
+       Description:      "collect git commits into Domain Layer Tables",
+}
+
+// CollectGitBranchMeta describes the "collectGitBranches" subtask: it runs
+// CollectGitBranches and is enabled by default.
+var CollectGitBranchMeta = core.SubTaskMeta{
+       Name:             "collectGitBranches",
+       EntryPoint:       CollectGitBranches,
+       EnabledByDefault: true,
+       Description:      "collect git branch into Domain Layer Tables",
 }
 
-var CollectGitRepoMeta = core.SubTaskMeta{
-       Name:             "collectGitRepo",
-       EntryPoint:       CollectGitRepo,
+// CollectGitTagMeta describes the "collectGitTags" subtask: it runs
+// CollectGitTags and is enabled by default.
+var CollectGitTagMeta = core.SubTaskMeta{
+       Name:             "collectGitTags",
+       EntryPoint:       CollectGitTags,
        EnabledByDefault: true,
-       Description:      "collect git commits/branches/tags int Domain Layer 
Tables",
+       Description:      "collect git tag into Domain Layer Tables",
 }
diff --git a/runner/run_pipeline.go b/runner/run_pipeline.go
index 724a1909..d777fc66 100644
--- a/runner/run_pipeline.go
+++ b/runner/run_pipeline.go
@@ -33,6 +33,7 @@ func RunPipeline(
        pipelineId uint64,
        runTasks func([]uint64) error,
 ) error {
+       startTime := time.Now()
        // load pipeline from db
        pipeline := &models.Pipeline{}
        err := db.Find(pipeline, pipelineId).Error
@@ -93,7 +94,7 @@ func RunPipeline(
                        return err
                }
        }
-
-       log.Info("pipeline finished: %w", err)
+       endTime := time.Now()
+       log.Info("pipeline finished in %d ms: %w", 
endTime.UnixMilli()-startTime.UnixMilli(), err)
        return err
 }
diff --git a/runner/run_task.go b/runner/run_task.go
index 24c8fd24..a2739bd8 100644
--- a/runner/run_task.go
+++ b/runner/run_task.go
@@ -217,6 +217,9 @@ func RunPluginSubTasks(
        }
 
        taskCtx := helper.NewDefaultTaskContext(cfg, logger, db, ctx, name, 
subtasksFlag, progress)
+       if closeablePlugin, ok := pluginTask.(core.CloseablePluginTask); ok {
+               defer closeablePlugin.Close(taskCtx)
+       }
        taskData, err := pluginTask.PrepareTaskData(taskCtx, options)
        if err != nil {
                return err

Reply via email to