This is an automated email from the ASF dual-hosted git repository.

likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 5cfe8f0fe gitextractor should delete old data (#4089)
5cfe8f0fe is described below

commit 5cfe8f0febced111e125dc752fa1bbe02b16d548
Author: Klesh Wong <[email protected]>
AuthorDate: Tue Jan 3 21:04:08 2023 +0800

    gitextractor should delete old data (#4089)
    
    * fix: gitext doesn't delete previous data
    
    * fix: previous running task should be set to FAILED on startup
---
 plugins/gitextractor/impl/impl.go      |  2 +-
 plugins/gitextractor/main.go           |  2 +-
 plugins/gitextractor/store/database.go | 32 ++++++++++++++++++++++++++------
 services/pipeline.go                   |  8 ++++++++
 4 files changed, 36 insertions(+), 8 deletions(-)

diff --git a/plugins/gitextractor/impl/impl.go b/plugins/gitextractor/impl/impl.go
index c9ffee992..65fb4674b 100644
--- a/plugins/gitextractor/impl/impl.go
+++ b/plugins/gitextractor/impl/impl.go
@@ -64,7 +64,7 @@ func (plugin GitExtractor) PrepareTaskData(taskCtx core.TaskContext, options map
        if err := op.Valid(); err != nil {
                return nil, err
        }
-       storage := store.NewDatabase(taskCtx, op.Url)
+       storage := store.NewDatabase(taskCtx, op.RepoId)
        repo, err := NewGitRepo(taskCtx.GetLogger(), storage, op)
        if err != nil {
                return nil, err
diff --git a/plugins/gitextractor/main.go b/plugins/gitextractor/main.go
index d5c94feab..b38b4caef 100644
--- a/plugins/gitextractor/main.go
+++ b/plugins/gitextractor/main.go
@@ -72,7 +72,7 @@ func main() {
        }
        // If we didn't specify output or dburl, we will use db by default
        if storage == nil {
-               storage = store.NewDatabase(basicRes, *url)
+               storage = store.NewDatabase(basicRes, *id)
        }
        defer storage.Close()
        ctx := context.Background()
diff --git a/plugins/gitextractor/store/database.go b/plugins/gitextractor/store/database.go
index f34212b3c..257939bec 100644
--- a/plugins/gitextractor/store/database.go
+++ b/plugins/gitextractor/store/database.go
@@ -18,10 +18,11 @@ limitations under the License.
 package store
 
 import (
-       "fmt"
-       "github.com/apache/incubator-devlake/errors"
        "reflect"
 
+       "github.com/apache/incubator-devlake/errors"
+
+       "github.com/apache/incubator-devlake/models/common"
        "github.com/apache/incubator-devlake/models/domainlayer"
        "github.com/apache/incubator-devlake/models/domainlayer/code"
        "github.com/apache/incubator-devlake/models/domainlayer/crossdomain"
@@ -33,24 +34,35 @@ const BathSize = 100
 
 type Database struct {
        driver *helper.BatchSaveDivider
+       table  string
+       params string
 }
 
-func NewDatabase(basicRes core.BasicRes, repoUrl string) *Database {
-       database := new(Database)
+func NewDatabase(basicRes core.BasicRes, repoId string) *Database {
+       database := &Database{
+               table:  "gitextractor",
+               params: repoId,
+       }
        database.driver = helper.NewBatchSaveDivider(
                basicRes,
                BathSize,
-               "gitextractor",
-               fmt.Sprintf(`{"RepoUrl": "%s"}`, repoUrl),
+               database.table,
+               database.params,
        )
        return database
 }
 
+func (d *Database) updateRawDataFields(rawData *common.RawDataOrigin) {
+       rawData.RawDataTable = d.table
+       rawData.RawDataParams = d.params
+}
+
 func (d *Database) RepoCommits(repoCommit *code.RepoCommit) errors.Error {
        batch, err := d.driver.ForType(reflect.TypeOf(repoCommit))
        if err != nil {
                return err
        }
+       d.updateRawDataFields(&repoCommit.RawDataOrigin)
        return batch.Add(repoCommit)
 }
 
@@ -65,6 +77,7 @@ func (d *Database) Commits(commit *code.Commit) errors.Error {
        if err != nil {
                return err
        }
+       d.updateRawDataFields(&account.RawDataOrigin)
        err = accountBatch.Add(account)
        if err != nil {
                return err
@@ -73,6 +86,7 @@ func (d *Database) Commits(commit *code.Commit) errors.Error {
        if err != nil {
                return err
        }
+       d.updateRawDataFields(&account.RawDataOrigin)
        return commitBatch.Add(commit)
 }
 
@@ -81,6 +95,7 @@ func (d *Database) Refs(ref *code.Ref) errors.Error {
        if err != nil {
                return err
        }
+       d.updateRawDataFields(&ref.RawDataOrigin)
        return batch.Add(ref)
 }
 
@@ -89,6 +104,7 @@ func (d *Database) CommitFiles(file *code.CommitFile) errors.Error {
        if err != nil {
                return err
        }
+       d.updateRawDataFields(&file.RawDataOrigin)
        return batch.Add(file)
 }
 
@@ -97,6 +113,7 @@ func (d *Database) CommitFileComponents(commitFileComponent *code.CommitFileComp
        if err != nil {
                return err
        }
+       d.updateRawDataFields(&commitFileComponent.RawDataOrigin)
        return batch.Add(commitFileComponent)
 }
 
@@ -105,6 +122,7 @@ func (d *Database) RepoSnapshot(snapshotElement *code.RepoSnapshot) errors.Error
        if err != nil {
                return err
        }
+       d.updateRawDataFields(&snapshotElement.RawDataOrigin)
        return batch.Add(snapshotElement)
 }
 
@@ -113,6 +131,7 @@ func (d *Database) CommitLineChange(commitLineChange *code.CommitLineChange) err
        if err != nil {
                return err
        }
+       d.updateRawDataFields(&commitLineChange.RawDataOrigin)
        return batch.Add(commitLineChange)
 }
 
@@ -125,6 +144,7 @@ func (d *Database) CommitParents(pp []*code.CommitParent) errors.Error {
                return err
        }
        for _, cp := range pp {
+               d.updateRawDataFields(&cp.RawDataOrigin)
                err = batch.Add(cp)
                if err != nil {
                        return err
diff --git a/services/pipeline.go b/services/pipeline.go
index 341529600..92cc643a0 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -82,6 +82,14 @@ func pipelineServiceInit() {
                if err != nil {
                        panic(err)
                }
+               err = db.UpdateColumn(
+                       &models.Task{},
+                       "status", models.TASK_FAILED,
+                       dal.Where("status = ?", models.TASK_RUNNING),
+               )
+               if err != nil {
+                       panic(err)
+               }
        }
 
        err := ReloadBlueprints(cronManager)

Reply via email to