This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch fix#6075
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/fix#6075 by this push:
new 1f84d11b2 fix: jenkins incremental collectors
1f84d11b2 is described below
commit 1f84d11b2232fdb4cf6db6aac8cc20043a7c1eee
Author: abeizn <[email protected]>
AuthorDate: Thu Sep 14 13:56:56 2023 +0800
fix: jenkins incremental collectors
---
backend/plugins/jenkins/tasks/stage_collector.go | 32 ++++++++++++++----------
1 file changed, 19 insertions(+), 13 deletions(-)
diff --git a/backend/plugins/jenkins/tasks/stage_collector.go b/backend/plugins/jenkins/tasks/stage_collector.go
index 6c8478750..bf159c1d6 100644
--- a/backend/plugins/jenkins/tasks/stage_collector.go
+++ b/backend/plugins/jenkins/tasks/stage_collector.go
@@ -48,15 +48,29 @@ type SimpleBuild struct {
func CollectApiStages(taskCtx plugin.SubTaskContext) errors.Error {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*JenkinsTaskData)
+
+ collectorWithState, err :=
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+ Params: JenkinsApiParams{
+ ConnectionId: data.Options.ConnectionId,
+ FullName: data.Options.JobFullName,
+ },
+ Ctx: taskCtx,
+ Table: RAW_STAGE_TABLE,
+ })
+ if err != nil {
+ return err
+ }
+
clauses := []dal.Clause{
dal.Select("tjb.number,tjb.full_name"),
dal.From("_tool_jenkins_builds as tjb"),
dal.Where(`tjb.connection_id = ? and tjb.job_path = ? and
tjb.job_name = ? and tjb.class = ?`,
data.Options.ConnectionId, data.Options.JobPath,
data.Options.JobName, "WorkflowRun"),
}
- syncPolicy := taskCtx.TaskContext().SyncPolicy()
- if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- clauses = append(clauses, dal.Where(`tjb.start_time >= ?`,
syncPolicy.TimeAfter))
+
+ incremental := collectorWithState.IsIncremental()
+ if incremental && collectorWithState.LatestState.LatestSuccessStart !=
nil {
+ clauses = append(clauses, dal.Where(`tjb.start_time >= ?`,
collectorWithState.LatestState.LatestSuccessStart))
}
cursor, err := db.Cursor(clauses...)
@@ -70,15 +84,7 @@ func CollectApiStages(taskCtx plugin.SubTaskContext) errors.Error {
return err
}
- collector, err := api.NewApiCollector(api.ApiCollectorArgs{
- RawDataSubTaskArgs: api.RawDataSubTaskArgs{
- Params: JenkinsApiParams{
- ConnectionId: data.Options.ConnectionId,
- FullName: data.Options.JobFullName,
- },
- Ctx: taskCtx,
- Table: RAW_STAGE_TABLE,
- },
+ err = collectorWithState.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
Input: iterator,
UrlTemplate: fmt.Sprintf("%sjob/%s/{{ .Input.Number
}}/wfapi/describe", data.Options.JobPath, data.Options.JobName),
@@ -106,5 +112,5 @@ func CollectApiStages(taskCtx plugin.SubTaskContext) errors.Error {
return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}