klesh commented on code in PR #4248:
URL: 
https://github.com/apache/incubator-devlake/pull/4248#discussion_r1149960500


##########
backend/plugins/pagerduty/tasks/incidents_collector.go:
##########
@@ -18,37 +18,137 @@ limitations under the License.
 package tasks
 
 import (
+       "encoding/json"
+       "fmt"
+       "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
-       helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
-       "github.com/apache/incubator-devlake/helpers/pluginhelper/tap"
+       "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
        "github.com/apache/incubator-devlake/plugins/pagerduty/models"
+       "net/http"
+       "net/url"
+       "reflect"
+       "time"
 )
 
 const RAW_INCIDENTS_TABLE = "pagerduty_incidents"
 
 var _ plugin.SubTaskEntryPoint = CollectIncidents
 
+type (
+       pagingInfo struct {
+               Limit  *int  `json:"limit"`
+               Offset *int  `json:"offset"`
+               Total  *int  `json:"total"`
+               More   *bool `json:"more"`
+       }
+       collectedIncidents struct {
+               pagingInfo
+               Incidents []json.RawMessage `json:"incidents"`
+       }
+
+       collectedIncident struct {
+               pagingInfo
+               Incident json.RawMessage `json:"incident"`
+       }
+       simplifiedRawIncident struct {
+               IncidentNumber int       `json:"incident_number"`
+               CreatedAt      time.Time `json:"created_at"`
+       }
+)
+
 func CollectIncidents(taskCtx plugin.SubTaskContext) errors.Error {
        data := taskCtx.GetData().(*PagerDutyTaskData)
-       collector, err := tap.NewTapCollector(
-               &tap.CollectorArgs[tap.SingerTapStream]{
-                       RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
-                               Ctx:   taskCtx,
-                               Table: RAW_INCIDENTS_TABLE,
-                               Params: models.PagerDutyParams{
-                                       Stream:       models.IncidentStream,
-                                       ConnectionId: data.Options.ConnectionId,
+       db := taskCtx.GetDal()
+       args := api.RawDataSubTaskArgs{
+               Ctx: taskCtx,
+               Params: PagerDutyParams{
+                       ConnectionId: data.Options.ConnectionId,
+               },
+               Table: RAW_INCIDENTS_TABLE,
+       }
+       collector, err := 
api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
+               RawDataSubTaskArgs: args,
+               ApiClient:          data.Client,
+               TimeAfter:          data.TimeAfter,
+               CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
+                       PageSize: 1,
+                       GetCreated: func(item json.RawMessage) (time.Time, 
errors.Error) {
+                               incident := &simplifiedRawIncident{}
+                               err := json.Unmarshal(item, incident)
+                               if err != nil {
+                                       return time.Time{}, 
errors.BadInput.Wrap(err, "failed to unmarshal incident")
+                               }
+                               return incident.CreatedAt, nil
+                       },
+                       FinalizableApiCollectorCommonArgs: 
api.FinalizableApiCollectorCommonArgs{
+                               UrlTemplate: "incidents",
+                               Query: func(reqData *api.RequestData, 
createdAfter *time.Time) (url.Values, errors.Error) {
+                                       query := url.Values{}
+                                       if createdAfter != nil {
+                                               now := time.Now()
+                                               if 
now.Sub(*createdAfter).Seconds() > 180*24*time.Hour.Seconds() {
+                                                       // beyond 6 months 
Pagerduty API will just return nothing, so need to query for 'all' instead
+                                                       query.Set("date_range", 
"all")
+                                               } else {
+                                                       query.Set("since", 
data.TimeAfter.String())
+                                               }
+                                       } else {
+                                               query.Set("date_range", "all")
+                                       }
+                                       query.Set("sort_by", "created_at:asc")
+                                       query.Set("limit", fmt.Sprintf("%d", 
reqData.Pager.Size))
+                                       query.Set("offset", fmt.Sprintf("%d", 
reqData.Pager.Page))
+                                       query.Set("total", "true")
+                                       return query, nil
+                               },
+                               ResponseParser: func(res *http.Response) 
([]json.RawMessage, errors.Error) {
+                                       rawResult := collectedIncidents{}
+                                       err := api.UnmarshalResponse(res, 
&rawResult)
+                                       return rawResult.Incidents, err
+                               },
+                       },
+                       GetNextPageCustomData: func(prevReqData 
*api.RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) {
+                               // not sure this is even necessary because the 
framework seems to auto-detect when to stop querying for the next page

Review Comment:
   It was restored in https://github.com/apache/incubator-devlake/pull/4626, 
but it has yet to be tested.
   I will update the PR today, so you may merge it first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to