This is an automated email from the ASF dual-hosted git repository.

mappjzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 5443b7745 Sonarqube fix (#4836)
5443b7745 is described below

commit 5443b77453b80ff50ae499445844611ef03354d5
Author: mappjzc <[email protected]>
AuthorDate: Mon Apr 3 21:03:34 2023 +0800

    Sonarqube fix (#4836)
    
    * fix: fix cron
    
    Fix cron using the wrong blueprint to run pipelines.
    
    Nddtfjiang <[email protected]>
    
    * fix: fix sonarqube 10000 limit
    
    Support collecting more than 10000 issues from SonarQube.
    
    Nddtfjiang <[email protected]>
---
 backend/helpers/pluginhelper/api/iterator.go       |  16 ++-
 backend/helpers/pluginhelper/api/queue.go          |  74 ++++++++++--
 backend/helpers/pluginhelper/api/queue_test.go     |  61 ++++++++--
 .../plugins/sonarqube/tasks/issues_collector.go    | 134 ++++++++++++++++++++-
 4 files changed, 259 insertions(+), 26 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/iterator.go 
b/backend/helpers/pluginhelper/api/iterator.go
index 7e52b58d7..37ffd2b85 100644
--- a/backend/helpers/pluginhelper/api/iterator.go
+++ b/backend/helpers/pluginhelper/api/iterator.go
@@ -18,10 +18,11 @@ limitations under the License.
 package api
 
 import (
-       "github.com/apache/incubator-devlake/core/dal"
-       "github.com/apache/incubator-devlake/core/errors"
        "reflect"
        "time"
+
+       "github.com/apache/incubator-devlake/core/dal"
+       "github.com/apache/incubator-devlake/core/errors"
 )
 
 // Iterator FIXME ...
@@ -155,12 +156,17 @@ func (q *QueueIterator) HasNext() bool {
 
 // Fetch current item
 func (q *QueueIterator) Fetch() (interface{}, errors.Error) {
-       return q.queue.PullWithOutLock(), nil
+       node := q.queue.Pull()
+       if node == nil {
+               return nil, nil
+       } else {
+               return node.Data(), nil
+       }
 }
 
 // Push a data into queue
-func (q *QueueIterator) Push(data QueueNode) {
-       q.queue.PushWithoutLock(data)
+func (q *QueueIterator) Push(data interface{}) {
+       q.queue.Push(NewQueueIteratorNode(data))
 }
 
 // Close releases resources
diff --git a/backend/helpers/pluginhelper/api/queue.go 
b/backend/helpers/pluginhelper/api/queue.go
index be4cdb6ad..890d24d9e 100644
--- a/backend/helpers/pluginhelper/api/queue.go
+++ b/backend/helpers/pluginhelper/api/queue.go
@@ -19,7 +19,7 @@ package api
 
 import (
        "sync"
-       "sync/atomic"
+       "time"
 )
 
 // QueueNode represents a node in the queue
@@ -31,33 +31,62 @@ type QueueNode interface {
 
 // Queue represetns a queue
 type Queue struct {
-       count int64
-       head  QueueNode
-       tail  QueueNode
-       mux   sync.Mutex
+       count   int64
+       head    QueueNode
+       tail    QueueNode
+       mux     sync.Mutex
+       working int64 // working count
+}
+
+// Finish reduces the working count by count
+func (q *Queue) Finish(count int64) {
+       q.mux.Lock()
+       defer q.mux.Unlock()
+
+       q.working -= count
 }
 
 // Push add a node to queue
 func (q *Queue) Push(node QueueNode) {
        q.mux.Lock()
        defer q.mux.Unlock()
+
        q.PushWithoutLock(node)
 }
 
 // Pull get a node from queue
-func (q *Queue) Pull(add *int64) QueueNode {
+// it does not touch the working count and returns nil when the queue is empty
+func (q *Queue) Pull() QueueNode {
        q.mux.Lock()
        defer q.mux.Unlock()
-
        node := q.PullWithOutLock()
-
-       if node == nil {
+       if node != nil {
+               return node
+       } else {
                return nil
        }
-       if add != nil {
-               atomic.AddInt64(add, 1)
+}
+
+func (q *Queue) PullWithWorkingBlock() QueueNode {
+       q.mux.Lock()
+       defer q.mux.Unlock()
+
+       for {
+               node := q.PullWithOutLock()
+               if node != nil {
+                       q.working++
+
+                       return node
+               } else if q.working > 0 {
+                       q.mux.Unlock()
+
+                       time.Sleep(time.Second)
+
+                       q.mux.Lock()
+               } else {
+                       return nil
+               }
        }
-       return node
 }
 
 // PushWithoutLock is no lock mode of Push
@@ -90,6 +119,7 @@ func (q *Queue) PullWithOutLock() QueueNode {
        } else {
                q.count = 0
        }
+
        return node
 }
 
@@ -104,6 +134,7 @@ func (q *Queue) CleanWithOutLock() {
 func (q *Queue) Clean() {
        q.mux.Lock()
        defer q.mux.Unlock()
+
        q.CleanWithOutLock()
 }
 
@@ -116,9 +147,28 @@ func (q *Queue) GetCountWithOutLock() int64 {
 func (q *Queue) GetCount() int64 {
        q.mux.Lock()
        defer q.mux.Unlock()
+
        return q.count
 }
 
+// GetCountWithWorkingBlock gets the node count in the queue and only returns zero when the working count is zero
+func (q *Queue) GetCountWithWorkingBlock() int64 {
+       q.mux.Lock()
+       defer q.mux.Unlock()
+
+       for {
+               if q.count == 0 && q.working > 0 {
+                       q.mux.Unlock()
+
+                       time.Sleep(time.Second)
+
+                       q.mux.Lock()
+               } else {
+                       return q.count
+               }
+       }
+}
+
 // NewQueue create and init a new Queue
 func NewQueue() *Queue {
        return &Queue{
diff --git a/backend/helpers/pluginhelper/api/queue_test.go 
b/backend/helpers/pluginhelper/api/queue_test.go
index 7da9117e7..3e0baf167 100644
--- a/backend/helpers/pluginhelper/api/queue_test.go
+++ b/backend/helpers/pluginhelper/api/queue_test.go
@@ -18,23 +18,68 @@ limitations under the License.
 package api
 
 import (
-       "github.com/stretchr/testify/require"
        "testing"
+       "time"
+
+       "github.com/stretchr/testify/require"
 )
 
-func TestQueue(t *testing.T) {
+func TestQueueIterator(t *testing.T) {
        it := NewQueueIterator()
-       it.Push(NewQueueIteratorNode("a"))
-       it.Push(NewQueueIteratorNode("b"))
+       it.Push("a")
+       it.Push("b")
        require.True(t, it.HasNext())
        folderRaw, err := it.Fetch()
        require.NoError(t, err)
-       node := folderRaw.(*QueueIteratorNode)
-       require.Equal(t, "a", node.Data())
+       data := folderRaw.(string)
+       require.Equal(t, "a", data)
        require.True(t, it.HasNext())
        folderRaw, err = it.Fetch()
        require.NoError(t, err)
-       node = folderRaw.(*QueueIteratorNode)
-       require.Equal(t, "b", node.Data())
+       data = folderRaw.(string)
+       require.Equal(t, "b", data)
        require.False(t, it.HasNext())
 }
+
+func TestQueue(t *testing.T) {
+       q := NewQueue()
+       q.Push(NewQueueIteratorNode("a"))
+       q.Push(NewQueueIteratorNode("b"))
+
+       require.Equal(t, q.GetCount(), int64(2))
+       folderRaw := q.PullWithWorkingBlock()
+       data, ok := folderRaw.Data().(string)
+       require.True(t, ok)
+       require.Equal(t, "a", data)
+       require.Equal(t, q.GetCount(), int64(1))
+       folderRaw = q.PullWithWorkingBlock()
+       data, ok = folderRaw.Data().(string)
+       require.True(t, ok)
+       require.Equal(t, "b", data)
+       require.Equal(t, q.GetCount(), int64(0))
+
+       empty := false
+       waited := false
+       go func() {
+               require.Equal(t, q.GetCountWithWorkingBlock(), int64(1))
+               data, ok := q.PullWithWorkingBlock().Data().(string)
+               require.True(t, ok)
+               require.Equal(t, data, "c")
+               dataNode := q.PullWithWorkingBlock()
+               require.Equal(t, dataNode, nil)
+               empty = true
+       }()
+
+       go func() {
+               time.Sleep(100 * time.Millisecond)
+               q.Push(NewQueueIteratorNode("c"))
+               waited = true
+               q.Finish(3)
+       }()
+
+       for !empty {
+               time.Sleep(time.Millisecond)
+       }
+
+       require.True(t, waited)
+}
diff --git a/backend/plugins/sonarqube/tasks/issues_collector.go 
b/backend/plugins/sonarqube/tasks/issues_collector.go
index 7e3292a3a..d210191ed 100644
--- a/backend/plugins/sonarqube/tasks/issues_collector.go
+++ b/backend/plugins/sonarqube/tasks/issues_collector.go
@@ -22,6 +22,7 @@ import (
        "fmt"
        "net/http"
        "net/url"
+       "time"
 
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
@@ -32,24 +33,125 @@ const RAW_ISSUES_TABLE = "sonarqube_api_issues"
 
 var _ plugin.SubTaskEntryPoint = CollectIssues
 
+type SonarqubeIssueTimeIteratorNode struct {
+       CreatedAfter  *time.Time
+       CreatedBefore *time.Time
+}
+
 func CollectIssues(taskCtx plugin.SubTaskContext) (err errors.Error) {
        logger := taskCtx.GetLogger()
        logger.Info("collect issues")
 
+       iterator := helper.NewQueueIterator()
+       iterator.Push(
+               &SonarqubeIssueTimeIteratorNode{
+                       CreatedAfter:  nil,
+                       CreatedBefore: nil,
+               },
+       )
+
+       // work around sonarqube not supporting + in time
+       loc := time.FixedZone("sonarqube", -60)
+
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_ISSUES_TABLE)
        collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
                RawDataSubTaskArgs: *rawDataSubTaskArgs,
                ApiClient:          data.ApiClient,
                PageSize:           100,
                UrlTemplate:        "issues/search",
+               Input:              iterator,
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("componentKeys", fmt.Sprintf("%v", 
data.Options.ProjectKey))
+                       input, ok := 
reqData.Input.(*SonarqubeIssueTimeIteratorNode)
+                       if !ok {
+                               return nil, 
errors.Default.New(fmt.Sprintf("Input to SonarqubeIssueTimeIteratorNode 
failed:%+v", reqData.Input))
+                       }
+
+                       if input.CreatedAfter != nil {
+                               query.Set("createdAfter", 
GetFormatTime(input.CreatedAfter, loc))
+                       }
+
+                       if input.CreatedBefore != nil {
+                               query.Set("createdBefore", 
GetFormatTime(input.CreatedBefore, loc))
+                       }
+
                        query.Set("p", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("ps", fmt.Sprintf("%v", reqData.Pager.Size))
                        return query, nil
                },
-               GetTotalPages: GetTotalPagesFromResponse,
+               GetTotalPages: func(res *http.Response, args 
*helper.ApiCollectorArgs) (int, errors.Error) {
+                       body := &SonarqubePagination{}
+                       err := helper.UnmarshalResponse(res, body)
+                       if err != nil {
+                               return 0, err
+                       }
+
+                       pages := body.Paging.Total / args.PageSize
+                       if body.Paging.Total%args.PageSize > 0 {
+                               pages++
+                       }
+
+                       query := res.Request.URL.Query()
+
+                       // if there are more than 10000 records, the time range needs to be split
+                       if pages > 100 {
+                               var createdAfterUnix int64
+                               var createdBeforeUnix int64
+
+                               createdAfter, err := 
getTimeFromFormatTime(query.Get("createdAfter"))
+                               if err != nil {
+                                       return 0, err
+                               }
+                               createdBefore, err := 
getTimeFromFormatTime(query.Get("createdBefore"))
+                               if err != nil {
+                                       return 0, err
+                               }
+
+                               if createdAfter == nil {
+                                       createdAfterUnix = 0
+                               } else {
+                                       createdAfterUnix = createdAfter.Unix()
+                               }
+
+                               if createdBefore == nil {
+                                       createdBeforeUnix = time.Now().Unix()
+                               } else {
+                                       createdBeforeUnix = createdBefore.Unix()
+                               }
+
+                               // can not split it any more
+                               if createdBeforeUnix-createdAfterUnix < 1 {
+                                       return 100, nil
+                               }
+
+                               // split it
+                               MidTime := 
time.Unix((createdAfterUnix+createdBeforeUnix)/2+1, 0)
+
+                               // left part
+                               iterator.Push(&SonarqubeIssueTimeIteratorNode{
+                                       CreatedAfter:  createdAfter,
+                                       CreatedBefore: &MidTime,
+                               })
+
+                               // right part
+                               iterator.Push(&SonarqubeIssueTimeIteratorNode{
+                                       CreatedAfter:  &MidTime,
+                                       CreatedBefore: createdBefore,
+                               })
+
+                               logger.Info("split [%s][%s] by mid [%s] for it 
has pages:[%d] and total:[%d]",
+                                       query.Get("createdAfter"), 
query.Get("createdBefore"), GetFormatTime(&MidTime, loc), pages, 
body.Paging.Total)
+
+                               return 0, nil
+                       } else {
+                               logger.Info("[%s][%s] has pages:[%d] and 
total:[%d]",
+                                       query.Get("createdAfter"), 
query.Get("createdBefore"), pages, body.Paging.Total)
+
+                               return pages, nil
+                       }
+               },
+
                ResponseParser: func(res *http.Response) ([]json.RawMessage, 
errors.Error) {
                        var resData struct {
                                Data []json.RawMessage `json:"issues"`
@@ -71,6 +173,7 @@ func CollectIssues(taskCtx plugin.SubTaskContext) (err 
errors.Error) {
                                                Please recollect this project: 
%s`, data.Options.ProjectKey))
                                }
                        }
+
                        return resData.Data, nil
                },
        })
@@ -78,6 +181,7 @@ func CollectIssues(taskCtx plugin.SubTaskContext) (err 
errors.Error) {
                return err
        }
        return collector.Execute()
+
 }
 
 var CollectIssuesMeta = plugin.SubTaskMeta{
@@ -87,3 +191,31 @@ var CollectIssuesMeta = plugin.SubTaskMeta{
        Description:      "Collect issues data from Sonarqube api",
        DomainTypes:      []string{plugin.DOMAIN_TYPE_CODE_QUALITY},
 }
+
+func GetFormatTime(t *time.Time, loc *time.Location) string {
+       if t == nil {
+               return ""
+       }
+       if loc == nil {
+               return t.Format("2006-01-02T15:04:05-0700")
+       }
+       return t.In(loc).Format("2006-01-02T15:04:05-0700")
+}
+
+func getTimeFromFormatTime(formatTime string) (*time.Time, errors.Error) {
+       if formatTime == "" {
+               return nil, nil
+       }
+
+       if len(formatTime) < 20 {
+               return nil, errors.Default.New(fmt.Sprintf("formatTime [%s] is 
too short ", formatTime))
+       }
+
+       t, err := time.Parse("2006-01-02T15:04:05-0700", formatTime)
+
+       if err != nil {
+               return nil, errors.Default.New(fmt.Sprintf("Failed to Parse the 
time from [%s]:%s", formatTime, err.Error()))
+       }
+
+       return &t, nil
+}

Reply via email to