This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new aacc38d5b [feat-4220][PagerDuty plugin]: Rework Pagerduty to not use 
Singer tap and support Blueprint v200 (#4248)
aacc38d5b is described below

commit aacc38d5b09bb7e94f1ae55f90645df9000dd4e9
Author: Keon Amini <[email protected]>
AuthorDate: Wed Mar 29 08:41:07 2023 -0500

    [feat-4220][PagerDuty plugin]: Rework Pagerduty to not use Singer tap and 
support Blueprint v200 (#4248)
    
    * feat: Rework Pagerduty plugin to use direct API instead of Singer spec 
due to licensing issues
    
    * refactor: use scopeHelper and TransformationRuleHelper libs for 
Scope/TxRules API
    
    * test: adjust IT client for new helper APIs
    
    * fix: better error messages for remote errors on startup
    
    * fix: GetNextPageCustomData removed from incident collector per PR review
    
    * fix: PR review changes: GetTotalPages implemented
    
    * fix: BuildInputIterator fixed for incidents_collector
    
    * fix: swagger doc fix
    
    * feat: add timeout to IT client to prevent broken tests from dragging out 
+ fixed transformation_rules endpoints for remote plugins
    
    * fix: swagger doc for python updated with new transformation rules 
endpoints
---
 .../pluginhelper/api/api_collector_with_state.go   |   4 +-
 backend/plugins/pagerduty/api/blueprint.go         |   7 +-
 backend/plugins/pagerduty/api/blueprint_v200.go    | 144 ++++++++++
 backend/plugins/pagerduty/api/init.go              |  14 +
 backend/plugins/pagerduty/api/remote.go            | 314 +++++++++++++++++++++
 backend/plugins/pagerduty/api/scope.go             |  91 ++++++
 .../plugins/pagerduty/api/transformation_rules.go  |  79 ++++++
 backend/plugins/pagerduty/e2e/incident_test.go     |  39 ++-
 .../e2e/raw_tables/_raw_pagerduty_incidents.csv    |   6 +-
 .../e2e/raw_tables/_tool_tapd_bug_statuses.csv     |   8 -
 .../e2e/raw_tables/_tool_tapd_story_statuses.csv   |  22 --
 .../e2e/raw_tables/_tool_tapd_workitem_types.csv   |   5 -
 .../_tool_pagerduty_assignments.csv                |   6 +-
 .../snapshot_tables/_tool_pagerduty_incidents.csv  |   6 +-
 .../snapshot_tables/_tool_pagerduty_services.csv   |   2 -
 .../e2e/snapshot_tables/_tool_pagerduty_users.csv  |   4 +-
 .../pagerduty/e2e/snapshot_tables/issue_bug.csv    |  21 --
 backend/plugins/pagerduty/impl/impl.go             |  69 +++--
 backend/plugins/pagerduty/models/config.go         |  16 +-
 .../20230123_add_connection_fields.go              |  51 ++++
 .../20230203_add_transformation_rules.go           |  50 ++++
 .../archived/transformation_rules.go}              |  20 +-
 .../pagerduty/models/migrationscripts/register.go  |   2 +
 .../models/{generated => raw}/incidents.go         |   3 +-
 .../models/{generated => raw}/notifications.go     |   3 +-
 .../models/{generated => raw}/services.go          |   3 +-
 backend/plugins/pagerduty/models/service.go        |   9 +-
 .../models/{service.go => transformation_rules.go} |  14 +-
 .../plugins/pagerduty/tasks/incidents_collector.go | 125 ++++++--
 .../plugins/pagerduty/tasks/incidents_converter.go |   3 +-
 .../plugins/pagerduty/tasks/incidents_extractor.go |   7 +-
 backend/plugins/pagerduty/tasks/task_data.go       |  62 +++-
 .../python/pydevlake/pydevlake/doc.template.json   |   4 +-
 backend/server/api/remote/register.go              |   2 +-
 backend/server/services/remote/init.go             |   5 +-
 .../server/services/remote/models/conversion.go    |   2 +-
 .../server/services/remote/plugin/default_api.go   |   4 +-
 .../server/services/remote/plugin/plugin_impl.go   |   6 +-
 backend/test/integration/helper/api.go             |  76 ++---
 backend/test/integration/helper/client.go          | 171 ++++++-----
 backend/test/integration/remote/helper.go          |   4 +-
 41 files changed, 1178 insertions(+), 305 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go 
b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index e6f727284..f35fb0419 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -210,7 +210,7 @@ func NewStatefulApiCollectorForFinalizableEntity(args 
FinalizableApiCollectorArg
                PageSize:              args.CollectNewRecordsByList.PageSize,
                Concurrency:           args.CollectNewRecordsByList.Concurrency,
                GetNextPageCustomData: 
args.CollectNewRecordsByList.GetNextPageCustomData,
-               // GetTotalPages:         
args.CollectNewRecordsByList.GetTotalPages,
+               GetTotalPages:         
args.CollectNewRecordsByList.GetTotalPages,
        })
 
        if err != nil {
@@ -276,7 +276,7 @@ type FinalizableApiCollectorListArgs struct {
        PageSize              int
        GetNextPageCustomData func(prevReqData *RequestData, prevPageResponse 
*http.Response) (interface{}, errors.Error)
        // need to consider the data missing problem: what if new data gets 
created during collection?
-       // GetTotalPages         func(res *http.Response, args 
*ApiCollectorArgs) (int, errors.Error)
+       GetTotalPages func(res *http.Response, args *ApiCollectorArgs) (int, 
errors.Error)
 }
 type FinalizableApiCollectorDetailArgs struct {
        FinalizableApiCollectorCommonArgs
diff --git a/backend/plugins/pagerduty/api/blueprint.go 
b/backend/plugins/pagerduty/api/blueprint.go
index f16cff3ed..a0235d70d 100644
--- a/backend/plugins/pagerduty/api/blueprint.go
+++ b/backend/plugins/pagerduty/api/blueprint.go
@@ -21,7 +21,8 @@ import (
        "encoding/json"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
-       helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+       "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+       "github.com/apache/incubator-devlake/plugins/pagerduty/models"
        "github.com/apache/incubator-devlake/plugins/pagerduty/tasks"
 )
 
@@ -34,7 +35,7 @@ func MakePipelinePlan(subtaskMetas []plugin.SubTaskMeta, 
connectionId uint64, sc
                if err != nil {
                        return nil, errors.Default.Wrap(err, "error 
unmarshalling task options")
                }
-               var transformationRules tasks.TransformationRules
+               var transformationRules models.PagerdutyTransformationRule
                if len(scopeElem.Transformation) > 0 {
                        err = 
errors.Convert(json.Unmarshal(scopeElem.Transformation, &transformationRules))
                        if err != nil {
@@ -48,7 +49,7 @@ func MakePipelinePlan(subtaskMetas []plugin.SubTaskMeta, 
connectionId uint64, sc
                        return nil, err
                }
                // subtasks
-               subtasks, err := helper.MakePipelinePlanSubtasks(subtaskMetas, 
scopeElem.Entities)
+               subtasks, err := api.MakePipelinePlanSubtasks(subtaskMetas, 
scopeElem.Entities)
                if err != nil {
                        return nil, err
                }
diff --git a/backend/plugins/pagerduty/api/blueprint_v200.go 
b/backend/plugins/pagerduty/api/blueprint_v200.go
new file mode 100644
index 000000000..af7a077e3
--- /dev/null
+++ b/backend/plugins/pagerduty/api/blueprint_v200.go
@@ -0,0 +1,144 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package api
+
+import (
+       "fmt"
+       "github.com/apache/incubator-devlake/core/dal"
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/core/models/domainlayer"
+       "github.com/apache/incubator-devlake/core/models/domainlayer/devops"
+       "github.com/apache/incubator-devlake/core/models/domainlayer/didgen"
+       "github.com/apache/incubator-devlake/core/models/domainlayer/ticket"
+       "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/core/utils"
+       "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+       "github.com/apache/incubator-devlake/plugins/pagerduty/models"
+       "github.com/apache/incubator-devlake/plugins/pagerduty/tasks"
+       "github.com/go-playground/validator/v10"
+       "time"
+)
+
+// MakeDataSourcePipelinePlanV200 builds the blueprint v200 output for a
+// PagerDuty connection: it loads the connection identified by connectionId,
+// then returns a pipeline plan holding one stage per entry of bpScopes together
+// with the domain-layer scopes derived from those same entries.
+// Any error from loading the connection or from either helper is returned
+// as-is, with nil plan and scopes.
+func MakeDataSourcePipelinePlanV200(subtaskMetas []plugin.SubTaskMeta, 
connectionId uint64, bpScopes []*plugin.BlueprintScopeV200, syncPolicy 
*plugin.BlueprintSyncPolicy,
+) (plugin.PipelinePlan, []plugin.Scope, errors.Error) {
+       connHelper := api.NewConnectionHelper(basicRes, validator.New())
+       // load the connection record; its ID is used to look up the services below
+       connection := &models.PagerDutyConnection{}
+       err := connHelper.FirstById(connection, connectionId)
+       if err != nil {
+               return nil, nil, err
+       }
+
+       // the plan is pre-sized so that stage i corresponds to bpScopes[i]
+       plan := make(plugin.PipelinePlan, len(bpScopes))
+       plan, err = makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connection, syncPolicy)
+       if err != nil {
+               return nil, nil, err
+       }
+       scopes, err := makeScopesV200(bpScopes, connection)
+       if err != nil {
+               return nil, nil, err
+       }
+
+       return plan, scopes, nil
+}
+
+// makeDataSourcePipelinePlanV200 fills plan[i] with a single-task stage for
+// bpScopes[i]. For every blueprint scope it loads the matching service row,
+// encodes the task options (connection id, service id/name and, when set, the
+// sync policy's TimeAfter) and resolves the subtasks for the scope's entities.
+// plan must already have at least len(bpScopes) elements, because stages are
+// assigned by index.
+func makeDataSourcePipelinePlanV200(
+       subtaskMetas []plugin.SubTaskMeta,
+       plan plugin.PipelinePlan,
+       bpScopes []*plugin.BlueprintScopeV200,
+       connection *models.PagerDutyConnection,
+       syncPolicy *plugin.BlueprintSyncPolicy,
+) (plugin.PipelinePlan, errors.Error) {
+       var err errors.Error
+       for i, bpScope := range bpScopes {
+               service := &models.Service{}
+               // load the service (the scope of this plugin) from db
+               err = basicRes.GetDal().First(service, dal.Where(`connection_id 
= ? AND id = ?`, connection.ID, bpScope.Id))
+               if err != nil {
+                       return nil, errors.Default.Wrap(err, fmt.Sprintf("fail 
to find service %s", bpScope.Id))
+               }
+               transformationRule := &models.PagerdutyTransformationRule{}
+               // load the service's transformation rule from db; a missing rule is
+               // tolerated, any other db error aborts.
+               // NOTE(review): the loaded rule is not used further in this function -
+               // confirm whether it should be passed into the task options.
+               db := basicRes.GetDal()
+               err = db.First(transformationRule, dal.Where(`id = ?`, 
service.TransformationRuleId))
+               if err != nil && !db.IsErrorNotFound(err) {
+                       return nil, err
+               }
+               // construct task options for pagerduty
+               op := &tasks.PagerDutyOptions{
+                       ConnectionId: service.ConnectionId,
+                       ServiceId:    service.Id,
+                       ServiceName:  service.Name,
+               }
+               // NOTE(review): syncPolicy is dereferenced without a nil check -
+               // confirm callers always pass a non-nil policy.
+               if syncPolicy.TimeAfter != nil {
+                       op.TimeAfter = syncPolicy.TimeAfter.Format(time.RFC3339)
+               }
+               var options map[string]any
+               options, err = tasks.EncodeTaskOptions(op)
+               if err != nil {
+                       return nil, err
+               }
+               var subtasks []string
+               subtasks, err = api.MakePipelinePlanSubtasks(subtaskMetas, 
bpScope.Entities)
+               if err != nil {
+                       return nil, err
+               }
+               // each blueprint scope becomes its own stage holding a single task
+               stage := []*plugin.PipelineTask{
+                       {
+                               Plugin:   "pagerduty",
+                               Subtasks: subtasks,
+                               Options:  options,
+                       },
+               }
+               plan[i] = stage
+       }
+       return plan, nil
+}
+
+// makeScopesV200 converts the blueprint scopes into domain-layer scopes. For
+// each blueprint scope it loads the service row and appends a devops.CicdScope
+// when the CICD entity is selected and a ticket.Board when the TICKET entity
+// is selected, so one service can yield zero, one or two scopes.
+func makeScopesV200(bpScopes []*plugin.BlueprintScopeV200, connection 
*models.PagerDutyConnection) ([]plugin.Scope, errors.Error) {
+       scopes := make([]plugin.Scope, 0)
+       for _, bpScope := range bpScopes {
+               service := &models.Service{}
+               // load the service (the scope of this plugin) from db
+               err := basicRes.GetDal().First(service, 
dal.Where(`connection_id = ? AND id = ?`, connection.ID, bpScope.Id))
+               if err != nil {
+                       return nil, errors.Default.Wrap(err, fmt.Sprintf("fail 
to find service: %s", bpScope.Id))
+               }
+               // add cicd_scope to scopes
+               if utils.StringsContains(bpScope.Entities, 
plugin.DOMAIN_TYPE_CICD) {
+                       scopeCICD := &devops.CicdScope{
+                               DomainEntity: domainlayer.DomainEntity{
+                                       Id: 
didgen.NewDomainIdGenerator(&models.Service{}).Generate(connection.ID, 
service.Id),
+                               },
+                               Name: service.Name,
+                       }
+                       scopes = append(scopes, scopeCICD)
+               }
+               // add board to scopes
+               // NOTE(review): the board id is generated with the Incident model's
+               // generator while the CICD scope above uses the Service model -
+               // confirm this difference is intended.
+               if utils.StringsContains(bpScope.Entities, 
plugin.DOMAIN_TYPE_TICKET) {
+                       scopeTicket := &ticket.Board{
+                               DomainEntity: domainlayer.DomainEntity{
+                                       Id: 
didgen.NewDomainIdGenerator(&models.Incident{}).Generate(connection.ID, 
service.Id),
+                               },
+                               Name: service.Name,
+                       }
+                       scopes = append(scopes, scopeTicket)
+               }
+       }
+       return scopes, nil
+}
diff --git a/backend/plugins/pagerduty/api/init.go 
b/backend/plugins/pagerduty/api/init.go
index d92c2b334..52568e174 100644
--- a/backend/plugins/pagerduty/api/init.go
+++ b/backend/plugins/pagerduty/api/init.go
@@ -20,11 +20,16 @@ package api
 import (
        "github.com/apache/incubator-devlake/core/context"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+       "github.com/apache/incubator-devlake/plugins/pagerduty/models"
        "github.com/go-playground/validator/v10"
 )
 
 var vld *validator.Validate
 var connectionHelper *api.ConnectionApiHelper
+
+var scopeHelper *api.ScopeApiHelper[models.PagerDutyConnection, 
models.Service, models.PagerdutyTransformationRule]
+
+var trHelper *api.TransformationRuleHelper[models.PagerdutyTransformationRule]
 var basicRes context.BasicRes
 
 func Init(br context.BasicRes) {
@@ -34,4 +39,13 @@ func Init(br context.BasicRes) {
                basicRes,
                vld,
        )
+       scopeHelper = api.NewScopeHelper[models.PagerDutyConnection, 
models.Service, models.PagerdutyTransformationRule](
+               basicRes,
+               vld,
+               connectionHelper,
+       )
+       trHelper = 
api.NewTransformationRuleHelper[models.PagerdutyTransformationRule](
+               basicRes,
+               vld,
+       )
 }
diff --git a/backend/plugins/pagerduty/api/remote.go 
b/backend/plugins/pagerduty/api/remote.go
new file mode 100644
index 000000000..4f2ef6edd
--- /dev/null
+++ b/backend/plugins/pagerduty/api/remote.go
@@ -0,0 +1,314 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package api
+
+import (
+       "context"
+       "encoding/base64"
+       "encoding/json"
+       "fmt"
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+       "github.com/apache/incubator-devlake/plugins/pagerduty/models"
+       "net/http"
+       "net/url"
+       "strconv"
+       "time"
+)
+
+// RemoteScopesChild is one entry of a remote-scopes listing. RemoteScopes
+// fills Type with TypeScope, Id/Name with the PagerDuty service id/name and
+// Data with a models.Service; it leaves ParentId unset.
+type RemoteScopesChild struct {
+       Type     string      `json:"type"`
+       ParentId *string     `json:"parentId"`
+       Id       string      `json:"id"`
+       Name     string      `json:"name"`
+       Data     interface{} `json:"data"`
+}
+
+// RemoteScopesOutput is the response body of RemoteScopes. NextPageToken is
+// the token produced by EncodeToPageToken for the following page, or the empty
+// string when the remote API reports no more results.
+type RemoteScopesOutput struct {
+       Children      []RemoteScopesChild `json:"children"`
+       NextPageToken string              `json:"nextPageToken"`
+}
+
+// SearchRemoteScopesOutput is the response shape referenced by the swagger
+// annotations of SearchRemoteScopes. The handler visible in this file never
+// populates it because it answers with 405 and a nil body.
+type SearchRemoteScopesOutput struct {
+       Children []RemoteScopesChild `json:"children"`
+       Page     int                 `json:"page"`
+       PageSize int                 `json:"pageSize"`
+}
+
+// PageData is the paging state carried inside a page token (see
+// EncodeToPageToken / DecodeFromPageToken). GetQueryFromPageData turns it into
+// the offset/limit query parameters of the remote request.
+type PageData struct {
+       Page    int `json:"page"`
+       PerPage int `json:"per_page"`
+}
+
+// TeamResponse describes a paginated list of teams (offset/limit/more/total
+// envelope plus id and name per team).
+// NOTE(review): not referenced by any code visible in this file - confirm it
+// is still needed.
+type TeamResponse struct {
+       Offset int  `json:"offset"`
+       Limit  int  `json:"limit"`
+       More   bool `json:"more"`
+       Total  int  `json:"total"`
+       Teams  []struct {
+               Id   string `json:"id"`
+               Name string `json:"name"`
+       } `json:"teams"`
+}
+
+// WorkspaceItem holds the slug and name of a workspace.
+// NOTE(review): not referenced by any code visible in this file and the
+// commented-out fields suggest it was carried over from another plugin -
+// confirm it is still needed.
+type WorkspaceItem struct {
+       //Type string `json:"type"`
+       //Uuid string `json:"uuid"`
+       Slug string `json:"slug"`
+       Name string `json:"name"`
+}
+
+// ReposResponse describes a page-based listing whose values are
+// models.Service records.
+// NOTE(review): not referenced by any code visible in this file; RemoteScopes
+// decodes into ServiceResponse instead - confirm it is still needed.
+type ReposResponse struct {
+       Pagelen int              `json:"pagelen"`
+       Page    int              `json:"page"`
+       Size    int              `json:"size"`
+       Values  []models.Service `json:"values"`
+}
+
+// ServiceResponse is the payload RemoteScopes decodes from the remote
+// `/services` endpoint: a pagination envelope (Offset, Limit, More, Total)
+// plus the list of services. RemoteScopes reads only More and each service's
+// Id, Name and HtmlUrl; the remaining fields mirror the rest of the payload.
+type ServiceResponse struct {
+       Offset   int  `json:"offset"`
+       Limit    int  `json:"limit"`
+       More     bool `json:"more"`
+       Total    int  `json:"total"`
+       Services []struct {
+               Id                      string    `json:"id"`
+               Summary                 string    `json:"summary"`
+               Type                    string    `json:"type"`
+               Self                    string    `json:"self"`
+               HtmlUrl                 string    `json:"html_url"`
+               Name                    string    `json:"name"`
+               AutoResolveTimeout      int       `json:"auto_resolve_timeout"`
+               AcknowledgementTimeout  int       `json:"acknowledgement_timeout"`
+               CreatedAt               time.Time `json:"created_at"`
+               Status                  string    `json:"status"`
+               AlertCreation           string    `json:"alert_creation"`
+               AlertGroupingParameters struct {
+                       Type string `json:"type"`
+               } `json:"alert_grouping_parameters"`
+               Integrations []struct {
+                       Id      string `json:"id"`
+                       Type    string `json:"type"`
+                       Summary string `json:"summary"`
+                       Self    string `json:"self"`
+                       HtmlUrl string `json:"html_url"`
+               } `json:"integrations"`
+               EscalationPolicy struct {
+                       Id      string `json:"id"`
+                       Type    string `json:"type"`
+                       Summary string `json:"summary"`
+                       Self    string `json:"self"`
+                       HtmlUrl string `json:"html_url"`
+               } `json:"escalation_policy"`
+               Teams []struct {
+                       Id      string `json:"id"`
+                       Type    string `json:"type"`
+                       Summary string `json:"summary"`
+                       Self    string `json:"self"`
+                       HtmlUrl string `json:"html_url"`
+               } `json:"teams"`
+               IncidentUrgencyRule struct {
+                       Type               string `json:"type"`
+                       DuringSupportHours struct {
+                               Type    string `json:"type"`
+                               Urgency string `json:"urgency"`
+                       } `json:"during_support_hours"`
+                       OutsideSupportHours struct {
+                               Type    string `json:"type"`
+                               Urgency string `json:"urgency"`
+                       } `json:"outside_support_hours"`
+               } `json:"incident_urgency_rule"`
+               SupportHours struct {
+                       Type       string `json:"type"`
+                       TimeZone   string `json:"time_zone"`
+                       StartTime  string `json:"start_time"`
+                       EndTime    string `json:"end_time"`
+                       DaysOfWeek []int  `json:"days_of_week"`
+               } `json:"support_hours"`
+               ScheduledActions []struct {
+                       Type string `json:"type"`
+                       At   struct {
+                               Type string `json:"type"`
+                               Name string `json:"name"`
+                       } `json:"at"`
+                       ToUrgency string `json:"to_urgency"`
+               } `json:"scheduled_actions"`
+               AutoPauseNotificationsParameters struct {
+                       Enabled bool `json:"enabled"`
+                       Timeout int  `json:"timeout"`
+               } `json:"auto_pause_notifications_parameters"`
+               // explicit tag: every other field is tagged, and without it the list
+               // only decoded through encoding/json's case-insensitive fallback and
+               // would encode under the key "Services".
+       } `json:"services"`
+}
+
+// RemoteScopesPerPage is the page size DecodeFromPageToken uses when no page
+// token is supplied.
+const RemoteScopesPerPage int = 100
+
+// TypeScope is the Type value RemoteScopes assigns to every child it returns.
+const TypeScope string = "scope"
+
+// TypeGroup is the Type value for group entries.
+// NOTE(review): not referenced by any code visible in this file.
+const TypeGroup string = "group"
+
+//const TypeGroup string = "group"
+
+// RemoteScopes list all available scopes (services) for this connection
+// @Summary list all available scopes (services) for this connection
+// @Description list all available scopes (services) for this connection
+// @Tags plugins/pagerduty
+// @Accept application/json
+// @Param connectionId path int false "connection ID"
+// @Param groupId query string false "group ID"
+// @Param pageToken query string false "page Token"
+// @Success 200  {object} RemoteScopesOutput
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
+// @Router /plugins/pagerduty/connections/{connectionId}/remote-scopes [GET]
+func RemoteScopes(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, 
errors.Error) {
+       connectionId, _ := extractParam(input.Params)
+       if connectionId == 0 {
+               return nil, errors.BadInput.New("invalid connectionId")
+       }
+
+       connection := &models.PagerDutyConnection{}
+       err := connectionHelper.First(connection, input.Params)
+       if err != nil {
+               return nil, err
+       }
+
+       pageToken, ok := input.Query["pageToken"]
+       if !ok || len(pageToken) == 0 {
+               pageToken = []string{""}
+       }
+
+       pageData, err := DecodeFromPageToken(pageToken[0])
+       if err != nil {
+               return nil, errors.BadInput.New("failed to get page token")
+       }
+
+       // create api client
+       apiClient, err := api.NewApiClientFromConnection(context.TODO(), 
basicRes, connection)
+       if err != nil {
+               return nil, err
+       }
+
+       query, err := GetQueryFromPageData(pageData)
+       if err != nil {
+               return nil, err
+       }
+
+       var res *http.Response
+       outputBody := &RemoteScopesOutput{}
+       res, err = apiClient.Get("/services", query, nil)
+       if err != nil {
+               return nil, err
+       }
+       response := &ServiceResponse{}
+       err = api.UnmarshalResponse(res, response)
+       if err != nil {
+               return nil, err
+       }
+       // append service to output
+       for _, service := range response.Services {
+               child := RemoteScopesChild{
+                       Type: TypeScope,
+                       Id:   service.Id,
+                       Name: service.Name,
+                       Data: models.Service{
+                               Url:                  service.HtmlUrl,
+                               Id:                   service.Id,
+                               TransformationRuleId: 0, // this is not 
determined here
+                               Name:                 service.Name,
+                       },
+               }
+               outputBody.Children = append(outputBody.Children, child)
+       }
+
+       // check service count
+       if !response.More {
+               pageData = nil
+       }
+
+       // get the next page token
+       outputBody.NextPageToken = ""
+       if pageData != nil {
+               pageData.Page += 1
+               outputBody.NextPageToken, err = EncodeToPageToken(pageData)
+               if err != nil {
+                       return nil, err
+               }
+       }
+
+       return &plugin.ApiResourceOutput{Body: outputBody, Status: 
http.StatusOK}, nil
+}
+
+// SearchRemoteScopes is not supported by this plugin: it always answers with
+// HTTP 405 (Method Not Allowed) and a nil body, ignoring its input.
+// @Summary search remote scopes (not supported by this plugin)
+// @Description search remote scopes (not supported by this plugin, always answers 405)
+// @Tags plugins/pagerduty
+// @Accept application/json
+// @Param connectionId path int false "connection ID"
+// @Param search query string false "search"
+// @Param page query int false "page number"
+// @Param pageSize query int false "page size per page"
+// @Success 200  {object} SearchRemoteScopesOutput
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
+// @Router /plugins/pagerduty/connections/{connectionId}/search-remote-scopes [GET]
+func SearchRemoteScopes(input *plugin.ApiResourceInput) 
(*plugin.ApiResourceOutput, errors.Error) {
+       // Not supported
+       return &plugin.ApiResourceOutput{Body: nil, Status: 
http.StatusMethodNotAllowed}, nil
+}
+
+func EncodeToPageToken(pageData *PageData) (string, errors.Error) {
+       // Marshal json
+       pageTokenDecode, err := json.Marshal(pageData)
+       if err != nil {
+               return "", errors.Default.Wrap(err, fmt.Sprintf("Marshal 
pageToken failed %+v", pageData))
+       }
+       // Encode pageToken Base64
+       return base64.StdEncoding.EncodeToString(pageTokenDecode), nil
+}
+
+// DecodeFromPageToken is the inverse of EncodeToPageToken. An empty token
+// yields the initial paging state (Page 0, PerPage RemoteScopesPerPage);
+// otherwise the token is base64-decoded and the resulting JSON is unmarshalled
+// into a PageData. Either decoding step failing returns a wrapped Default
+// error and a nil result.
+func DecodeFromPageToken(pageToken string) (*PageData, errors.Error) {
+       if len(pageToken) == 0 {
+               firstPage := &PageData{Page: 0, PerPage: RemoteScopesPerPage}
+               return firstPage, nil
+       }
+       // undo the base64 layer first
+       rawJSON, decodeErr := base64.StdEncoding.DecodeString(pageToken)
+       if decodeErr != nil {
+               return nil, errors.Default.Wrap(decodeErr, fmt.Sprintf("decode pageToken failed %s", pageToken))
+       }
+       // then parse the JSON payload
+       var decoded PageData
+       if jsonErr := json.Unmarshal(rawJSON, &decoded); jsonErr != nil {
+               return nil, errors.Default.Wrap(jsonErr, fmt.Sprintf("json Unmarshal pageTokenDecode failed %s", rawJSON))
+       }
+       return &decoded, nil
+}
+
+// GetQueryFromPageData builds the pagination query for the remote services
+// listing from pageData: `limit` is the page size and `offset` is the number
+// of records to skip, i.e. the zero-based page index multiplied by the page
+// size. The returned error is always nil; it is kept for interface stability.
+func GetQueryFromPageData(pageData *PageData) (url.Values, errors.Error) {
+       query := url.Values{}
+       // the remote `offset` parameter counts records, not pages; sending the
+       // bare page index made page 1 start at record 1 and re-list almost the
+       // whole previous page
+       query.Set("offset", strconv.Itoa(pageData.Page*pageData.PerPage))
+       query.Set("limit", strconv.Itoa(pageData.PerPage))
+       return query, nil
+}
+
+// extractParam reads the "connectionId" and "serviceId" entries of params as
+// base-10 unsigned integers and returns them in that order. Parse errors are
+// deliberately ignored and whatever strconv.ParseUint produced is returned,
+// which is 0 for a missing or non-numeric entry; callers treat 0 as "invalid".
+func extractParam(params map[string]string) (uint64, uint64) {
+       parseID := func(key string) uint64 {
+               id, _ := strconv.ParseUint(params[key], 10, 64)
+               return id
+       }
+       return parseID("connectionId"), parseID("serviceId")
+}
diff --git a/backend/plugins/pagerduty/api/scope.go 
b/backend/plugins/pagerduty/api/scope.go
new file mode 100644
index 000000000..c4e28bca5
--- /dev/null
+++ b/backend/plugins/pagerduty/api/scope.go
@@ -0,0 +1,91 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package api
+
+import (
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+       "github.com/apache/incubator-devlake/plugins/pagerduty/models"
+)
+
+// ScopeReq and ScopeRes are concrete, named instantiations of the generic
+// scope request/response helpers for models.Service. They are needed for
+// Swagger doc gen, whose annotations below refer to them by name.
+type (
+       ScopeReq api.ScopeReq[models.Service]
+       ScopeRes api.ScopeRes[models.Service]
+)
+
+// PutScope create or update pagerduty services (scopes); delegates to scopeHelper.Put
+// @Summary create or update pagerduty services
+// @Description Create or update pagerduty services
+// @Tags plugins/pagerduty
+// @Accept application/json
+// @Param connectionId path int true "connection ID"
+// @Param scope body ScopeReq true "json"
+// @Success 200  {object} []models.Service
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
+// @Router /plugins/pagerduty/connections/{connectionId}/scopes [PUT]
+func PutScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, 
errors.Error) {
+       return scopeHelper.Put(input)
+}
+
+// UpdateScope patch a pagerduty service (scope); delegates to scopeHelper.Update
+// @Summary patch a pagerduty service
+// @Description patch a pagerduty service
+// @Tags plugins/pagerduty
+// @Accept application/json
+// @Param connectionId path int true "connection ID"
+// @Param repoId path int true "repo ID"
+// @Param scope body models.Service true "json"
+// @Success 200  {object} models.Service
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
+// @Router /plugins/pagerduty/connections/{connectionId}/scopes/{repoId} [PATCH]
+func UpdateScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, 
errors.Error) {
+       // NOTE(review): the annotations above still name the path parameter
+       // "repoId" (typed int) although the scope model is models.Service -
+       // confirm against the registered route before renaming it.
+       return scopeHelper.Update(input, "id")
+}
+
+// GetScopeList get PagerDuty services (scopes); delegates to scopeHelper.GetScopeList
+// @Summary get PagerDuty services
+// @Description get PagerDuty services
+// @Tags plugins/pagerduty
+// @Param connectionId path int true "connection ID"
+// @Param pageSize query int false "page size, default 50"
+// @Param page query int false "page number, default 1"
+// @Success 200  {object} []ScopeRes
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
+// @Router /plugins/pagerduty/connections/{connectionId}/scopes/ [GET]
+func GetScopeList(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, 
errors.Error) {
+       return scopeHelper.GetScopeList(input)
+}
+
+// GetScope get one PagerDuty service (scope); delegates to scopeHelper.GetScope
+// @Summary get one PagerDuty service
+// @Description get one PagerDuty service
+// @Tags plugins/pagerduty
+// @Param connectionId path int true "connection ID"
+// @Param repoId path int true "repo ID"
+// @Success 200  {object} ScopeRes
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
+// @Router /plugins/pagerduty/connections/{connectionId}/scopes/{repoId} [GET]
+func GetScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, 
errors.Error) {
+       // NOTE(review): the annotations above still name the path parameter
+       // "repoId" (typed int) although the scope model is models.Service -
+       // confirm against the registered route before renaming it.
+       return scopeHelper.GetScope(input, "id")
+}
diff --git a/backend/plugins/pagerduty/api/transformation_rules.go 
b/backend/plugins/pagerduty/api/transformation_rules.go
new file mode 100644
index 000000000..142de4f11
--- /dev/null
+++ b/backend/plugins/pagerduty/api/transformation_rules.go
@@ -0,0 +1,79 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package api
+
+import (
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/core/plugin"
+)
+
+// CreateTransformationRule create transformation rule for PagerDuty
+// @Summary create transformation rule for PagerDuty
+// @Description create transformation rule for PagerDuty
+// @Tags plugins/pagerduty
+// @Accept application/json
+// @Param transformationRule body models.PagerdutyTransformationRule true 
"transformation rule"
+// @Success 200  {object} models.PagerdutyTransformationRule
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
+// @Router /plugins/pagerduty/:connectionId/transformation_rules [POST]
+func CreateTransformationRule(input *plugin.ApiResourceInput) 
(*plugin.ApiResourceOutput, errors.Error) {
+       return trHelper.Create(input)
+}
+
+// UpdateTransformationRule update transformation rule for PagerDuty
+// @Summary update transformation rule for PagerDuty
+// @Description update transformation rule for PagerDuty
+// @Tags plugins/pagerduty
+// @Accept application/json
+// @Param id path int true "id"
+// @Param transformationRule body models.PagerdutyTransformationRule true 
"transformation rule"
+// @Success 200  {object} models.PagerdutyTransformationRule
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
+// @Router /plugins/pagerduty/:connectionId/transformation_rules/{id} [PATCH]
+func UpdateTransformationRule(input *plugin.ApiResourceInput) 
(*plugin.ApiResourceOutput, errors.Error) {
+       return trHelper.Update(input)
+}
+
+// GetTransformationRule return one transformation rule
+// @Summary return one transformation rule
+// @Description return one transformation rule
+// @Tags plugins/pagerduty
+// @Param id path int true "id"
+// @Success 200  {object} models.PagerdutyTransformationRule
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
+// @Router /plugins/pagerduty/:connectionId/transformation_rules/{id} [GET]
+func GetTransformationRule(input *plugin.ApiResourceInput) 
(*plugin.ApiResourceOutput, errors.Error) {
+       return trHelper.Get(input)
+}
+
+// GetTransformationRuleList return all transformation rules
+// @Summary return all transformation rules
+// @Description return all transformation rules
+// @Tags plugins/pagerduty
+// @Param pageSize query int false "page size, default 50"
+// @Param page query int false "page number, default 1"
+// @Success 200  {object} []models.PagerdutyTransformationRule
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
+// @Router /plugins/pagerduty/:connectionId/transformation_rules [GET]
+func GetTransformationRuleList(input *plugin.ApiResourceInput) 
(*plugin.ApiResourceOutput, errors.Error) {
+       return trHelper.List(input)
+}
diff --git a/backend/plugins/pagerduty/e2e/incident_test.go 
b/backend/plugins/pagerduty/e2e/incident_test.go
index 488aea64e..1a77bd268 100644
--- a/backend/plugins/pagerduty/e2e/incident_test.go
+++ b/backend/plugins/pagerduty/e2e/incident_test.go
@@ -18,24 +18,47 @@ limitations under the License.
 package e2e
 
 import (
+       "fmt"
        "github.com/apache/incubator-devlake/core/models/common"
        "github.com/apache/incubator-devlake/core/models/domainlayer/ticket"
        "github.com/apache/incubator-devlake/helpers/e2ehelper"
        "github.com/apache/incubator-devlake/plugins/pagerduty/impl"
        "github.com/apache/incubator-devlake/plugins/pagerduty/models"
        "github.com/apache/incubator-devlake/plugins/pagerduty/tasks"
+       "github.com/stretchr/testify/require"
        "testing"
 )
 
 func TestIncidentDataFlow(t *testing.T) {
        var plugin impl.PagerDuty
        dataflowTester := e2ehelper.NewDataFlowTester(t, "pagerduty", plugin)
-
+       rule := models.PagerdutyTransformationRule{
+               Name: "rule1",
+       }
+       options := tasks.PagerDutyOptions{
+               ConnectionId:                1,
+               ServiceId:                   "PIKL83L",
+               ServiceName:                 "DevService",
+               Tasks:                       nil,
+               PagerdutyTransformationRule: &rule,
+       }
        taskData := &tasks.PagerDutyTaskData{
-               Options: &tasks.PagerDutyOptions{
-                       ConnectionId: 1,
-               },
+               Options: &options,
+       }
+
+       dataflowTester.FlushTabler(&models.PagerdutyTransformationRule{})
+       dataflowTester.FlushTabler(&models.Service{})
+       // tx-rule
+       require.NoError(t, dataflowTester.Dal.CreateOrUpdate(&rule))
+       service := models.Service{
+               ConnectionId:         options.ConnectionId,
+               Url:                  
fmt.Sprintf("https://keon-test.pagerduty.com/service-directory/%s", 
options.ServiceId),
+               Id:                   options.ServiceId,
+               TransformationRuleId: rule.ID,
+               Name:                 options.ServiceName,
        }
+       // scope
+       require.NoError(t, dataflowTester.Dal.CreateOrUpdate(&service))
 
        // import raw data table
        
dataflowTester.ImportCsvIntoRawTable("./raw_tables/_raw_pagerduty_incidents.csv",
 "_raw_pagerduty_incidents")
@@ -43,7 +66,6 @@ func TestIncidentDataFlow(t *testing.T) {
        // verify worklog extraction
        dataflowTester.FlushTabler(&models.Incident{})
        dataflowTester.FlushTabler(&models.User{})
-       dataflowTester.FlushTabler(&models.Service{})
        dataflowTester.FlushTabler(&models.Assignment{})
        dataflowTester.Subtask(tasks.ExtractIncidentsMeta, taskData)
        dataflowTester.VerifyTableWithOptions(
@@ -67,13 +89,6 @@ func TestIncidentDataFlow(t *testing.T) {
                        IgnoreTypes: []any{common.Model{}},
                },
        )
-       dataflowTester.VerifyTableWithOptions(
-               models.Service{},
-               e2ehelper.TableOptions{
-                       CSVRelPath:  
"./snapshot_tables/_tool_pagerduty_services.csv",
-                       IgnoreTypes: []any{common.Model{}},
-               },
-       )
        dataflowTester.FlushTabler(&ticket.Issue{})
        dataflowTester.Subtask(tasks.ConvertIncidentsMeta, taskData)
        dataflowTester.VerifyTableWithOptions(
diff --git 
a/backend/plugins/pagerduty/e2e/raw_tables/_raw_pagerduty_incidents.csv 
b/backend/plugins/pagerduty/e2e/raw_tables/_raw_pagerduty_incidents.csv
index 19989b120..8cda8d9c8 100644
--- a/backend/plugins/pagerduty/e2e/raw_tables/_raw_pagerduty_incidents.csv
+++ b/backend/plugins/pagerduty/e2e/raw_tables/_raw_pagerduty_incidents.csv
@@ -1,4 +1,4 @@
 id,params,data,url,input,created_at
-1,"{""ConnectionId"":1,""Stream"":""incidents""}","{""incident_number"": 4, 
""title"": ""Crash reported"", ""created_at"": ""2022-11-03T06:23:06.000000Z"", 
""status"": ""triggered"", ""incident_key"": 
""bb60942875634ee6a7fe94ddb51c3a09"", ""service"": {""id"": ""PIKL83L"", 
""type"": ""service_reference"", ""summary"": ""DevService"", ""self"": 
""https://api.pagerduty.com/services/PIKL83L"", ""html_url"": 
""https://keon-test.pagerduty.com/service-directory/PIKL83L""}, 
""assignments"": [{" [...]
-2,"{""ConnectionId"":1,""Stream"":""incidents""}","{""incident_number"": 5, 
""title"": ""Slow startup"", ""created_at"": ""2022-11-03T06:44:28.000000Z"", 
""status"": ""acknowledged"", ""incident_key"": 
""d7bc6d39c37e4af8b206a12ff6b05793"", ""service"": {""id"": ""PIKL83L"", 
""type"": ""service_reference"", ""summary"": ""DevService"", ""self"": 
""https://api.pagerduty.com/services/PIKL83L"", ""html_url"": 
""https://keon-test.pagerduty.com/service-directory/PIKL83L""}, 
""assignments"": [{ [...]
-3,"{""ConnectionId"":1,""Stream"":""incidents""}","{""incident_number"": 6, 
""title"": ""Spamming logs"", ""created_at"": ""2022-11-03T06:45:36.000000Z"", 
""status"": ""resolved"", ""incident_key"": 
""9f5acd07975e4c57bc717d8d9e066785"", ""service"": {""id"": ""PIKL83L"", 
""type"": ""service_reference"", ""summary"": ""DevService"", ""self"": 
""https://api.pagerduty.com/services/PIKL83L"", ""html_url"": 
""https://keon-test.pagerduty.com/service-directory/PIKL83L""}, 
""assignments"": [], " [...]
+1,"{""ConnectionId"":1}","{""incident_number"": 4, ""title"": ""Crash 
reported"", ""created_at"": ""2022-11-03T06:23:06.000000Z"", ""status"": 
""triggered"", ""incident_key"": ""bb60942875634ee6a7fe94ddb51c3a09"", 
""service"": {""id"": ""PIKL83L"", ""type"": ""service_reference"", 
""summary"": ""DevService"", ""self"": 
""https://api.pagerduty.com/services/PIKL83L"", ""html_url"": 
""https://keon-test.pagerduty.com/service-directory/PIKL83L""}, 
""assignments"": [{""at"": ""2022-11-03T06:23 [...]
+2,"{""ConnectionId"":1}","{""incident_number"": 5, ""title"": ""Slow 
startup"", ""created_at"": ""2022-11-03T06:44:28.000000Z"", ""status"": 
""acknowledged"", ""incident_key"": ""d7bc6d39c37e4af8b206a12ff6b05793"", 
""service"": {""id"": ""PIKL83L"", ""type"": ""service_reference"", 
""summary"": ""DevService"", ""self"": 
""https://api.pagerduty.com/services/PIKL83L"", ""html_url"": 
""https://keon-test.pagerduty.com/service-directory/PIKL83L""}, 
""assignments"": [{""at"": ""2022-11-03T06:4 [...]
+3,"{""ConnectionId"":1}","{""incident_number"": 6, ""title"": ""Spamming 
logs"", ""created_at"": ""2022-11-03T06:45:36.000000Z"", ""status"": 
""resolved"", ""incident_key"": ""9f5acd07975e4c57bc717d8d9e066785"", 
""service"": {""id"": ""PIKL83L"", ""type"": ""service_reference"", 
""summary"": ""DevService"", ""self"": 
""https://api.pagerduty.com/services/PIKL83L"", ""html_url"": 
""https://keon-test.pagerduty.com/service-directory/PIKL83L""}, 
""assignments"": [], ""last_status_change_at"": [...]
diff --git 
a/backend/plugins/pagerduty/e2e/raw_tables/_tool_tapd_bug_statuses.csv 
b/backend/plugins/pagerduty/e2e/raw_tables/_tool_tapd_bug_statuses.csv
deleted file mode 100644
index ca4ef63dd..000000000
--- a/backend/plugins/pagerduty/e2e/raw_tables/_tool_tapd_bug_statuses.csv
+++ /dev/null
@@ -1,8 +0,0 @@
-connection_id,workspace_id,english_name,chinese_name,is_last_step,_raw_data_params,_raw_data_table,_raw_data_id,_raw_data_remark
-1,991,closed,已关闭,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bug_status,51,
-1,991,in_progress,接受/处理,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bug_status,51,
-1,991,new,新,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bug_status,51,
-1,991,rejected,已拒绝,1,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bug_status,51,
-1,991,reopened,重新打开,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bug_status,51,
-1,991,resolved,已解决,1,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bug_status,51,
-1,991,verified,已验证,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bug_status,51,
diff --git 
a/backend/plugins/pagerduty/e2e/raw_tables/_tool_tapd_story_statuses.csv 
b/backend/plugins/pagerduty/e2e/raw_tables/_tool_tapd_story_statuses.csv
deleted file mode 100644
index 7038bf0aa..000000000
--- a/backend/plugins/pagerduty/e2e/raw_tables/_tool_tapd_story_statuses.csv
+++ /dev/null
@@ -1,22 +0,0 @@
-connection_id,workspace_id,english_name,chinese_name,is_last_step,_raw_data_params,_raw_data_table,_raw_data_id,_raw_data_remark
-1,991,developing,开发中,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,planning,test111test111test111,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,rejected,已拒绝,1,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,resolved,已解决,1,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,status_10,test111test111,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,status_11,test111test111test111test111test111,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,status_12,test111test111test111test111test111,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,status_13,test111test111test111test111test111,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,status_14,test111test111test111test111test111test111test111,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,status_15,test111test111test111test111test111,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,status_16,test111test111test111,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,status_17,test111test111test111,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,status_18,test111test111test111,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,status_2,test111test111,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,status_3,test111test111test111,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,status_4,test111test111,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,status_5,test111test111test111,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,status_6,test111test111test111test111test111,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,status_7,test111test111test111,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,status_8,test111test111test111,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
-1,991,status_9,test111test111test111,0,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_story_status,11,
diff --git 
a/backend/plugins/pagerduty/e2e/raw_tables/_tool_tapd_workitem_types.csv 
b/backend/plugins/pagerduty/e2e/raw_tables/_tool_tapd_workitem_types.csv
deleted file mode 100644
index 148f25902..000000000
--- a/backend/plugins/pagerduty/e2e/raw_tables/_tool_tapd_workitem_types.csv
+++ /dev/null
@@ -1,5 +0,0 @@
-connection_id,id,workspace_id,entity_type,name,english_name,status,color,workflow_id,icon,icon_small,creator,created,modified_by,modified,icon_viper,icon_small_viper,_raw_data_params,_raw_data_table,_raw_data_id,_raw_data_remark
-1,11991001000269,991,story,需求,story,3,#3582fb,1146949574001000033,,,TAPD 
system,2022-07-21T03:34:55.000+00:00,,2022-07-21T03:34:55.000+00:00,https://www.tapd.cn//img/workitem_type/default_icon/@2/story.png,https://www.tapd.cn//img/workitem_type/default_icon/@2/story_small.png,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_workitem_types,4,
-1,1146949574001000034,991,story,Techstory,Tech,3,#3582fb,1146949574001000035,46949574/icon/1146949574001000041,46949574/icon/1146949574001000042,郝琳,2022-07-21T06:13:43.000+00:00,郝琳,2022-07-21T06:19:15.000+00:00,https://viper.tapd.cn/icon/files/46949574/icon/1146949574001000041.png?token=718c3cd9ce755352ca168bf6875a680762fdfefb13942689afd41da6b4802f1e&version=1658384355&company_id=55850509,https://viper.tapd.cn/icon/files/46949574/icon/1146949574001000042.png?token=b1c5a360123fadbd6ea4bc2
 [...]
-1,1146949574001000035,991,story,长篇故事,LStory,3,#8ebebe,1146949574001000033,46949574/icon/1146949574001000043,46949574/icon/1146949574001000044,郝琳,2022-07-22T06:17:56.000+00:00,郝琳,2022-07-22T06:18:12.000+00:00,https://viper.tapd.cn/icon/files/46949574/icon/1146949574001000043.png?token=9f17a906980a3ea89116d6e75003e6127bf10e6e9a758cffb1e9e8c2a5bb27fd&version=1658470692&company_id=55850509,https://viper.tapd.cn/icon/files/46949574/icon/1146949574001000044.png?token=d0d7885f2029a61149b323450e
 [...]
-1,1146949574001000036,991,story,技术债,TDebt,3,#f85e5e,1146949574001000033,46949574/icon/1146949574001000045,46949574/icon/1146949574001000046,郝琳,2022-07-22T06:19:34.000+00:00,郝琳,2022-07-22T06:20:14.000+00:00,https://viper.tapd.cn/icon/files/46949574/icon/1146949574001000045.png?token=ecc54ec9e76ac4862b0785e8978a9969939aa17add156d447bfba80976e7acd5&version=1658470814&company_id=55850509,https://viper.tapd.cn/icon/files/46949574/icon/1146949574001000046.png?token=54ae24aa2cebb737006a9a0a1b76
 [...]
diff --git 
a/backend/plugins/pagerduty/e2e/snapshot_tables/_tool_pagerduty_assignments.csv 
b/backend/plugins/pagerduty/e2e/snapshot_tables/_tool_pagerduty_assignments.csv
index 6acfc18e5..a1b1bc317 100644
--- 
a/backend/plugins/pagerduty/e2e/snapshot_tables/_tool_pagerduty_assignments.csv
+++ 
b/backend/plugins/pagerduty/e2e/snapshot_tables/_tool_pagerduty_assignments.csv
@@ -1,4 +1,4 @@
 
incident_number,user_id,created_at,updated_at,_raw_data_params,_raw_data_table,_raw_data_id,_raw_data_remark,connection_id,assigned_at
-4,P25K520,2022-11-03T07:11:37.415+00:00,2022-11-03T07:11:37.415+00:00,"{""ConnectionId"":1,""Stream"":""incidents""}",_raw_pagerduty_incidents,1,,1,2022-11-03T07:02:36.000+00:00
-4,PQYACO3,2022-11-03T07:11:37.415+00:00,2022-11-03T07:11:37.415+00:00,"{""ConnectionId"":1,""Stream"":""incidents""}",_raw_pagerduty_incidents,1,,1,2022-11-03T06:23:06.000+00:00
-5,PQYACO3,2022-11-03T07:11:37.415+00:00,2022-11-03T07:11:37.415+00:00,"{""ConnectionId"":1,""Stream"":""incidents""}",_raw_pagerduty_incidents,2,,1,2022-11-03T06:44:37.000+00:00
+4,P25K520,2022-11-03T07:11:37.415+00:00,2022-11-03T07:11:37.415+00:00,"{""ConnectionId"":1}",_raw_pagerduty_incidents,1,,1,2022-11-03T07:02:36.000+00:00
+4,PQYACO3,2022-11-03T07:11:37.415+00:00,2022-11-03T07:11:37.415+00:00,"{""ConnectionId"":1}",_raw_pagerduty_incidents,1,,1,2022-11-03T06:23:06.000+00:00
+5,PQYACO3,2022-11-03T07:11:37.415+00:00,2022-11-03T07:11:37.415+00:00,"{""ConnectionId"":1}",_raw_pagerduty_incidents,2,,1,2022-11-03T06:44:37.000+00:00
diff --git 
a/backend/plugins/pagerduty/e2e/snapshot_tables/_tool_pagerduty_incidents.csv 
b/backend/plugins/pagerduty/e2e/snapshot_tables/_tool_pagerduty_incidents.csv
index 920ff1fe5..de1a59010 100644
--- 
a/backend/plugins/pagerduty/e2e/snapshot_tables/_tool_pagerduty_incidents.csv
+++ 
b/backend/plugins/pagerduty/e2e/snapshot_tables/_tool_pagerduty_incidents.csv
@@ -1,4 +1,4 @@
 
connection_id,number,created_at,updated_at,_raw_data_params,_raw_data_table,_raw_data_id,_raw_data_remark,url,service_id,summary,status,urgency,created_date,updated_date
-1,4,2022-11-03T07:11:37.422+00:00,2022-11-03T07:11:37.422+00:00,"{""ConnectionId"":1,""Stream"":""incidents""}",_raw_pagerduty_incidents,1,,https://keon-test.pagerduty.com/incidents/Q3YON8WNWTZMRQ,PIKL83L,[#4]
 Crash 
reported,triggered,high,2022-11-03T06:23:06.000+00:00,2022-11-03T07:02:36.000+00:00
-1,5,2022-11-03T07:11:37.422+00:00,2022-11-03T07:11:37.422+00:00,"{""ConnectionId"":1,""Stream"":""incidents""}",_raw_pagerduty_incidents,2,,https://keon-test.pagerduty.com/incidents/Q3CZAU7Q4008QD,PIKL83L,[#5]
 Slow 
startup,acknowledged,high,2022-11-03T06:44:28.000+00:00,2022-11-03T06:44:37.000+00:00
-1,6,2022-11-03T07:11:37.422+00:00,2022-11-03T07:11:37.422+00:00,"{""ConnectionId"":1,""Stream"":""incidents""}",_raw_pagerduty_incidents,3,,https://keon-test.pagerduty.com/incidents/Q1OHFWFP3GPXOG,PIKL83L,[#6]
 Spamming 
logs,resolved,low,2022-11-03T06:45:36.000+00:00,2022-11-03T06:51:44.000+00:00
+1,4,2022-11-03T07:11:37.422+00:00,2022-11-03T07:11:37.422+00:00,"{""ConnectionId"":1}",_raw_pagerduty_incidents,1,,https://keon-test.pagerduty.com/incidents/Q3YON8WNWTZMRQ,PIKL83L,[#4]
 Crash 
reported,triggered,high,2022-11-03T06:23:06.000+00:00,2022-11-03T07:02:36.000+00:00
+1,5,2022-11-03T07:11:37.422+00:00,2022-11-03T07:11:37.422+00:00,"{""ConnectionId"":1}",_raw_pagerduty_incidents,2,,https://keon-test.pagerduty.com/incidents/Q3CZAU7Q4008QD,PIKL83L,[#5]
 Slow 
startup,acknowledged,high,2022-11-03T06:44:28.000+00:00,2022-11-03T06:44:37.000+00:00
+1,6,2022-11-03T07:11:37.422+00:00,2022-11-03T07:11:37.422+00:00,"{""ConnectionId"":1}",_raw_pagerduty_incidents,3,,https://keon-test.pagerduty.com/incidents/Q1OHFWFP3GPXOG,PIKL83L,[#6]
 Spamming 
logs,resolved,low,2022-11-03T06:45:36.000+00:00,2022-11-03T06:51:44.000+00:00
diff --git 
a/backend/plugins/pagerduty/e2e/snapshot_tables/_tool_pagerduty_services.csv 
b/backend/plugins/pagerduty/e2e/snapshot_tables/_tool_pagerduty_services.csv
deleted file mode 100644
index 8f5225257..000000000
--- a/backend/plugins/pagerduty/e2e/snapshot_tables/_tool_pagerduty_services.csv
+++ /dev/null
@@ -1,2 +0,0 @@
-connection_id,id,created_at,updated_at,_raw_data_params,_raw_data_table,_raw_data_id,_raw_data_remark,url,name
-1,PIKL83L,2022-11-03T07:11:37.411+00:00,2022-11-03T07:11:37.411+00:00,"{""ConnectionId"":1,""Stream"":""incidents""}",_raw_pagerduty_incidents,3,,https://keon-test.pagerduty.com/service-directory/PIKL83L,DevService
diff --git 
a/backend/plugins/pagerduty/e2e/snapshot_tables/_tool_pagerduty_users.csv 
b/backend/plugins/pagerduty/e2e/snapshot_tables/_tool_pagerduty_users.csv
index 25ad6a710..a47f8b668 100644
--- a/backend/plugins/pagerduty/e2e/snapshot_tables/_tool_pagerduty_users.csv
+++ b/backend/plugins/pagerduty/e2e/snapshot_tables/_tool_pagerduty_users.csv
@@ -1,3 +1,3 @@
 
connection_id,id,created_at,updated_at,_raw_data_params,_raw_data_table,_raw_data_id,_raw_data_remark,url,name
-1,P25K520,2022-11-03T07:11:37.418+00:00,2022-11-03T07:11:37.418+00:00,"{""ConnectionId"":1,""Stream"":""incidents""}",_raw_pagerduty_incidents,1,,https://keon-test.pagerduty.com/users/P25K520,Kian
 Amini
-1,PQYACO3,2022-11-03T07:11:37.418+00:00,2022-11-03T07:11:37.418+00:00,"{""ConnectionId"":1,""Stream"":""incidents""}",_raw_pagerduty_incidents,2,,https://keon-test.pagerduty.com/users/PQYACO3,Keon
 Amini
+1,P25K520,2022-11-03T07:11:37.418+00:00,2022-11-03T07:11:37.418+00:00,"{""ConnectionId"":1}",_raw_pagerduty_incidents,1,,https://keon-test.pagerduty.com/users/P25K520,Kian
 Amini
+1,PQYACO3,2022-11-03T07:11:37.418+00:00,2022-11-03T07:11:37.418+00:00,"{""ConnectionId"":1}",_raw_pagerduty_incidents,2,,https://keon-test.pagerduty.com/users/PQYACO3,Keon
 Amini
diff --git a/backend/plugins/pagerduty/e2e/snapshot_tables/issue_bug.csv 
b/backend/plugins/pagerduty/e2e/snapshot_tables/issue_bug.csv
deleted file mode 100644
index aa1dae8e8..000000000
--- a/backend/plugins/pagerduty/e2e/snapshot_tables/issue_bug.csv
+++ /dev/null
@@ -1,21 +0,0 @@
-id,url,issue_key,title,description,epic_key,type,status,original_status,story_point,resolution_date,created_date,updated_date,lead_time_minutes,parent_issue_id,priority,original_estimate_minutes,time_spent_minutes,time_remaining_minutes,creator_id,assignee_id,assignee_name,severity,component,icon_url,creator_name,_raw_data_params,_raw_data_table,_raw_data_id,_raw_data_remark
-tapd:TapdBug:1:11991001001025,https://www.tapd.cn/991/prong/stories/view/11991001001025,11991001001025,test-11test-11test-11test-11test-11est-11test-11test-11,,,缺陷,IN_PROGRESS,已关闭,0,2020-02-14T07:14:30.000+00:00,2020-02-13T06:04:29.000+00:00,2020-02-14T08:50:03.000+00:00,1510,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:test-11test-11.,,,normal,,,test-11test-11.,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bugs,1271,
-tapd:TapdBug:1:11991001001030,https://www.tapd.cn/991/prong/stories/view/11991001001030,11991001001030,test-11test-11test-11t11test-11,,,缺陷,IN_PROGRESS,,0,2020-02-14T07:12:58.000+00:00,2020-02-13T06:33:47.000+00:00,2020-02-17T03:57:42.000+00:00,1479,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:test-11test-11.,,,normal,,,test-11test-11.,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bugs,1272,
-tapd:TapdBug:1:11991001001031,https://www.tapd.cn/991/prong/stories/view/11991001001031,11991001001031,test-11test-11test-11test-11test-11test-11test-11test-11test-11,,,缺陷,IN_PROGRESS,,0,2020-02-14T08:30:23.000+00:00,2020-02-13T06:45:11.000+00:00,2020-02-17T03:56:58.000+00:00,1545,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:test-11test-11.,,,normal,,,test-11test-11.,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bugs,1273,
-tapd:TapdBug:1:11991001001032,https://www.tapd.cn/991/prong/stories/view/11991001001032,11991001001032,test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11,,,缺陷,IN_PROGRESS,已关闭,0,2020-02-13T09:33:17.000+00:00,2020-02-13T06:50:33.000+00:00,2020-02-17T03:56:36.000+00:00,162,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:test-11test-11.,,,normal,,,test-11test-11.,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":
 [...]
-tapd:TapdBug:1:11991001001417,https://www.tapd.cn/991/prong/stories/view/11991001001417,11991001001417,test-11test-11-11test-11tesst-11test-11test-11test-11,,,缺陷,DONE,已拒绝,0,,2020-02-25T08:01:11.000+00:00,2020-03-04T08:12:14.000+00:00,0,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:test-11test-11test-11,tapd:TapdAccount:1:test-11test-11test-11,test-11test-11test-11,normal,,,test-11test-11test-11,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bugs,1275,
-tapd:TapdBug:1:11991001001418,https://www.tapd.cn/991/prong/stories/view/11991001001418,11991001001418,test-11tesest-11test-11,,,缺陷,DONE,已拒绝,0,,2020-02-25T08:04:12.000+00:00,2020-03-04T08:27:29.000+00:00,0,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:test-11test-11test-11,tapd:TapdAccount:1:test-11test-11test-11,test-11test-11test-11,normal,,,test-11test-11test-11,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bugs,1276,
-tapd:TapdBug:1:11991001001420,https://www.tapd.cn/991/prong/stories/view/11991001001420,11991001001420,test-11test-11test-11test-11test-11,,,缺陷,IN_PROGRESS,已关闭,0,2020-03-04T07:41:18.000+00:00,2020-02-25T08:07:04.000+00:00,2020-03-04T08:20:12.000+00:00,11494,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:test-11test-11test-11,,,normal,,,test-11test-11test-11,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bugs,1277,
-tapd:TapdBug:1:11991001001422,https://www.tapd.cn/991/prong/stories/view/11991001001422,11991001001422,test-11te11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11,,,缺陷,DONE,已拒绝,0,,2020-02-25T08:10:51.000+00:00,2020-03-04T08:12:17.000+00:00,0,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:test-11test-11test-11,tapd:TapdAccount:1:test-11test-11test-11,test-11test-11test-11,normal,,,test-11test-11test-11,"{""ConnectionId"":1,""CompanyId"":
 [...]
-tapd:TapdBug:1:11991001001423,https://www.tapd.cn/991/prong/stories/view/11991001001423,11991001001423,test--11test-11test-11test-11test-11test-11,,,缺陷,DONE,已拒绝,0,,2020-02-25T08:12:56.000+00:00,2020-03-04T08:25:19.000+00:00,0,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:test-11test-11test-11,tapd:TapdAccount:1:test-11test-11test-11,test-11test-11test-11,normal,,,test-11test-11test-11,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bugs,1279,
-tapd:TapdBug:1:11991001001425,https://www.tapd.cn/991/prong/stories/view/11991001001425,11991001001425,test-11test1test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11,,,缺陷,IN_PROGRESS,已关闭,0,2020-03-04T07:41:01.000+00:00,2020-02-25T08:18:40.000+00:00,2020-03-04T08:19:48.000+00:00,11482,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:test-11test-11test-11,,,normal,,,test-11test-11test-11,"{""Co
 [...]
-tapd:TapdBug:1:11991001001426,https://www.tapd.cn/991/prong/stories/view/11991001001426,11991001001426,test-11test-11te11test-11test-11test-11test-11test-11,,,缺陷,DONE,已拒绝,0,,2020-02-25T08:23:39.000+00:00,2020-03-04T08:12:19.000+00:00,0,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:test-11test-11test-11,tapd:TapdAccount:1:test-11test-11test-11,test-11test-11test-11,normal,,,test-11test-11test-11,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bugs,1281,
-tapd:TapdBug:1:11991001001447,https://www.tapd.cn/991/prong/stories/view/11991001001447,11991001001447,IE11
 
test-11te1test-11test-11test-11,,,缺陷,IN_PROGRESS,已关闭,0,2020-03-17T08:43:28.000+00:00,2020-02-27T03:06:59.000+00:00,2020-03-18T08:24:37.000+00:00,27696,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:test-11test-11test-11,,,normal,,,test-11test-11test-11,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bugs,1282,
-tapd:TapdBug:1:11991001001448,https://www.tapd.cn/991/prong/stories/view/11991001001448,11991001001448,IE11
 
test-11test-11test-11test-11test-11test-11,,,缺陷,DONE,已拒绝,0,,2020-02-27T03:14:02.000+00:00,2020-04-27T02:53:11.000+00:00,0,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:test-11test-11test-11,tapd:TapdAccount:1:test-11test-11test-11,test-11test-11test-11,normal,,,test-11test-11test-11,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bugs,1283,
-tapd:TapdBug:1:11991001001450,https://www.tapd.cn/991/prong/stories/view/11991001001450,11991001001450,IE11
 
tes-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11,,,缺陷,IN_PROGRESS,已关闭,0,2020-03-17T08:43:04.000+00:00,2020-02-27T03:18:04.000+00:00,2020-03-18T08:24:15.000+00:00,27685,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:test-11test-11test-11,,,normal,,,test-11test-11test-11,"{""Con
 [...]
-tapd:TapdBug:1:11991001001451,https://www.tapd.cn/991/prong/stories/view/11991001001451,11991001001451,IE11
 
test-11test-11test-st-11test-11,,,缺陷,IN_PROGRESS,已关闭,0,2020-03-17T08:42:17.000+00:00,2020-02-27T03:36:29.000+00:00,2020-03-18T08:23:08.000+00:00,27665,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:test-11test-11test-11,,,normal,,,test-11test-11test-11,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bugs,1285,
-tapd:TapdBug:1:11991001001710,https://www.tapd.cn/991/prong/stories/view/11991001001710,11991001001710,test-11-11test-11test-11test-11test-11test-11test-11test-11test-11,,,缺陷,DONE,已拒绝,0,,2020-03-13T06:14:26.000+00:00,2020-04-01T09:38:36.000+00:00,0,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:test-11test-11test-11,tapd:TapdAccount:1:test-11test-11test-11,test-11test-11test-11,normal,,,test-11test-11test-11,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bugs,1286,
-tapd:TapdBug:1:11991001001711,https://www.tapd.cn/991/prong/stories/view/11991001001711,11991001001711,test-11tesst-11test-11test-11test-11test-11test-11test-11test-11...test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11test-11...test-11,,,缺陷,DONE,已拒绝,0,,2020-03-13T06:20:04.000+00:00,2020-04-01T09:37:52.000+00:00,0,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:test-11test-11test-11,tapd:TapdAccount:1:test-11test-11test-11,t
 [...]
-tapd:TapdBug:1:11991001001737,https://www.tapd.cn/991/prong/stories/view/11991001001737,11991001001737,test-11tesst-11test-1test-11test-11test-11test-11test-11test-11test-11,,,缺陷,IN_PROGRESS,已关闭,0,2020-03-26T08:31:05.000+00:00,2020-03-18T07:27:14.000+00:00,2020-03-26T09:02:54.000+00:00,11583,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:test-11test-11test-11,,,normal,,,test-11test-11test-11,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bugs,1288,
-tapd:TapdBug:1:11991001001739,https://www.tapd.cn/991/prong/stories/view/11991001001739,11991001001739,tt6d3btt52a8tt62a5tt5730tt65b986tt7a7att683c,,,缺陷,IN_PROGRESS,已关闭,0,2020-03-26T08:25:10.000+00:00,2020-03-18T07:32:55.000+00:00,2020-03-26T09:02:37.000+00:00,11572,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:tt90ddtt9a81tt5bb5,,,normal,,,tt90ddtt9a81tt5bb5,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bugs,1289,
-tapd:TapdBug:1:11991001001740,https://www.tapd.cn/991/prong/stories/view/11991001001740,11991001001740,tt6d3btt52a8tt6ctt5230tt4e13tt5bb6tt4ecbtt7ecdtt9875,,,缺陷,DONE,已拒绝,0,,2020-03-18T07:37:43.000+00:00,2020-03-25T02:38:34.000+00:00,0,tapd:TapdBug:1:0,medium,0,0,0,tapd:TapdAccount:1:郝骁宵,tapd:TapdAccount:1:郝骁宵,郝骁宵,normal,,,郝骁宵,"{""ConnectionId"":1,""CompanyId"":0,""WorkspaceId"":991}",_raw_tapd_api_bugs,1290,
diff --git a/backend/plugins/pagerduty/impl/impl.go 
b/backend/plugins/pagerduty/impl/impl.go
index 3606a16ea..e0558591b 100644
--- a/backend/plugins/pagerduty/impl/impl.go
+++ b/backend/plugins/pagerduty/impl/impl.go
@@ -23,7 +23,6 @@ import (
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
        helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
-       "github.com/apache/incubator-devlake/helpers/pluginhelper/tap"
        "github.com/apache/incubator-devlake/plugins/pagerduty/api"
        "github.com/apache/incubator-devlake/plugins/pagerduty/models"
        
"github.com/apache/incubator-devlake/plugins/pagerduty/models/migrationscripts"
@@ -72,26 +71,28 @@ func (p PagerDuty) PrepareTaskData(taskCtx 
plugin.TaskContext, options map[strin
        if err != nil {
                return nil, errors.Default.Wrap(err, "unable to get Pagerduty 
connection by the given connection ID")
        }
-       startDate, err := parseTime("start_date", options)
+       // timeAfter remains nil unless the task options carry a lower time bound
+       var timeAfter *time.Time
+       if op.TimeAfter != "" {
+               convertedTime, err := errors.Convert01(time.Parse(time.RFC3339, op.TimeAfter))
+               if err != nil {
+                       // report the raw option string: the timeAfter pointer is still nil at this point
+                       return nil, errors.BadInput.Wrap(err, fmt.Sprintf("invalid value for `timeAfter`: %s", op.TimeAfter))
+               }
+               timeAfter = &convertedTime
+       }
+       client, err := helper.NewApiClient(taskCtx.GetContext(), 
connection.Endpoint, map[string]string{
+               "Authorization": fmt.Sprintf("Token %s", connection.Token),
+       }, 0, connection.Proxy, taskCtx)
        if err != nil {
                return nil, err
        }
-       config := &models.PagerDutyConfig{
-               Token:     connection.Token,
-               Email:     "", // ignore, works without it too
-               StartDate: startDate,
-       }
-       tapClient, err := tap.NewSingerTap(&tap.SingerTapConfig{
-               TapExecutable:        models.TapExecutable,
-               StreamPropertiesFile: models.StreamPropertiesFile,
-       })
+       asyncClient, err := helper.CreateAsyncApiClient(taskCtx, client, nil)
        if err != nil {
                return nil, err
        }
        return &tasks.PagerDutyTaskData{
-               Options: op,
-               Config:  config,
-               Client:  tapClient,
+               Options:   op,
+               TimeAfter: timeAfter,
+               Client:    asyncClient,
        }, nil
 }
 
@@ -118,6 +119,28 @@ func (p PagerDuty) ApiResources() 
map[string]map[string]plugin.ApiResourceHandle
                        "PATCH":  api.PatchConnection,
                        "DELETE": api.DeleteConnection,
                },
+               "connections/:connectionId/scopes/:projectId": {
+                       "GET":   api.GetScope,
+                       "PATCH": api.UpdateScope,
+               },
+               "connections/:connectionId/remote-scopes": {
+                       "GET": api.RemoteScopes,
+               },
+               "connections/:connectionId/search-remote-scopes": {
+                       "GET": api.SearchRemoteScopes,
+               },
+               "connections/:connectionId/scopes": {
+                       "GET": api.GetScopeList,
+                       "PUT": api.PutScope,
+               },
+               "/:connectionId/transformation_rules": {
+                       "POST": api.CreateTransformationRule,
+                       "GET":  api.GetTransformationRuleList,
+               },
+               "/:connectionId/transformation_rules/:id": {
+                       "PATCH": api.UpdateTransformationRule,
+                       "GET":   api.GetTransformationRule,
+               },
        }
 }
 
@@ -125,6 +148,11 @@ func (p PagerDuty) MakePipelinePlan(connectionId uint64, 
scope []*plugin.Bluepri
        return api.MakePipelinePlan(p.SubTaskMetas(), connectionId, scope)
 }
 
+func (p PagerDuty) MakeDataSourcePipelinePlanV200(connectionId uint64, scopes 
[]*plugin.BlueprintScopeV200, syncPolicy plugin.BlueprintSyncPolicy,
+) (plugin.PipelinePlan, []plugin.Scope, errors.Error) {
+       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes, &syncPolicy)
+}
+
 func (p PagerDuty) Close(taskCtx plugin.TaskContext) errors.Error {
        _, ok := taskCtx.GetData().(*tasks.PagerDutyTaskData)
        if !ok {
@@ -132,16 +160,3 @@ func (p PagerDuty) Close(taskCtx plugin.TaskContext) 
errors.Error {
        }
        return nil
 }
-
-func parseTime(key string, opts map[string]any) (time.Time, errors.Error) {
-       var date time.Time
-       dateRaw, ok := opts[key]
-       if !ok {
-               return date, errors.BadInput.New("time input not provided")
-       }
-       date, err := time.Parse("2006-01-02T15:04:05Z", dateRaw.(string))
-       if err != nil {
-               return date, errors.BadInput.Wrap(err, "bad type input 
provided")
-       }
-       return date, nil
-}
diff --git a/backend/plugins/pagerduty/models/config.go 
b/backend/plugins/pagerduty/models/config.go
index c1818dc92..c2716c533 100644
--- a/backend/plugins/pagerduty/models/config.go
+++ b/backend/plugins/pagerduty/models/config.go
@@ -17,18 +17,4 @@ limitations under the License.
 
 package models
 
-import (
-       "time"
-)
-
-// PagerDutyConfig model corresponds to docs here 
https://github.com/singer-io/tap-pagerduty
-type PagerDutyConfig struct {
-       Token     string    `json:"token"`
-       Email     string    `json:"email"` // Seems to be an inconsequential 
field
-       StartDate time.Time `json:"start_date"`
-}
-
-type PagerDutyParams struct {
-       ConnectionId uint64
-       Stream       string
-}
+//TODO rework
diff --git 
a/backend/plugins/pagerduty/models/migrationscripts/20230123_add_connection_fields.go
 
b/backend/plugins/pagerduty/models/migrationscripts/20230123_add_connection_fields.go
new file mode 100644
index 000000000..bd80939a9
--- /dev/null
+++ 
b/backend/plugins/pagerduty/models/migrationscripts/20230123_add_connection_fields.go
@@ -0,0 +1,51 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+       "github.com/apache/incubator-devlake/core/context"
+       "github.com/apache/incubator-devlake/core/errors"
+)
+
+type pagerdutyConnection20230123 struct {
+       Endpoint         string `mapstructure:"endpoint" 
env:"PAGERDUTY_ENDPOINT" validate:"required"`
+       Proxy            string `mapstructure:"proxy" env:"PAGERDUTY_PROXY"`
+       RateLimitPerHour int    `comment:"api request rate limit per hour"`
+}
+
+func (pagerdutyConnection20230123) TableName() string {
+       return "_tool_pagerduty_connections"
+}
+
+type addPagerdutyConnectionFields20230123 struct{}
+
+func (script *addPagerdutyConnectionFields20230123) Name() string {
+       return "add connection config fields"
+}
+
+func (script *addPagerdutyConnectionFields20230123) Up(basicRes 
context.BasicRes) errors.Error {
+       err := basicRes.GetDal().AutoMigrate(&pagerdutyConnection20230123{})
+       if err != nil {
+               return err
+       }
+       return nil
+}
+
+func (*addPagerdutyConnectionFields20230123) Version() uint64 {
+       return 20230123000000
+}
diff --git 
a/backend/plugins/pagerduty/models/migrationscripts/20230203_add_transformation_rules.go
 
b/backend/plugins/pagerduty/models/migrationscripts/20230203_add_transformation_rules.go
new file mode 100644
index 000000000..c7e835545
--- /dev/null
+++ 
b/backend/plugins/pagerduty/models/migrationscripts/20230203_add_transformation_rules.go
@@ -0,0 +1,50 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+       "github.com/apache/incubator-devlake/core/context"
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/helpers/migrationhelper"
+       
"github.com/apache/incubator-devlake/plugins/pagerduty/models/migrationscripts/archived"
+)
+
+type service20230303 struct {
+       TransformationRuleId uint64
+}
+
+func (*service20230303) TableName() string {
+       return archived.Service{}.TableName()
+}
+
+type addTransformationRulesToService20230303 struct{}
+
+func (*addTransformationRulesToService20230303) Up(basicRes context.BasicRes) 
errors.Error {
+       return migrationhelper.AutoMigrateTables(basicRes,
+               &archived.TransformationRules{},
+               &service20230303{},
+       )
+}
+
+func (*addTransformationRulesToService20230303) Version() uint64 {
+       return 20230303001814
+}
+
+func (*addTransformationRulesToService20230303) Name() string {
+       return "add transformation rules to pagerduty"
+}
diff --git a/backend/plugins/pagerduty/models/service.go 
b/backend/plugins/pagerduty/models/migrationscripts/archived/transformation_rules.go
similarity index 70%
copy from backend/plugins/pagerduty/models/service.go
copy to 
backend/plugins/pagerduty/models/migrationscripts/archived/transformation_rules.go
index 759170ba5..2da5581fb 100644
--- a/backend/plugins/pagerduty/models/service.go
+++ 
b/backend/plugins/pagerduty/models/migrationscripts/archived/transformation_rules.go
@@ -15,20 +15,16 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-package models
+package archived
 
-import (
-       "github.com/apache/incubator-devlake/core/models/common"
-)
+import "github.com/apache/incubator-devlake/core/models/common"
 
-type Service struct {
-       common.NoPKModel
-       ConnectionId uint64 `gorm:"primaryKey"`
-       Url          string
-       Id           string `gorm:"primaryKey"`
-       Name         string
+type TransformationRules struct {
+       common.Model
+       Name         string 
`gorm:"type:varchar(255);index:idx_name_github,unique"`
+       ConnectionId uint64
 }
 
-func (Service) TableName() string {
-       return "_tool_pagerduty_services"
+func (*TransformationRules) TableName() string {
+       return "_tool_pagerduty_transformation_rules"
 }
diff --git a/backend/plugins/pagerduty/models/migrationscripts/register.go 
b/backend/plugins/pagerduty/models/migrationscripts/register.go
index 1a0e72d14..75aa89265 100644
--- a/backend/plugins/pagerduty/models/migrationscripts/register.go
+++ b/backend/plugins/pagerduty/models/migrationscripts/register.go
@@ -26,5 +26,7 @@ func All() []plugin.MigrationScript {
        return []plugin.MigrationScript{
                new(addInitTables),
                new(addEndpointAndProxyToConnection),
+               new(addPagerdutyConnectionFields20230123),
+               new(addTransformationRulesToService20230303),
        }
 }
diff --git a/backend/plugins/pagerduty/models/generated/incidents.go 
b/backend/plugins/pagerduty/models/raw/incidents.go
similarity index 99%
rename from backend/plugins/pagerduty/models/generated/incidents.go
rename to backend/plugins/pagerduty/models/raw/incidents.go
index 04e0941d8..464ab1072 100644
--- a/backend/plugins/pagerduty/models/generated/incidents.go
+++ b/backend/plugins/pagerduty/models/raw/incidents.go
@@ -14,9 +14,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 
or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 */
-// Code generated by github.com/atombender/go-jsonschema, DO NOT EDIT.
 
-package generated
+package raw
 
 import "time"
 
diff --git a/backend/plugins/pagerduty/models/generated/notifications.go 
b/backend/plugins/pagerduty/models/raw/notifications.go
similarity index 95%
rename from backend/plugins/pagerduty/models/generated/notifications.go
rename to backend/plugins/pagerduty/models/raw/notifications.go
index 721ea5479..f361e636b 100644
--- a/backend/plugins/pagerduty/models/generated/notifications.go
+++ b/backend/plugins/pagerduty/models/raw/notifications.go
@@ -14,9 +14,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 
or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 */
-// Code generated by github.com/atombender/go-jsonschema, DO NOT EDIT.
 
-package generated
+package raw
 
 import "time"
 
diff --git a/backend/plugins/pagerduty/models/generated/services.go 
b/backend/plugins/pagerduty/models/raw/services.go
similarity index 98%
rename from backend/plugins/pagerduty/models/generated/services.go
rename to backend/plugins/pagerduty/models/raw/services.go
index 0823548a1..c68b42f2a 100644
--- a/backend/plugins/pagerduty/models/generated/services.go
+++ b/backend/plugins/pagerduty/models/raw/services.go
@@ -14,9 +14,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 
or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 */
-// Code generated by github.com/atombender/go-jsonschema, DO NOT EDIT.
 
-package generated
+package raw
 
 import "time"
 
diff --git a/backend/plugins/pagerduty/models/service.go 
b/backend/plugins/pagerduty/models/service.go
index 759170ba5..6c5cfb3ee 100644
--- a/backend/plugins/pagerduty/models/service.go
+++ b/backend/plugins/pagerduty/models/service.go
@@ -23,10 +23,11 @@ import (
 
 type Service struct {
        common.NoPKModel
-       ConnectionId uint64 `gorm:"primaryKey"`
-       Url          string
-       Id           string `gorm:"primaryKey"`
-       Name         string
+       ConnectionId         uint64 `json:"connection_id" 
mapstructure:"connectionId,omitempty" gorm:"primaryKey" `
+       Url                  string `json:"url" mapstructure:"url"`
+       Id                   string `json:"id" mapstructure:"id" 
gorm:"primaryKey" `
+       TransformationRuleId uint64 `json:"transformation_rule_id" 
mapstructure:"transformation_rule_id,omitempty"` //keys to 
PagerdutyTransformationRules.ID
+       Name                 string `json:"name" mapstructure:"name"`
 }
 
 func (Service) TableName() string {
diff --git a/backend/plugins/pagerduty/models/service.go 
b/backend/plugins/pagerduty/models/transformation_rules.go
similarity index 72%
copy from backend/plugins/pagerduty/models/service.go
copy to backend/plugins/pagerduty/models/transformation_rules.go
index 759170ba5..6d83916ac 100644
--- a/backend/plugins/pagerduty/models/service.go
+++ b/backend/plugins/pagerduty/models/transformation_rules.go
@@ -21,14 +21,12 @@ import (
        "github.com/apache/incubator-devlake/core/models/common"
 )
 
-type Service struct {
-       common.NoPKModel
-       ConnectionId uint64 `gorm:"primaryKey"`
-       Url          string
-       Id           string `gorm:"primaryKey"`
-       Name         string
+type PagerdutyTransformationRule struct {
+       common.Model `mapstructure:"-"`
+       Name         string `mapstructure:"name" json:"name" 
gorm:"type:varchar(255);index:idx_name_github,unique" validate:"required"`
+       ConnectionId uint64
 }
 
-func (Service) TableName() string {
-       return "_tool_pagerduty_services"
+func (PagerdutyTransformationRule) TableName() string {
+       return "_tool_pagerduty_transformation_rules"
 }
diff --git a/backend/plugins/pagerduty/tasks/incidents_collector.go 
b/backend/plugins/pagerduty/tasks/incidents_collector.go
index 55a2fd854..baadcb163 100644
--- a/backend/plugins/pagerduty/tasks/incidents_collector.go
+++ b/backend/plugins/pagerduty/tasks/incidents_collector.go
@@ -18,37 +18,130 @@ limitations under the License.
 package tasks
 
 import (
+       "encoding/json"
+       "fmt"
+       "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
-       helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
-       "github.com/apache/incubator-devlake/helpers/pluginhelper/tap"
+       "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
        "github.com/apache/incubator-devlake/plugins/pagerduty/models"
+       "net/http"
+       "net/url"
+       "reflect"
+       "time"
 )
 
 const RAW_INCIDENTS_TABLE = "pagerduty_incidents"
 
 var _ plugin.SubTaskEntryPoint = CollectIncidents
 
+type (
+       pagingInfo struct {
+               Limit  *int  `json:"limit"`
+               Offset *int  `json:"offset"`
+               Total  *int  `json:"total"`
+               More   *bool `json:"more"`
+       }
+       collectedIncidents struct {
+               pagingInfo
+               Incidents []json.RawMessage `json:"incidents"`
+       }
+
+       collectedIncident struct {
+               pagingInfo
+               Incident json.RawMessage `json:"incident"`
+       }
+       simplifiedRawIncident struct {
+               Number    int       `json:"number"`
+               CreatedAt time.Time `json:"created_at"`
+       }
+)
+
 func CollectIncidents(taskCtx plugin.SubTaskContext) errors.Error {
        data := taskCtx.GetData().(*PagerDutyTaskData)
-       collector, err := tap.NewTapCollector(
-               &tap.CollectorArgs[tap.SingerTapStream]{
-                       RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
-                               Ctx:   taskCtx,
-                               Table: RAW_INCIDENTS_TABLE,
-                               Params: models.PagerDutyParams{
-                                       Stream:       models.IncidentStream,
-                                       ConnectionId: data.Options.ConnectionId,
+       db := taskCtx.GetDal()
+       args := api.RawDataSubTaskArgs{
+               Ctx: taskCtx,
+               Params: PagerDutyParams{
+                       ConnectionId: data.Options.ConnectionId,
+               },
+               Table: RAW_INCIDENTS_TABLE,
+       }
+       collector, err := 
api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
+               RawDataSubTaskArgs: args,
+               ApiClient:          data.Client,
+               TimeAfter:          data.TimeAfter,
+               CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
+                       PageSize: 100,
+                       // GetTotalPages reads the paging envelope of the list response and returns its `total` field.
+                       // NOTE(review): `total` looks like a record count rather than a page count - confirm against the offset handling in Query before changing either.
+                       GetTotalPages: func(res *http.Response, args *api.ApiCollectorArgs) (int, errors.Error) {
+                               paging := pagingInfo{}
+                               err := api.UnmarshalResponse(res, &paging)
+                               if err != nil {
+                                       return 0, errors.BadInput.Wrap(err, "failed to determined paging count")
+                               }
+                               // Total is a pointer and stays nil when the response omits the field; fail instead of panicking
+                               if paging.Total == nil {
+                                       return 0, errors.BadInput.New("paging count unavailable: response has no `total` field")
+                               }
+                               return *paging.Total, nil
+                       },
+                       FinalizableApiCollectorCommonArgs: 
api.FinalizableApiCollectorCommonArgs{
+                               UrlTemplate: "incidents",
+                               // Query builds the query string for one page of the incident list.
+                               Query: func(reqData *api.RequestData, createdAfter *time.Time) (url.Values, errors.Error) {
+                                       query := url.Values{}
+                                       if createdAfter == nil || time.Since(*createdAfter) > 180*24*time.Hour {
+                                               // beyond 6 months Pagerduty API will just return nothing, so need to query for 'all' instead
+                                               query.Set("date_range", "all")
+                                       } else {
+                                               // since for PagerDuty is actually the created_at time of the incident (this is not well documented in their APIs)
+                                               // use the createdAfter argument (data.TimeAfter is a separate pointer that is not nil-checked here)
+                                               // and send an RFC3339 timestamp rather than the Time.String() debug format
+                                               query.Set("since", createdAfter.Format(time.RFC3339))
+                                       }
+                                       query.Set("sort_by", "created_at:desc")
+                                       query.Set("limit", fmt.Sprintf("%d", reqData.Pager.Size))
+                                       query.Set("offset", fmt.Sprintf("%d", reqData.Pager.Page))
+                                       query.Set("total", "true")
+                                       return query, nil
                                },
+                               ResponseParser: func(res *http.Response) 
([]json.RawMessage, errors.Error) {
+                                       rawResult := collectedIncidents{}
+                                       err := api.UnmarshalResponse(res, 
&rawResult)
+                                       return rawResult.Incidents, err
+                               },
+                       },
+               },
+               CollectUnfinishedDetails: api.FinalizableApiCollectorDetailArgs{
+                       FinalizableApiCollectorCommonArgs: 
api.FinalizableApiCollectorCommonArgs{
+                               // 2. "Input" here is the type: 
simplifiedRawIncident which is the element type of the returned iterator from 
BuildInputIterator
+                               UrlTemplate: "incidents/{{ .Input.Number }}",
+                               // 3. No custom query params/headers needed for 
this endpoint
+                               Query: nil,
+                               // 4. Parse the response for this endpoint call 
into a json.RawMessage
+                               ResponseParser: func(res *http.Response) 
([]json.RawMessage, errors.Error) {
+                                       rawResult := collectedIncident{}
+                                       err := api.UnmarshalResponse(res, 
&rawResult)
+                                       return 
[]json.RawMessage{rawResult.Incident}, err
+                               },
+                       },
+                       BuildInputIterator: func() (api.Iterator, errors.Error) 
{
+                               // 1. fetch individual "active/non-final" 
incidents from previous collections+extractions
+                               cursor, err := db.Cursor(
+                                       dal.Select("number, created_date"),
+                                       dal.From(&models.Incident{}),
+                                       dal.Where(
+                                               "service_id = ? AND 
connection_id = ? AND status != ?",
+                                               data.Options.ServiceId, 
data.Options.ConnectionId, "resolved",
+                                       ),
+                               )
+                               if err != nil {
+                                       return nil, err
+                               }
+                               return api.NewDalCursorIterator(db, cursor, 
reflect.TypeOf(simplifiedRawIncident{}))
                        },
-                       TapClient:    data.Client,
-                       TapConfig:    data.Config,
-                       ConnectionId: data.Options.ConnectionId, // Seems to be 
an inconsequential field
-                       StreamName:   models.IncidentStream,
                },
-       )
+       })
        if err != nil {
-               return err
+               return nil
        }
        return collector.Execute()
 }
diff --git a/backend/plugins/pagerduty/tasks/incidents_converter.go 
b/backend/plugins/pagerduty/tasks/incidents_converter.go
index c45dac63c..4f1d4bc9a 100644
--- a/backend/plugins/pagerduty/tasks/incidents_converter.go
+++ b/backend/plugins/pagerduty/tasks/incidents_converter.go
@@ -69,9 +69,8 @@ func ConvertIncidents(taskCtx plugin.SubTaskContext) 
errors.Error {
        converter, err := api.NewDataConverter(api.DataConverterArgs{
                RawDataSubTaskArgs: api.RawDataSubTaskArgs{
                        Ctx: taskCtx,
-                       Params: models.PagerDutyParams{
+                       Params: PagerDutyParams{
                                ConnectionId: data.Options.ConnectionId,
-                               Stream:       models.IncidentStream,
                        },
                        Table: RAW_INCIDENTS_TABLE,
                },
diff --git a/backend/plugins/pagerduty/tasks/incidents_extractor.go 
b/backend/plugins/pagerduty/tasks/incidents_extractor.go
index f186ca451..d5ba7eb26 100644
--- a/backend/plugins/pagerduty/tasks/incidents_extractor.go
+++ b/backend/plugins/pagerduty/tasks/incidents_extractor.go
@@ -23,7 +23,7 @@ import (
        "github.com/apache/incubator-devlake/core/plugin"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
        "github.com/apache/incubator-devlake/plugins/pagerduty/models"
-       "github.com/apache/incubator-devlake/plugins/pagerduty/models/generated"
+       "github.com/apache/incubator-devlake/plugins/pagerduty/models/raw"
 )
 
 var _ plugin.SubTaskEntryPoint = ExtractIncidents
@@ -33,14 +33,13 @@ func ExtractIncidents(taskCtx plugin.SubTaskContext) 
errors.Error {
        extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{
                RawDataSubTaskArgs: api.RawDataSubTaskArgs{
                        Ctx: taskCtx,
-                       Params: models.PagerDutyParams{
+                       Params: PagerDutyParams{
                                ConnectionId: data.Options.ConnectionId,
-                               Stream:       models.IncidentStream,
                        },
                        Table: RAW_INCIDENTS_TABLE,
                },
                Extract: func(row *api.RawData) ([]interface{}, errors.Error) {
-                       incidentRaw := &generated.Incidents{}
+                       incidentRaw := &raw.Incidents{}
                        err := errors.Convert(json.Unmarshal(row.Data, 
incidentRaw))
                        if err != nil {
                                return nil, err
diff --git a/backend/plugins/pagerduty/tasks/task_data.go 
b/backend/plugins/pagerduty/tasks/task_data.go
index c03685b20..91ba00329 100644
--- a/backend/plugins/pagerduty/tasks/task_data.go
+++ b/backend/plugins/pagerduty/tasks/task_data.go
@@ -19,34 +19,70 @@ package tasks
 
 import (
        "github.com/apache/incubator-devlake/core/errors"
-       helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
-       "github.com/apache/incubator-devlake/helpers/pluginhelper/tap"
+       "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
        "github.com/apache/incubator-devlake/plugins/pagerduty/models"
+       "time"
 )
 
 type PagerDutyOptions struct {
-       ConnectionId    uint64   `json:"connectionId"`
-       Tasks           []string `json:"tasks,omitempty"`
-       Transformations TransformationRules
+       ConnectionId uint64   `json:"connectionId"`
+       TimeAfter    string   `json:"time_after,omitempty"`
+       ServiceId    string   `json:"service_id,omitempty"`
+       ServiceName  string   `json:"service_name,omitempty"`
+       Tasks        []string `json:"tasks,omitempty"`
+       *models.PagerdutyTransformationRule
 }
 
 type PagerDutyTaskData struct {
-       Options *PagerDutyOptions `json:"-"`
-       Config  *models.PagerDutyConfig
-       Client  *tap.SingerTap
+       Options   *PagerDutyOptions
+       TimeAfter *time.Time
+       Client    api.RateLimitedApiClient
 }
 
-type TransformationRules struct {
-       //Placeholder struct for later if needed
+type PagerDutyParams struct {
+       ConnectionId uint64
 }
 
 func DecodeAndValidateTaskOptions(options map[string]interface{}) 
(*PagerDutyOptions, errors.Error) {
+       op, err := DecodeTaskOptions(options)
+       if err != nil {
+               return nil, err
+       }
+       err = ValidateTaskOptions(op)
+       if err != nil {
+               return nil, err
+       }
+       return op, nil
+}
+
+func DecodeTaskOptions(options map[string]interface{}) (*PagerDutyOptions, 
errors.Error) {
        var op PagerDutyOptions
-       if err := helper.Decode(options, &op, nil); err != nil {
+       err := api.Decode(options, &op, nil)
+       if err != nil {
+               return nil, err
+       }
+       return &op, nil
+}
+
+func EncodeTaskOptions(op *PagerDutyOptions) (map[string]interface{}, 
errors.Error) {
+       var result map[string]interface{}
+       err := api.Decode(op, &result, nil)
+       if err != nil {
                return nil, err
        }
+       return result, nil
+}
+
+// ValidateTaskOptions checks that the decoded options name a service (both ServiceName and ServiceId) and carry a non-zero ConnectionId.
+// It returns a BadInput error describing the first missing field, or nil when all are present.
+func ValidateTaskOptions(op *PagerDutyOptions) errors.Error {
+       if op.ServiceName == "" {
+               return errors.BadInput.New("not enough info for Pagerduty execution: service_name is required")
+       }
+       if op.ServiceId == "" {
+               return errors.BadInput.New("not enough info for Pagerduty execution: service_id is required")
+       }
        if op.ConnectionId == 0 {
-               return nil, errors.Default.New("connectionId is invalid")
+               return errors.BadInput.New("connectionId is invalid")
        }
-       return &op, nil
+       return nil
 }
diff --git a/backend/python/pydevlake/pydevlake/doc.template.json 
b/backend/python/pydevlake/pydevlake/doc.template.json
index a565187cd..10cccd2c1 100644
--- a/backend/python/pydevlake/pydevlake/doc.template.json
+++ b/backend/python/pydevlake/pydevlake/doc.template.json
@@ -249,7 +249,7 @@
                 }
             }
         },
-        "/plugins/$plugin_name/transformation_rules": {
+        
"/plugins/$plugin_name/connections/{connectionId}/transformation_rules": {
             "get": {
                 "description": "Get all transformation rules",
                 "parameters": [
@@ -300,7 +300,7 @@
                 }
             }
         },
-        "/plugins/$plugin_name/transformation_rules/{ruleId}": {
+        
"/plugins/$plugin_name/connections/{connectionId}/transformation_rules/{ruleId}":
 {
             "get": {
                 "description": "Get a transformation rule",
                 "parameters": [
diff --git a/backend/server/api/remote/register.go 
b/backend/server/api/remote/register.go
index 1c886027b..492700cc4 100644
--- a/backend/server/api/remote/register.go
+++ b/backend/server/api/remote/register.go
@@ -58,7 +58,7 @@ func RegisterPlugin(router *gin.Engine, registerEndpoints 
func(r *gin.Engine, pl
                }
                remotePlugin, err := remote.NewRemotePlugin(&details.PluginInfo)
                if err != nil {
-                       shared.ApiOutputError(c, errors.Default.Wrap(err, 
"plugin could not be initialized"))
+                       shared.ApiOutputError(c, errors.Default.Wrap(err, 
fmt.Sprintf("plugin %s could not be initialized", details.PluginInfo.Name)))
                        return
                }
                resource := ApiResource{
diff --git a/backend/server/services/remote/init.go 
b/backend/server/services/remote/init.go
index 98d46770d..3e505b96c 100644
--- a/backend/server/services/remote/init.go
+++ b/backend/server/services/remote/init.go
@@ -18,6 +18,7 @@ limitations under the License.
 package remote
 
 import (
+       "fmt"
        "github.com/apache/incubator-devlake/core/config"
        "github.com/apache/incubator-devlake/core/context"
        "github.com/apache/incubator-devlake/core/errors"
@@ -36,11 +37,11 @@ func Init(br context.BasicRes) {
 
 func NewRemotePlugin(info *models.PluginInfo) (models.RemotePlugin, 
errors.Error) {
        if _, ok := remotePlugins[info.Name]; ok {
-               return nil, errors.BadInput.New("plugin already registered")
+               return nil, errors.BadInput.New(fmt.Sprintf("plugin %s already 
registered", info.Name))
        }
        plugin, err := remote.NewRemotePlugin(info)
        if err != nil {
-               return nil, errors.BadInput.New("unsupported plugin type")
+               return nil, err
        }
        forceMigration := config.GetConfig().GetBool("FORCE_MIGRATION")
        err = plugin.RunMigrations(forceMigration)
diff --git a/backend/server/services/remote/models/conversion.go 
b/backend/server/services/remote/models/conversion.go
index 0deb03dbd..651947236 100644
--- a/backend/server/services/remote/models/conversion.go
+++ b/backend/server/services/remote/models/conversion.go
@@ -91,7 +91,7 @@ func canonicalFieldName(fieldName string) string {
 func generateStructField(name string, encrypt bool, schema map[string]any) 
(*reflect.StructField, errors.Error) {
        goType, err := getGoType(schema)
        if err != nil {
-               return nil, err
+               return nil, errors.Default.Wrap(err, fmt.Sprintf("couldn't 
resolve type for field: \"%s\"", name))
        }
        sf := &reflect.StructField{
                Name: strings.Title(name), //nolint:staticcheck
diff --git a/backend/server/services/remote/plugin/default_api.go 
b/backend/server/services/remote/plugin/default_api.go
index 6245fa5a7..568ec85da 100644
--- a/backend/server/services/remote/plugin/default_api.go
+++ b/backend/server/services/remote/plugin/default_api.go
@@ -76,11 +76,11 @@ func GetDefaultAPI(
        }
 
        if txRuleType != nil {
-               resources["transformation_rules"] = 
map[string]plugin.ApiResourceHandler{
+               resources["connections/:connectionId/transformation_rules"] = 
map[string]plugin.ApiResourceHandler{
                        "POST": papi.PostTransformationRules,
                        "GET":  papi.ListTransformationRules,
                }
-               resources["transformation_rules/:id"] = 
map[string]plugin.ApiResourceHandler{
+               resources["connections/:connectionId/transformation_rules/:id"] 
= map[string]plugin.ApiResourceHandler{
                        "GET":   papi.GetTransformationRule,
                        "PATCH": papi.PatchTransformationRule,
                }
diff --git a/backend/server/services/remote/plugin/plugin_impl.go 
b/backend/server/services/remote/plugin/plugin_impl.go
index b6e250819..6b1d95390 100644
--- a/backend/server/services/remote/plugin/plugin_impl.go
+++ b/backend/server/services/remote/plugin/plugin_impl.go
@@ -54,19 +54,19 @@ type (
 func newPlugin(info *models.PluginInfo, invoker bridge.Invoker) 
(*remotePluginImpl, errors.Error) {
        connectionTabler, err := 
info.ConnectionModelInfo.LoadDynamicTabler(true, common.Model{})
        if err != nil {
-               return nil, err
+               return nil, errors.Default.Wrap(err, fmt.Sprintf("Couldn't load 
Connection type for plugin %s", info.Name))
        }
 
        var txRuleTabler *coreModels.DynamicTabler
        if info.TransformationRuleModelInfo != nil {
                txRuleTabler, err = 
info.TransformationRuleModelInfo.LoadDynamicTabler(false, 
models.TransformationModel{})
                if err != nil {
-                       return nil, err
+                       return nil, errors.Default.Wrap(err, 
fmt.Sprintf("Couldn't load TransformationRule type for plugin %s", info.Name))
                }
        }
        scopeTabler, err := info.ScopeModelInfo.LoadDynamicTabler(false, 
models.ScopeModel{})
        if err != nil {
-               return nil, err
+               return nil, errors.Default.Wrap(err, fmt.Sprintf("Couldn't load 
Scope type for plugin %s", info.Name))
        }
        p := remotePluginImpl{
                name:                     info.Name,
diff --git a/backend/test/integration/helper/api.go 
b/backend/test/integration/helper/api.go
index 237122d65..5100fc737 100644
--- a/backend/test/integration/helper/api.go
+++ b/backend/test/integration/helper/api.go
@@ -19,14 +19,14 @@ package helper
 
 import (
        "fmt"
-       "net/http"
-       "reflect"
-       "strings"
-       "time"
-
+       "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/models"
        "github.com/apache/incubator-devlake/core/plugin"
        apiProject "github.com/apache/incubator-devlake/server/api/project"
+       "github.com/stretchr/testify/require"
+       "net/http"
+       "reflect"
+       "strings"
 )
 
 // CreateConnection FIXME
@@ -35,7 +35,7 @@ func (d *DevlakeClient) TestConnection(pluginName string, 
connection any) {
        _ = sendHttpRequest[Connection](d.testCtx, d.timeout, debugInfo{
                print:      true,
                inlineJson: false,
-       }, http.MethodPost, fmt.Sprintf("%s/plugins/%s/test", d.Endpoint, 
pluginName), connection)
+       }, http.MethodPost, fmt.Sprintf("%s/plugins/%s/test", d.Endpoint, 
pluginName), nil, connection)
 }
 
 // CreateConnection FIXME
@@ -44,7 +44,7 @@ func (d *DevlakeClient) CreateConnection(pluginName string, 
connection any) *Con
        created := sendHttpRequest[Connection](d.testCtx, d.timeout, debugInfo{
                print:      true,
                inlineJson: false,
-       }, http.MethodPost, fmt.Sprintf("%s/plugins/%s/connections", 
d.Endpoint, pluginName), connection)
+       }, http.MethodPost, fmt.Sprintf("%s/plugins/%s/connections", 
d.Endpoint, pluginName), nil, connection)
        return &created
 }
 
@@ -54,7 +54,7 @@ func (d *DevlakeClient) ListConnections(pluginName string) 
[]*Connection {
        all := sendHttpRequest[[]*Connection](d.testCtx, d.timeout, debugInfo{
                print:      true,
                inlineJson: false,
-       }, http.MethodGet, fmt.Sprintf("%s/plugins/%s/connections", d.Endpoint, 
pluginName), nil)
+       }, http.MethodGet, fmt.Sprintf("%s/plugins/%s/connections", d.Endpoint, 
pluginName), nil, nil)
        return all
 }
 
@@ -83,7 +83,7 @@ func (d *DevlakeClient) CreateBasicBlueprintV2(name string, 
config *BlueprintV2C
        blueprint = sendHttpRequest[models.Blueprint](d.testCtx, d.timeout, 
debugInfo{
                print:      true,
                inlineJson: false,
-       }, http.MethodPost, fmt.Sprintf("%s/blueprints", d.Endpoint), 
&blueprint)
+       }, http.MethodPost, fmt.Sprintf("%s/blueprints", d.Endpoint), nil, 
&blueprint)
        return blueprint
 }
 
@@ -110,7 +110,7 @@ func (d *DevlakeClient) CreateProject(project 
*ProjectConfig) models.ApiOutputPr
        return sendHttpRequest[models.ApiOutputProject](d.testCtx, d.timeout, 
debugInfo{
                print:      true,
                inlineJson: false,
-       }, http.MethodPost, fmt.Sprintf("%s/projects", d.Endpoint), 
&models.ApiInputProject{
+       }, http.MethodPost, fmt.Sprintf("%s/projects", d.Endpoint), nil, 
&models.ApiInputProject{
                BaseProject: models.BaseProject{
                        Name:        project.ProjectName,
                        Description: project.ProjectDescription,
@@ -124,59 +124,61 @@ func (d *DevlakeClient) GetProject(projectName string) 
models.ApiOutputProject {
        return sendHttpRequest[models.ApiOutputProject](d.testCtx, d.timeout, 
debugInfo{
                print:      true,
                inlineJson: false,
-       }, http.MethodGet, fmt.Sprintf("%s/projects/%s", d.Endpoint, 
projectName), nil)
+       }, http.MethodGet, fmt.Sprintf("%s/projects/%s", d.Endpoint, 
projectName), nil, nil)
 }
 
 func (d *DevlakeClient) ListProjects() apiProject.PaginatedProjects {
        return sendHttpRequest[apiProject.PaginatedProjects](d.testCtx, 
d.timeout, debugInfo{
                print:      true,
                inlineJson: false,
-       }, http.MethodGet, fmt.Sprintf("%s/projects", d.Endpoint), nil)
+       }, http.MethodGet, fmt.Sprintf("%s/projects", d.Endpoint), nil, nil)
 }
 
-func (d *DevlakeClient) CreateScope(pluginName string, connectionId uint64, 
scope any) any {
+func (d *DevlakeClient) CreateScope(pluginName string, connectionId uint64, 
scopes ...any) any {
        request := map[string]any{
-               "Data": []any{scope},
+               "data": scopes,
        }
        return sendHttpRequest[any](d.testCtx, d.timeout, debugInfo{
                print:      true,
                inlineJson: false,
-       }, http.MethodPut, fmt.Sprintf("%s/plugins/%s/connections/%d/scopes", 
d.Endpoint, pluginName, connectionId), request)
+       }, http.MethodPut, fmt.Sprintf("%s/plugins/%s/connections/%d/scopes", 
d.Endpoint, pluginName, connectionId), nil, request)
 }
 
 func (d *DevlakeClient) UpdateScope(pluginName string, connectionId uint64, 
scopeId string, scope any) any {
        return sendHttpRequest[any](d.testCtx, d.timeout, debugInfo{
                print:      true,
                inlineJson: false,
-       }, http.MethodPatch, 
fmt.Sprintf("%s/plugins/%s/connections/%d/scopes/%s", d.Endpoint, pluginName, 
connectionId, scopeId), scope)
+       }, http.MethodPatch, 
fmt.Sprintf("%s/plugins/%s/connections/%d/scopes/%s", d.Endpoint, pluginName, 
connectionId, scopeId), nil, scope)
 }
 
 func (d *DevlakeClient) ListScopes(pluginName string, connectionId uint64) 
[]any {
        return sendHttpRequest[[]any](d.testCtx, d.timeout, debugInfo{
                print:      true,
                inlineJson: false,
-       }, http.MethodGet, fmt.Sprintf("%s/plugins/%s/connections/%d/scopes", 
d.Endpoint, pluginName, connectionId), nil)
+       }, http.MethodGet, fmt.Sprintf("%s/plugins/%s/connections/%d/scopes", 
d.Endpoint, pluginName, connectionId), nil, nil)
 }
 
 func (d *DevlakeClient) GetScope(pluginName string, connectionId uint64, 
scopeId string) any {
        return sendHttpRequest[any](d.testCtx, d.timeout, debugInfo{
                print:      true,
                inlineJson: false,
-       }, http.MethodGet, 
fmt.Sprintf("%s/plugins/%s/connections/%d/scopes/%s", d.Endpoint, pluginName, 
connectionId, scopeId), nil)
+       }, http.MethodGet, 
fmt.Sprintf("%s/plugins/%s/connections/%d/scopes/%s", d.Endpoint, pluginName, 
connectionId, scopeId), nil, nil)
 }
 
-func (d *DevlakeClient) CreateTransformationRule(pluginName string, rules any) 
any {
+func (d *DevlakeClient) CreateTransformationRule(pluginName string, 
connectionId uint64, rules any) any {
        return sendHttpRequest[any](d.testCtx, d.timeout, debugInfo{
                print:      true,
                inlineJson: false,
-       }, http.MethodPost, fmt.Sprintf("%s/plugins/%s/transformation_rules", 
d.Endpoint, pluginName), rules)
+       }, http.MethodPost, 
fmt.Sprintf("%s/plugins/%s/connections/%d/transformation_rules",
+               d.Endpoint, pluginName, connectionId), nil, rules)
 }
 
-func (d *DevlakeClient) ListTransformationRules(pluginName string) []any {
+func (d *DevlakeClient) ListTransformationRules(pluginName string, 
connectionId uint64) []any {
        return sendHttpRequest[[]any](d.testCtx, d.timeout, debugInfo{
                print:      true,
                inlineJson: false,
-       }, http.MethodGet, 
fmt.Sprintf("%s/plugins/%s/transformation_rules?pageSize=20?page=1", 
d.Endpoint, pluginName), nil)
+       }, http.MethodGet, 
fmt.Sprintf("%s/plugins/%s/connections/%d/transformation_rules?pageSize=20&page=1",
+               d.Endpoint, pluginName, connectionId), nil, nil)
 }
 
 func (d *DevlakeClient) RemoteScopes(query RemoteScopesQuery) 
RemoteScopesOutput {
@@ -200,7 +202,7 @@ func (d *DevlakeClient) RemoteScopes(query 
RemoteScopesQuery) RemoteScopesOutput
        return sendHttpRequest[RemoteScopesOutput](d.testCtx, d.timeout, 
debugInfo{
                print:      true,
                inlineJson: false,
-       }, http.MethodGet, url, nil)
+       }, http.MethodGet, url, nil, nil)
 }
 
 // SearchRemoteScopes makes calls to the "scope API" indirectly. "Search" is 
the remote endpoint to hit.
@@ -216,7 +218,7 @@ func (d *DevlakeClient) SearchRemoteScopes(query 
SearchRemoteScopesQuery) Search
                query.Page,
                query.PageSize,
                mapToQueryString(query.Params)),
-               nil)
+               nil, nil)
 }
 
 // CreateBasicBlueprint FIXME
@@ -238,7 +240,7 @@ func (d *DevlakeClient) CreateBasicBlueprint(name string, 
connection *plugin.Blu
        blueprint = sendHttpRequest[models.Blueprint](d.testCtx, d.timeout, 
debugInfo{
                print:      true,
                inlineJson: false,
-       }, http.MethodPost, fmt.Sprintf("%s/blueprints", d.Endpoint), 
&blueprint)
+       }, http.MethodPost, fmt.Sprintf("%s/blueprints", d.Endpoint), nil, 
&blueprint)
        return blueprint
 }
 
@@ -248,7 +250,7 @@ func (d *DevlakeClient) TriggerBlueprint(blueprintId 
uint64) models.Pipeline {
        pipeline := sendHttpRequest[models.Pipeline](d.testCtx, d.timeout, 
debugInfo{
                print:      true,
                inlineJson: false,
-       }, http.MethodPost, fmt.Sprintf("%s/blueprints/%d/trigger", d.Endpoint, 
blueprintId), nil)
+       }, http.MethodPost, fmt.Sprintf("%s/blueprints/%d/trigger", d.Endpoint, 
blueprintId), nil, nil)
        return d.monitorPipeline(pipeline.ID)
 }
 
@@ -258,7 +260,7 @@ func (d *DevlakeClient) RunPipeline(pipeline 
models.NewPipeline) models.Pipeline
        pipelineResult := sendHttpRequest[models.Pipeline](d.testCtx, 
d.timeout, debugInfo{
                print:      true,
                inlineJson: false,
-       }, http.MethodPost, fmt.Sprintf("%s/pipelines", d.Endpoint), &pipeline)
+       }, http.MethodPost, fmt.Sprintf("%s/pipelines", d.Endpoint), nil, 
&pipeline)
        return d.monitorPipeline(pipelineResult.ID)
 }
 
@@ -276,18 +278,24 @@ func (d *DevlakeClient) monitorPipeline(id uint64) 
models.Pipeline {
        var previousResult models.Pipeline
        endpoint := fmt.Sprintf("%s/pipelines/%d", d.Endpoint, id)
        coloredPrintf("calling:\n\t%s %s\nwith:\n%s\n", http.MethodGet, 
endpoint, string(ToCleanJson(false, nil)))
-       for {
-               time.Sleep(1 * time.Second)
-               pipelineResult := sendHttpRequest[models.Pipeline](d.testCtx, 
d.timeout, debugInfo{
+       var pipelineResult models.Pipeline
+       require.NoError(d.testCtx, runWithTimeout(d.pipelineTimeout, func() 
(bool, errors.Error) {
+               pipelineResult = sendHttpRequest[models.Pipeline](d.testCtx, 
d.pipelineTimeout, debugInfo{
                        print: false,
-               }, http.MethodGet, fmt.Sprintf("%s/pipelines/%d", d.Endpoint, 
id), nil)
-               if pipelineResult.Status == models.TASK_COMPLETED || 
pipelineResult.Status == models.TASK_FAILED {
+               }, http.MethodGet, fmt.Sprintf("%s/pipelines/%d", d.Endpoint, 
id), nil, nil)
+               if pipelineResult.Status == models.TASK_COMPLETED {
                        coloredPrintf("result: %s\n", ToCleanJson(true, 
&pipelineResult))
-                       return pipelineResult
+                       return true, nil
+               }
+               if pipelineResult.Status == models.TASK_FAILED {
+                       coloredPrintf("result: %s\n", ToCleanJson(true, 
&pipelineResult))
+                       return true, errors.Default.New("pipeline task failed")
                }
                if !reflect.DeepEqual(pipelineResult, previousResult) {
                        coloredPrintf("result: %s\n", ToCleanJson(true, 
&pipelineResult))
                }
                previousResult = pipelineResult
-       }
+               return false, nil
+       }))
+       return pipelineResult
 }
diff --git a/backend/test/integration/helper/client.go 
b/backend/test/integration/helper/client.go
index 750326e6b..c05c1571c 100644
--- a/backend/test/integration/helper/client.go
+++ b/backend/test/integration/helper/client.go
@@ -24,6 +24,7 @@ import (
        goerror "errors"
        "fmt"
        "io"
+       "math"
        "net/http"
        "os"
        "sync"
@@ -53,9 +54,9 @@ var (
        throwawayDir           string
        initService            = new(sync.Once)
        dbTruncationExclusions = []string{
-               "_devlake_migration_history",
-               "_devlake_locking_stub",
-               "_devlake_locking_history",
+               migration.MigrationHistory{}.TableName(),
+               models.LockingHistory{}.TableName(),
+               models.LockingStub{}.TableName(),
        }
 )
 
@@ -70,13 +71,14 @@ func init() {
 // DevlakeClient FIXME
 type (
        DevlakeClient struct {
-               Endpoint string
-               db       *gorm.DB
-               log      log.Logger
-               cfg      *viper.Viper
-               testCtx  *testing.T
-               basicRes corectx.BasicRes
-               timeout  time.Duration
+               Endpoint        string
+               db              *gorm.DB
+               log             log.Logger
+               cfg             *viper.Viper
+               testCtx         *testing.T
+               basicRes        corectx.BasicRes
+               timeout         time.Duration
+               pipelineTimeout time.Duration
        }
        LocalClientConfig struct {
                ServerPort           uint
@@ -87,6 +89,7 @@ type (
                Plugins              map[string]plugin.PluginMeta
                AdditionalMigrations func() []plugin.MigrationScript
                Timeout              time.Duration
+               PipelineTimeout      time.Duration
        }
        RemoteClientConfig struct {
                Endpoint string
@@ -104,12 +107,12 @@ func ConnectRemoteServer(t *testing.T, sbConfig 
*RemoteClientConfig) *DevlakeCli
 }
 
 // ConnectLocalServer spins up a local server from the config and returns a 
client connected to it
-func ConnectLocalServer(t *testing.T, sbConfig *LocalClientConfig) 
*DevlakeClient {
+func ConnectLocalServer(t *testing.T, clientConfig *LocalClientConfig) 
*DevlakeClient {
        t.Helper()
        fmt.Printf("Using test temp directory: %s\n", throwawayDir)
        logger := logruslog.Global.Nested("test")
        cfg := config.GetConfig()
-       cfg.Set("DB_URL", sbConfig.DbURL)
+       cfg.Set("DB_URL", clientConfig.DbURL)
        db, err := runner.NewGormDb(cfg, logger)
        require.NoError(t, err)
        t.Cleanup(func() {
@@ -117,23 +120,30 @@ func ConnectLocalServer(t *testing.T, sbConfig 
*LocalClientConfig) *DevlakeClien
                require.NoError(t, err)
                require.NoError(t, d.Close())
        })
-       addr := fmt.Sprintf("http://localhost:%d", sbConfig.ServerPort)
+       addr := fmt.Sprintf("http://localhost:%d", clientConfig.ServerPort)
        d := &DevlakeClient{
-               Endpoint: addr,
-               db:       db,
-               log:      logger,
-               cfg:      cfg,
-               basicRes: contextimpl.NewDefaultBasicRes(cfg, logger, 
dalgorm.NewDalgorm(db)),
-               testCtx:  t,
-               timeout:  sbConfig.Timeout,
+               Endpoint:        addr,
+               db:              db,
+               log:             logger,
+               cfg:             cfg,
+               basicRes:        contextimpl.NewDefaultBasicRes(cfg, logger, 
dalgorm.NewDalgorm(db)),
+               testCtx:         t,
+               timeout:         clientConfig.Timeout,
+               pipelineTimeout: clientConfig.PipelineTimeout,
+       }
+       if d.timeout == 0 {
+               d.timeout = 10 * time.Second
+       }
+       if d.pipelineTimeout == 0 {
+               d.pipelineTimeout = 30 * time.Second
        }
        d.configureEncryption()
-       d.initPlugins(sbConfig)
-       if sbConfig.DropDb || sbConfig.TruncateDb {
-               d.prepareDB(sbConfig)
+       d.initPlugins(clientConfig)
+       if clientConfig.DropDb || clientConfig.TruncateDb {
+               d.prepareDB(clientConfig)
        }
-       if sbConfig.CreateServer {
-               cfg.Set("PORT", sbConfig.ServerPort)
+       if clientConfig.CreateServer {
+               cfg.Set("PORT", clientConfig.ServerPort)
                cfg.Set("PLUGIN_DIR", throwawayDir)
                cfg.Set("LOGGING_DIR", throwawayDir)
                go func() {
@@ -146,7 +156,7 @@ func ConnectLocalServer(t *testing.T, sbConfig 
*LocalClientConfig) *DevlakeClien
                e := err.Unwrap()
                return goerror.Is(e, syscall.ECONNREFUSED)
        })
-       d.runMigrations(sbConfig)
+       d.runMigrations(clientConfig)
        return d
 }
 
@@ -156,20 +166,12 @@ func (d *DevlakeClient) SetTimeout(timeout time.Duration) 
{
 }
 
 // AwaitPluginAvailability wait for this plugin to become available on the 
server given a timeout. Returns false if this condition does not get met.
-func (d *DevlakeClient) AwaitPluginAvailability(pluginName string, timeout 
time.Duration) bool {
-       timeoutCh := time.After(timeout)
-       for {
-               select {
-               case <-timeoutCh:
-                       return false
-               default:
-                       _, err := plugin.GetPlugin(pluginName)
-                       if err == nil {
-                               return true
-                       }
-                       time.Sleep(250 * time.Millisecond)
-               }
-       }
+func (d *DevlakeClient) AwaitPluginAvailability(pluginName string, timeout 
time.Duration) {
+       err := runWithTimeout(timeout, func() (bool, errors.Error) {
+               _, err := plugin.GetPlugin(pluginName)
+               return err == nil, nil
+       })
+       require.NoError(d.testCtx, err)
 }
 
 // RunPlugin manually execute a plugin directly (local server only)
@@ -310,43 +312,84 @@ func (d *DevlakeClient) prepareDB(cfg *LocalClientConfig) 
{
        }
 }
 
-func sendHttpRequest[Res any](t *testing.T, timeout time.Duration, debug 
debugInfo, httpMethod string, endpoint string, body any) Res {
+func runWithTimeout(timeout time.Duration, f func() (bool, errors.Error)) 
errors.Error {
+       if timeout == 0 {
+               timeout = math.MaxInt
+       }
+       type response struct {
+               err       errors.Error
+               completed bool
+       }
+       timer := time.After(timeout)
+       resChan := make(chan response)
+       resp := response{}
+       for {
+               go func() {
+                       done, err := f()
+                       resChan <- response{err, done}
+               }()
+               select {
+               case <-timer:
+                       if !resp.completed {
+                               return errors.Default.New(fmt.Sprintf("timed 
out calling function after %d miliseconds", timeout.Milliseconds()))
+                       }
+                       return nil
+               case resp = <-resChan:
+                       if resp.err != nil {
+                               return resp.err
+                       }
+                       if resp.completed {
+                               return nil
+                       }
+                       time.Sleep(1 * time.Second)
+                       continue
+               }
+       }
+}
+
+func sendHttpRequest[Res any](t *testing.T, timeout time.Duration, debug 
debugInfo, httpMethod string, endpoint string, headers map[string]string, body 
any) Res {
        t.Helper()
        b := ToJson(body)
        if debug.print {
                coloredPrintf("calling:\n\t%s %s\nwith:\n%s\n", httpMethod, 
endpoint, string(ToCleanJson(debug.inlineJson, body)))
        }
-       timer := time.After(timeout)
-       request, err := http.NewRequest(httpMethod, endpoint, 
bytes.NewReader(b))
-       require.NoError(t, err)
-       request.Close = true
-       request.Header.Add("Content-Type", "application/json")
-       for {
+       var result Res
+       err := runWithTimeout(timeout, func() (bool, errors.Error) {
+               request, err := http.NewRequest(httpMethod, endpoint, 
bytes.NewReader(b))
+               if err != nil {
+                       return false, errors.Convert(err)
+               }
+               request.Close = true
+               request.Header.Add("Content-Type", "application/json")
+               for header, headerVal := range headers {
+                       request.Header.Add(header, headerVal)
+               }
                response, err := http.DefaultClient.Do(request)
-               require.NoError(t, err)
-               if timeout > 0 {
-                       select {
-                       case <-timer:
-                       default:
-                               if response.StatusCode >= 300 {
-                                       require.NoError(t, 
response.Body.Close())
-                                       response.Close = true
-                                       time.Sleep(1 * time.Second)
-                                       continue
-                               }
+               if err != nil {
+                       return false, errors.Convert(err)
+               }
+               if response.StatusCode >= 300 {
+                       if err = response.Body.Close(); err != nil {
+                               return false, errors.Convert(err)
                        }
+                       response.Close = true
+                       return false, 
errors.HttpStatus(response.StatusCode).New(fmt.Sprintf("unexpected http status 
code: %d", response.StatusCode))
                }
-               require.True(t, response.StatusCode < 300, "unexpected http 
status code: %d", response.StatusCode)
-               var result Res
                b, _ = io.ReadAll(response.Body)
-               require.NoError(t, json.Unmarshal(b, &result))
+               if err = json.Unmarshal(b, &result); err != nil {
+                       return false, errors.Convert(err)
+               }
                if debug.print {
                        coloredPrintf("result: %s\n", 
ToCleanJson(debug.inlineJson, b))
                }
-               require.NoError(t, response.Body.Close())
+               if err = response.Body.Close(); err != nil {
+                       return false, errors.Convert(err)
+               }
                response.Close = true
-               return result
-       }
+               return true, nil
+       })
+       require.NoError(t, err)
+       return result
 }
 
 func coloredPrintf(msg string, args ...any) {
diff --git a/backend/test/integration/remote/helper.go 
b/backend/test/integration/remote/helper.go
index 469d49e28..1fee42b89 100644
--- a/backend/test/integration/remote/helper.go
+++ b/backend/test/integration/remote/helper.go
@@ -98,7 +98,7 @@ func CreateTestConnection(client *helper.DevlakeClient) 
*helper.Connection {
 }
 
 func CreateTestScope(client *helper.DevlakeClient, connectionId uint64) any {
-       res := client.CreateTransformationRule(PLUGIN_NAME, FakeTxRule{Name: 
"Tx rule", Env: "test env"})
+       res := client.CreateTransformationRule(PLUGIN_NAME, connectionId, 
FakeTxRule{Name: "Tx rule", Env: "test env"})
        rule, ok := res.(map[string]interface{})
        if !ok {
                panic("Cannot cast transform rule")
@@ -114,7 +114,5 @@ func CreateTestScope(client *helper.DevlakeClient, 
connectionId uint64) any {
                        TransformationRuleId: ruleId,
                },
        )
-
-       client.SetTimeout(1)
        return scope
 }

Reply via email to