This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 086eae728 feat: collect only finished jobs for Gitlab (#5889)
086eae728 is described below
commit 086eae72831e7d029354a81dd73a85cc4655f9f1
Author: Klesh Wong <[email protected]>
AuthorDate: Wed Aug 16 11:54:53 2023 +0800
feat: collect only finished jobs for Gitlab (#5889)
* fix: incorrect way to handle query string values
* feat: make CollectUnfinishedDetails optional
* feat: collect only finished job for gitlab
---
backend/helpers/pluginhelper/api/api_client.go | 10 +++---
.../pluginhelper/api/api_collector_with_state.go | 6 +++-
backend/plugins/github/tasks/cicd_run_collector.go | 2 +-
backend/plugins/github/tasks/event_collector.go | 2 +-
backend/plugins/github/tasks/pr_collector.go | 2 +-
backend/plugins/gitlab/tasks/job_collector.go | 36 ++--------------------
backend/plugins/jenkins/tasks/build_collector.go | 2 +-
.../plugins/pagerduty/tasks/incidents_collector.go | 11 ++++---
8 files changed, 23 insertions(+), 48 deletions(-)
diff --git a/backend/helpers/pluginhelper/api/api_client.go b/backend/helpers/pluginhelper/api/api_client.go
index ab2af17d4..255af5dee 100644
--- a/backend/helpers/pluginhelper/api/api_client.go
+++ b/backend/helpers/pluginhelper/api/api_client.go
@@ -24,17 +24,17 @@ import (
"encoding/json"
"encoding/xml"
"fmt"
- aha "github.com/apache/incubator-devlake/core/plugin"
"io"
"net/http"
"net/url"
"reflect"
"regexp"
- "strings"
"sync"
"time"
"unicode/utf8"
+ aha "github.com/apache/incubator-devlake/core/plugin"
+
"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/log"
@@ -417,8 +417,10 @@ func GetURIStringPointer(baseUrl string, relativePath string, query url.Values)
}
if query != nil {
queryString := u.Query()
- for key, value := range query {
- queryString.Set(key, strings.Join(value, ""))
+ for key, values := range query {
+ for _, v := range values {
+ queryString.Add(key, v)
+ }
}
u.RawQuery = queryString.Encode()
diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index b3282459b..11dca16e6 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -226,6 +226,10 @@ func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArg
return nil, err
}
+ if args.CollectUnfinishedDetails == nil {
+ return manager, nil
+ }
+
// step 2: create another collector to collect updated records
// TODO: this creates cursor before previous step gets executed, which is too early, to be optimized
input, err := args.CollectUnfinishedDetails.BuildInputIterator()
@@ -264,7 +268,7 @@ type FinalizableApiCollectorArgs struct {
ApiClient RateLimitedApiClient
TimeAfter *time.Time // leave it be nil to disable time filter
CollectNewRecordsByList FinalizableApiCollectorListArgs
- CollectUnfinishedDetails FinalizableApiCollectorDetailArgs
+ CollectUnfinishedDetails *FinalizableApiCollectorDetailArgs
}
// FinalizableApiCollectorCommonArgs is the common arguments for both list and detail collectors
diff --git a/backend/plugins/github/tasks/cicd_run_collector.go b/backend/plugins/github/tasks/cicd_run_collector.go
index 641b50219..4d9158fcf 100644
--- a/backend/plugins/github/tasks/cicd_run_collector.go
+++ b/backend/plugins/github/tasks/cicd_run_collector.go
@@ -111,7 +111,7 @@ func CollectRuns(taskCtx plugin.SubTaskContext)
errors.Error {
return pj.CreatedAt.ToTime(), nil
},
},
- CollectUnfinishedDetails:
helper.FinalizableApiCollectorDetailArgs{
+ CollectUnfinishedDetails:
&helper.FinalizableApiCollectorDetailArgs{
BuildInputIterator: func() (helper.Iterator,
errors.Error) {
// load unfinished runs from the database
cursor, err := db.Cursor(
diff --git a/backend/plugins/github/tasks/event_collector.go b/backend/plugins/github/tasks/event_collector.go
index 666bad47b..7652f068f 100644
--- a/backend/plugins/github/tasks/event_collector.go
+++ b/backend/plugins/github/tasks/event_collector.go
@@ -101,7 +101,7 @@ func CollectApiEvents(taskCtx plugin.SubTaskContext)
errors.Error {
return e.CreatedAt.ToTime(), nil
},
},
- CollectUnfinishedDetails:
helper.FinalizableApiCollectorDetailArgs{
+ CollectUnfinishedDetails:
&helper.FinalizableApiCollectorDetailArgs{
BuildInputIterator: func() (helper.Iterator,
errors.Error) {
cursor, err := db.Cursor(
dal.Select("github_id"),
diff --git a/backend/plugins/github/tasks/pr_collector.go b/backend/plugins/github/tasks/pr_collector.go
index 02ab884d9..f94e96539 100644
--- a/backend/plugins/github/tasks/pr_collector.go
+++ b/backend/plugins/github/tasks/pr_collector.go
@@ -104,7 +104,7 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext)
errors.Error {
return pr.CreatedAt, nil
},
},
- CollectUnfinishedDetails:
helper.FinalizableApiCollectorDetailArgs{
+ CollectUnfinishedDetails:
&helper.FinalizableApiCollectorDetailArgs{
BuildInputIterator: func() (helper.Iterator,
errors.Error) {
// select pull id from database
cursor, err := db.Cursor(
diff --git a/backend/plugins/gitlab/tasks/job_collector.go b/backend/plugins/gitlab/tasks/job_collector.go
index 0accbaa9f..6a7475784 100644
--- a/backend/plugins/gitlab/tasks/job_collector.go
+++ b/backend/plugins/gitlab/tasks/job_collector.go
@@ -19,19 +19,15 @@ package tasks
import (
"encoding/json"
- "io"
"net/http"
"net/url"
- "reflect"
"strconv"
"time"
- "github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models/common"
"github.com/apache/incubator-devlake/core/plugin"
helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
- "github.com/apache/incubator-devlake/plugins/gitlab/models"
)
func init() {
@@ -56,7 +52,6 @@ var CollectApiJobsMeta = plugin.SubTaskMeta{
func CollectApiJobs(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_JOB_TABLE)
- db := taskCtx.GetDal()
collector, err :=
helper.NewStatefulApiCollectorForFinalizableEntity(helper.FinalizableApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
@@ -70,6 +65,8 @@ func CollectApiJobs(taskCtx plugin.SubTaskContext)
errors.Error {
query := url.Values{}
query.Set("page",
strconv.Itoa(reqData.Pager.Page))
query.Set("per_page",
strconv.Itoa(reqData.Pager.Size))
+ query.Set("scope[]", "failed")
+ query.Add("scope[]", "success")
return query, nil
},
ResponseParser: func(res *http.Response)
([]json.RawMessage, errors.Error) {
@@ -91,35 +88,6 @@ func CollectApiJobs(taskCtx plugin.SubTaskContext)
errors.Error {
return pr.CreatedAt.ToTime(), nil
},
},
- CollectUnfinishedDetails:
helper.FinalizableApiCollectorDetailArgs{
- BuildInputIterator: func() (helper.Iterator,
errors.Error) {
- // select pull id from database
- cursor, err := db.Cursor(
- dal.Select("gitlab_id"),
- dal.From(&models.GitlabJob{}),
- dal.Where(
- "project_id = ? AND connection_id = ? AND finished_at is null",
- data.Options.ProjectId,
data.Options.ConnectionId,
- ),
- )
- if err != nil {
- return nil, err
- }
- return helper.NewDalCursorIterator(db, cursor,
reflect.TypeOf(SimpleGitlabApiJob{}))
- },
- FinalizableApiCollectorCommonArgs:
helper.FinalizableApiCollectorCommonArgs{
- UrlTemplate: "projects/{{ .Params.ProjectId }}/jobs/{{ .Input.GitlabId }}",
- ResponseParser: func(res *http.Response)
([]json.RawMessage, errors.Error) {
- body, err := io.ReadAll(res.Body)
- if err != nil {
- return nil, errors.Convert(err)
- }
- res.Body.Close()
- return []json.RawMessage{body}, nil
- },
- AfterResponse: ignoreHTTPStatus403, // ignore 403 for CI/CD disable
- },
- },
})
if err != nil {
diff --git a/backend/plugins/jenkins/tasks/build_collector.go b/backend/plugins/jenkins/tasks/build_collector.go
index 9ad7e8493..290d3a3e6 100644
--- a/backend/plugins/jenkins/tasks/build_collector.go
+++ b/backend/plugins/jenkins/tasks/build_collector.go
@@ -102,7 +102,7 @@ func CollectApiBuilds(taskCtx plugin.SubTaskContext)
errors.Error {
return time.Unix(seconds, nanos), nil
},
},
- CollectUnfinishedDetails:
helper.FinalizableApiCollectorDetailArgs{
+ CollectUnfinishedDetails:
&helper.FinalizableApiCollectorDetailArgs{
BuildInputIterator: func() (helper.Iterator,
errors.Error) {
cursor, err := db.Cursor(
dal.Select("number"),
diff --git a/backend/plugins/pagerduty/tasks/incidents_collector.go b/backend/plugins/pagerduty/tasks/incidents_collector.go
index 146478f02..d8cae2f1f 100644
--- a/backend/plugins/pagerduty/tasks/incidents_collector.go
+++ b/backend/plugins/pagerduty/tasks/incidents_collector.go
@@ -20,15 +20,16 @@ package tasks
import (
"encoding/json"
"fmt"
+ "net/http"
+ "net/url"
+ "reflect"
+ "time"
+
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/pagerduty/models"
- "net/http"
- "net/url"
- "reflect"
- "time"
)
const RAW_INCIDENTS_TABLE = "pagerduty_incidents"
@@ -108,7 +109,7 @@ func CollectIncidents(taskCtx plugin.SubTaskContext)
errors.Error {
},
},
},
- CollectUnfinishedDetails: api.FinalizableApiCollectorDetailArgs{
+ CollectUnfinishedDetails:
&api.FinalizableApiCollectorDetailArgs{
FinalizableApiCollectorCommonArgs:
api.FinalizableApiCollectorCommonArgs{
// 2. "Input" here is the type: simplifiedRawIncident which is the element type of the returned iterator from BuildInputIterator
UrlTemplate: "incidents/{{ .Input.Number }}",