This is an automated email from the ASF dual-hosted git repository.
mappjzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 5443b7745 Sonarqube fix (#4836)
5443b7745 is described below
commit 5443b77453b80ff50ae499445844611ef03354d5
Author: mappjzc <[email protected]>
AuthorDate: Mon Apr 3 21:03:34 2023 +0800
Sonarqube fix (#4836)
* fix: fix cron
Fix cron using the wrong blueprint to run pipelines.
Nddtfjiang <[email protected]>
* fix: fix sonarqube 10000 limit
Support collecting more than 10000 issues from Sonarqube.
Nddtfjiang <[email protected]>
---
backend/helpers/pluginhelper/api/iterator.go | 16 ++-
backend/helpers/pluginhelper/api/queue.go | 74 ++++++++++--
backend/helpers/pluginhelper/api/queue_test.go | 61 ++++++++--
.../plugins/sonarqube/tasks/issues_collector.go | 134 ++++++++++++++++++++-
4 files changed, 259 insertions(+), 26 deletions(-)
diff --git a/backend/helpers/pluginhelper/api/iterator.go
b/backend/helpers/pluginhelper/api/iterator.go
index 7e52b58d7..37ffd2b85 100644
--- a/backend/helpers/pluginhelper/api/iterator.go
+++ b/backend/helpers/pluginhelper/api/iterator.go
@@ -18,10 +18,11 @@ limitations under the License.
package api
import (
- "github.com/apache/incubator-devlake/core/dal"
- "github.com/apache/incubator-devlake/core/errors"
"reflect"
"time"
+
+ "github.com/apache/incubator-devlake/core/dal"
+ "github.com/apache/incubator-devlake/core/errors"
)
// Iterator FIXME ...
@@ -155,12 +156,17 @@ func (q *QueueIterator) HasNext() bool {
// Fetch current item
func (q *QueueIterator) Fetch() (interface{}, errors.Error) {
- return q.queue.PullWithOutLock(), nil
+ node := q.queue.Pull()
+ if node == nil {
+ return nil, nil
+ } else {
+ return node.Data(), nil
+ }
}
// Push a data into queue
-func (q *QueueIterator) Push(data QueueNode) {
- q.queue.PushWithoutLock(data)
+func (q *QueueIterator) Push(data interface{}) {
+ q.queue.Push(NewQueueIteratorNode(data))
}
// Close releases resources
diff --git a/backend/helpers/pluginhelper/api/queue.go
b/backend/helpers/pluginhelper/api/queue.go
index be4cdb6ad..890d24d9e 100644
--- a/backend/helpers/pluginhelper/api/queue.go
+++ b/backend/helpers/pluginhelper/api/queue.go
@@ -19,7 +19,7 @@ package api
import (
"sync"
- "sync/atomic"
+ "time"
)
// QueueNode represents a node in the queue
@@ -31,33 +31,62 @@ type QueueNode interface {
// Queue represents a queue
type Queue struct {
- count int64
- head QueueNode
- tail QueueNode
- mux sync.Mutex
+ count int64
+ head QueueNode
+ tail QueueNode
+ mux sync.Mutex
+ working int64 // working count
+}
+
+// Finish reduces the working count by count
+func (q *Queue) Finish(count int64) {
+ q.mux.Lock()
+ defer q.mux.Unlock()
+
+ q.working -= count
}
// Push add a node to queue
func (q *Queue) Push(node QueueNode) {
q.mux.Lock()
defer q.mux.Unlock()
+
q.PushWithoutLock(node)
}
// Pull get a node from queue
-func (q *Queue) Pull(add *int64) QueueNode {
+// it will increment the working count and block when there are no nodes in the queue
but the working count is not zero
+func (q *Queue) Pull() QueueNode {
q.mux.Lock()
defer q.mux.Unlock()
-
node := q.PullWithOutLock()
-
- if node == nil {
+ if node != nil {
+ return node
+ } else {
return nil
}
- if add != nil {
- atomic.AddInt64(add, 1)
+}
+
+func (q *Queue) PullWithWorkingBlock() QueueNode {
+ q.mux.Lock()
+ defer q.mux.Unlock()
+
+ for {
+ node := q.PullWithOutLock()
+ if node != nil {
+ q.working++
+
+ return node
+ } else if q.working > 0 {
+ q.mux.Unlock()
+
+ time.Sleep(time.Second)
+
+ q.mux.Lock()
+ } else {
+ return nil
+ }
}
- return node
}
// PushWithoutLock is no lock mode of Push
@@ -90,6 +119,7 @@ func (q *Queue) PullWithOutLock() QueueNode {
} else {
q.count = 0
}
+
return node
}
@@ -104,6 +134,7 @@ func (q *Queue) CleanWithOutLock() {
func (q *Queue) Clean() {
q.mux.Lock()
defer q.mux.Unlock()
+
q.CleanWithOutLock()
}
@@ -116,9 +147,28 @@ func (q *Queue) GetCountWithOutLock() int64 {
func (q *Queue) GetCount() int64 {
q.mux.Lock()
defer q.mux.Unlock()
+
return q.count
}
+// GetCountWithWorkingBlock gets the node count in the queue and only returns zero when the working count is zero
+func (q *Queue) GetCountWithWorkingBlock() int64 {
+ q.mux.Lock()
+ defer q.mux.Unlock()
+
+ for {
+ if q.count == 0 && q.working > 0 {
+ q.mux.Unlock()
+
+ time.Sleep(time.Second)
+
+ q.mux.Lock()
+ } else {
+ return q.count
+ }
+ }
+}
+
// NewQueue create and init a new Queue
func NewQueue() *Queue {
return &Queue{
diff --git a/backend/helpers/pluginhelper/api/queue_test.go
b/backend/helpers/pluginhelper/api/queue_test.go
index 7da9117e7..3e0baf167 100644
--- a/backend/helpers/pluginhelper/api/queue_test.go
+++ b/backend/helpers/pluginhelper/api/queue_test.go
@@ -18,23 +18,68 @@ limitations under the License.
package api
import (
- "github.com/stretchr/testify/require"
"testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
)
-func TestQueue(t *testing.T) {
+func TestQueueIterator(t *testing.T) {
it := NewQueueIterator()
- it.Push(NewQueueIteratorNode("a"))
- it.Push(NewQueueIteratorNode("b"))
+ it.Push("a")
+ it.Push("b")
require.True(t, it.HasNext())
folderRaw, err := it.Fetch()
require.NoError(t, err)
- node := folderRaw.(*QueueIteratorNode)
- require.Equal(t, "a", node.Data())
+ data := folderRaw.(string)
+ require.Equal(t, "a", data)
require.True(t, it.HasNext())
folderRaw, err = it.Fetch()
require.NoError(t, err)
- node = folderRaw.(*QueueIteratorNode)
- require.Equal(t, "b", node.Data())
+ data = folderRaw.(string)
+ require.Equal(t, "b", data)
require.False(t, it.HasNext())
}
+
+func TestQueue(t *testing.T) {
+ q := NewQueue()
+ q.Push(NewQueueIteratorNode("a"))
+ q.Push(NewQueueIteratorNode("b"))
+
+ require.Equal(t, q.GetCount(), int64(2))
+ folderRaw := q.PullWithWorkingBlock()
+ data, ok := folderRaw.Data().(string)
+ require.True(t, ok)
+ require.Equal(t, "a", data)
+ require.Equal(t, q.GetCount(), int64(1))
+ folderRaw = q.PullWithWorkingBlock()
+ data, ok = folderRaw.Data().(string)
+ require.True(t, ok)
+ require.Equal(t, "b", data)
+ require.Equal(t, q.GetCount(), int64(0))
+
+ empty := false
+ waited := false
+ go func() {
+ require.Equal(t, q.GetCountWithWorkingBlock(), int64(1))
+ data, ok := q.PullWithWorkingBlock().Data().(string)
+ require.True(t, ok)
+ require.Equal(t, data, "c")
+ dataNode := q.PullWithWorkingBlock()
+ require.Equal(t, dataNode, nil)
+ empty = true
+ }()
+
+ go func() {
+ time.Sleep(100 * time.Millisecond)
+ q.Push(NewQueueIteratorNode("c"))
+ waited = true
+ q.Finish(3)
+ }()
+
+ for !empty {
+ time.Sleep(time.Millisecond)
+ }
+
+ require.True(t, waited)
+}
diff --git a/backend/plugins/sonarqube/tasks/issues_collector.go
b/backend/plugins/sonarqube/tasks/issues_collector.go
index 7e3292a3a..d210191ed 100644
--- a/backend/plugins/sonarqube/tasks/issues_collector.go
+++ b/backend/plugins/sonarqube/tasks/issues_collector.go
@@ -22,6 +22,7 @@ import (
"fmt"
"net/http"
"net/url"
+ "time"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
@@ -32,24 +33,125 @@ const RAW_ISSUES_TABLE = "sonarqube_api_issues"
var _ plugin.SubTaskEntryPoint = CollectIssues
+type SonarqubeIssueTimeIteratorNode struct {
+ CreatedAfter *time.Time
+ CreatedBefore *time.Time
+}
+
func CollectIssues(taskCtx plugin.SubTaskContext) (err errors.Error) {
logger := taskCtx.GetLogger()
logger.Info("collect issues")
+ iterator := helper.NewQueueIterator()
+ iterator.Push(
+ &SonarqubeIssueTimeIteratorNode{
+ CreatedAfter: nil,
+ CreatedBefore: nil,
+ },
+ )
+
+ // work around Sonarqube not supporting '+' in time values
+ loc := time.FixedZone("sonarqube", -60)
+
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_ISSUES_TABLE)
collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
PageSize: 100,
UrlTemplate: "issues/search",
+ Input: iterator,
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("componentKeys", fmt.Sprintf("%v",
data.Options.ProjectKey))
+ input, ok :=
reqData.Input.(*SonarqubeIssueTimeIteratorNode)
+ if !ok {
+ return nil,
errors.Default.New(fmt.Sprintf("Input to SonarqubeIssueTimeIteratorNode
failed:%+v", reqData.Input))
+ }
+
+ if input.CreatedAfter != nil {
+ query.Set("createdAfter",
GetFormatTime(input.CreatedAfter, loc))
+ }
+
+ if input.CreatedBefore != nil {
+ query.Set("createdBefore",
GetFormatTime(input.CreatedBefore, loc))
+ }
+
query.Set("p", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("ps", fmt.Sprintf("%v", reqData.Pager.Size))
return query, nil
},
- GetTotalPages: GetTotalPagesFromResponse,
+ GetTotalPages: func(res *http.Response, args
*helper.ApiCollectorArgs) (int, errors.Error) {
+ body := &SonarqubePagination{}
+ err := helper.UnmarshalResponse(res, body)
+ if err != nil {
+ return 0, err
+ }
+
+ pages := body.Paging.Total / args.PageSize
+ if body.Paging.Total%args.PageSize > 0 {
+ pages++
+ }
+
+ query := res.Request.URL.Query()
+
+ // if get more than 10000 data, that need split it
+ if pages > 100 {
+ var createdAfterUnix int64
+ var createdBeforeUnix int64
+
+ createdAfter, err :=
getTimeFromFormatTime(query.Get("createdAfter"))
+ if err != nil {
+ return 0, err
+ }
+ createdBefore, err :=
getTimeFromFormatTime(query.Get("createdBefore"))
+ if err != nil {
+ return 0, err
+ }
+
+ if createdAfter == nil {
+ createdAfterUnix = 0
+ } else {
+ createdAfterUnix = createdAfter.Unix()
+ }
+
+ if createdBefore == nil {
+ createdBeforeUnix = time.Now().Unix()
+ } else {
+ createdBeforeUnix = createdBefore.Unix()
+ }
+
+ // can not split it any more
+ if createdBeforeUnix-createdAfterUnix < 1 {
+ return 100, nil
+ }
+
+ // split it
+ MidTime :=
time.Unix((createdAfterUnix+createdBeforeUnix)/2+1, 0)
+
+ // left part
+ iterator.Push(&SonarqubeIssueTimeIteratorNode{
+ CreatedAfter: createdAfter,
+ CreatedBefore: &MidTime,
+ })
+
+ // right part
+ iterator.Push(&SonarqubeIssueTimeIteratorNode{
+ CreatedAfter: &MidTime,
+ CreatedBefore: createdBefore,
+ })
+
+ logger.Info("split [%s][%s] by mid [%s] for it
has pages:[%d] and total:[%d]",
+ query.Get("createdAfter"),
query.Get("createdBefore"), GetFormatTime(&MidTime, loc), pages,
body.Paging.Total)
+
+ return 0, nil
+ } else {
+ logger.Info("[%s][%s] has pages:[%d] and
total:[%d]",
+ query.Get("createdAfter"),
query.Get("createdBefore"), pages, body.Paging.Total)
+
+ return pages, nil
+ }
+ },
+
ResponseParser: func(res *http.Response) ([]json.RawMessage,
errors.Error) {
var resData struct {
Data []json.RawMessage `json:"issues"`
@@ -71,6 +173,7 @@ func CollectIssues(taskCtx plugin.SubTaskContext) (err
errors.Error) {
Please recollect this project:
%s`, data.Options.ProjectKey))
}
}
+
return resData.Data, nil
},
})
@@ -78,6 +181,7 @@ func CollectIssues(taskCtx plugin.SubTaskContext) (err
errors.Error) {
return err
}
return collector.Execute()
+
}
var CollectIssuesMeta = plugin.SubTaskMeta{
@@ -87,3 +191,31 @@ var CollectIssuesMeta = plugin.SubTaskMeta{
Description: "Collect issues data from Sonarqube api",
DomainTypes: []string{plugin.DOMAIN_TYPE_CODE_QUALITY},
}
+
+func GetFormatTime(t *time.Time, loc *time.Location) string {
+ if t == nil {
+ return ""
+ }
+ if loc == nil {
+ return t.Format("2006-01-02T15:04:05-0700")
+ }
+ return t.In(loc).Format("2006-01-02T15:04:05-0700")
+}
+
+func getTimeFromFormatTime(formatTime string) (*time.Time, errors.Error) {
+ if formatTime == "" {
+ return nil, nil
+ }
+
+ if len(formatTime) < 20 {
+ return nil, errors.Default.New(fmt.Sprintf("formatTime [%s] is
too short ", formatTime))
+ }
+
+ t, err := time.Parse("2006-01-02T15:04:05-0700", formatTime)
+
+ if err != nil {
+ return nil, errors.Default.New(fmt.Sprintf("Failed to Parse the
time from [%s]:%s", formatTime, err.Error()))
+ }
+
+ return &t, nil
+}