This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 374debfe3 Fix remote scope azure (#4650)
374debfe3 is described below

commit 374debfe300a0479840651d7468e9454a102e95b
Author: Camille Teruel <[email protected]>
AuthorDate: Wed Mar 15 07:07:50 2023 +0100

    Fix remote scope azure (#4650)
    
    * refactor: extract common test setup
    
    * style: Remove unused API endpoints
    
    * feat: Filter builds when collecting
    
    * refactor: Simplify bridge code
    
    Side effect: some errors are not silenced anymore
    
    * fix: Fix remote-scopes command
    
    * fix: Rename azure plugin to azuredevops
    
    The name conflicts with existing go plugin
    
    ---------
    
    Co-authored-by: Camille Teruel <[email protected]>
---
 backend/core/utils/ipc.go                          | 56 ++++++----------
 .../helpers/pluginhelper/tap/singer_tap_impl.go    | 22 +++----
 .../plugins/{azure => azuredevops}/README.md       |  0
 .../azure => azuredevops/azuredevops}/__init__.py  |  0
 .../azure => azuredevops/azuredevops}/api.py       | 23 +------
 .../azure => azuredevops/azuredevops}/helper/db.py |  0
 .../azure => azuredevops/azuredevops}/main.py      | 18 ++----
 .../azure => azuredevops/azuredevops}/models.py    |  0
 .../azuredevops}/streams/builds.py                 | 30 ++-------
 .../azuredevops}/streams/commits.py                |  8 +--
 .../azuredevops}/streams/jobs.py                   | 10 +--
 .../azuredevops}/streams/pull_request_commits.py   | 12 ++--
 .../azuredevops}/streams/pull_requests.py          | 10 +--
 .../plugins/{azure => azuredevops}/poetry.lock     |  0
 .../plugins/{azure => azuredevops}/pyproject.toml  |  2 +-
 .../{azure => azuredevops}/tests/test_streams.py   |  2 +-
 backend/python/pydevlake/pydevlake/ipc.py          |  6 +-
 backend/python/pydevlake/pydevlake/message.py      |  4 ++
 backend/python/pydevlake/pydevlake/plugin.py       |  7 +-
 backend/python/pydevlake/pydevlake/subtasks.py     |  2 +-
 backend/server/services/remote/bridge/bridge.go    |  4 +-
 backend/server/services/remote/bridge/cmd.go       | 34 +++++-----
 backend/server/services/remote/bridge/returns.go   | 53 +++++-----------
 backend/server/services/remote/bridge/utils.go     | 57 -----------------
 .../services/remote/plugin/connection_api.go       |  2 +-
 .../server/services/remote/plugin/plugin_impl.go   | 10 +--
 .../services/remote/plugin/remote_scope_api.go     |  6 +-
 backend/test/helper/api.go                         | 33 ++++++----
 backend/test/remote/fakeplugin/fakeplugin/main.py  | 14 ++--
 backend/test/remote/remote_test.go                 | 74 +++++++++++++++-------
 30 files changed, 204 insertions(+), 295 deletions(-)

diff --git a/backend/core/utils/ipc.go b/backend/core/utils/ipc.go
index cbd11c8ba..d48e03fd6 100644
--- a/backend/core/utils/ipc.go
+++ b/backend/core/utils/ipc.go
@@ -30,16 +30,11 @@ import (
        "github.com/apache/incubator-devlake/core/errors"
 )
 
-// NoopConverter no-op converter
-var NoopConverter = func(b []byte) (any, errors.Error) {
-       return b, nil
-}
-
 // ProcessResponse wraps output of a process
 type ProcessResponse struct {
-       stdout any
-       stderr any
-       fdOut  any
+       stdout []byte
+       stderr []byte
+       fdOut  []byte
        err    errors.Error
 }
 
@@ -52,11 +47,11 @@ type ProcessStream struct {
 
 // StreamProcessOptions options for streaming a process
 type StreamProcessOptions struct {
-       OnStdout func(b []byte) (any, errors.Error)
-       OnStderr func(b []byte) (any, errors.Error)
+       OnStdout func(b []byte)
+       OnStderr func(b []byte)
        // UseFdOut if true, it'll open this fd to be used by the child 
process. Useful to isolate stdout and custom outputs
        UseFdOut bool
-       OnFdOut  func(b []byte) (any, errors.Error)
+       OnFdOut  func(b []byte)
 }
 
 // RunProcessOptions options for running a process
@@ -96,18 +91,15 @@ func (p *ProcessStream) Cancel() errors.Error {
        return nil
 }
 
-// GetStdout gets the stdout. The type depends on the conversion defined by 
StreamProcessOptions.OnStdout, otherwise it is []byte
-func (resp *ProcessResponse) GetStdout() any {
+func (resp *ProcessResponse) GetStdout() []byte {
        return resp.stdout
 }
 
-// GetStderr gets the stderr. The type depends on the conversion defined by 
StreamProcessOptions.OnStderr, otherwise it is []byte
-func (resp *ProcessResponse) GetStderr() any {
+func (resp *ProcessResponse) GetStderr() []byte {
        return resp.stderr
 }
 
-// GetFdOut gets the custom fd output. The type depends on the conversion 
defined by StreamProcessOptions.OnFdOut, otherwise it is []byte
-func (resp *ProcessResponse) GetFdOut() any {
+func (resp *ProcessResponse) GetFdOut() []byte {
        return resp.fdOut
 }
 
@@ -119,24 +111,21 @@ func (resp *ProcessResponse) GetError() errors.Error {
 // RunProcess runs the cmd and blocks until its completion. All returned 
results will have type []byte.
 func RunProcess(cmd *exec.Cmd, opts *RunProcessOptions) (*ProcessResponse, 
errors.Error) {
        stream, err := StreamProcess(cmd, &StreamProcessOptions{
-               OnStdout: func(b []byte) (any, errors.Error) {
+               OnStdout: func(b []byte) {
                        if opts.OnStdout != nil {
                                opts.OnStdout(b)
                        }
-                       return NoopConverter(b)
                },
-               OnStderr: func(b []byte) (any, errors.Error) {
+               OnStderr: func(b []byte) {
                        if opts.OnStderr != nil {
                                opts.OnStderr(b)
                        }
-                       return NoopConverter(b)
                },
                UseFdOut: opts.UseFdOut,
-               OnFdOut: func(b []byte) (any, errors.Error) {
+               OnFdOut: func(b []byte) {
                        if opts.OnFdOut != nil {
                                opts.OnFdOut(b)
                        }
-                       return NoopConverter(b)
                },
        })
        if err != nil {
@@ -151,13 +140,13 @@ func RunProcess(cmd *exec.Cmd, opts *RunProcessOptions) 
(*ProcessResponse, error
                        break
                }
                if result.stdout != nil {
-                       stdout = append(stdout, result.stdout.([]byte)...)
+                       stdout = append(stdout, result.stdout...)
                }
                if result.stderr != nil {
-                       stderr = append(stderr, result.stderr.([]byte)...)
+                       stderr = append(stderr, result.stderr...)
                }
                if result.fdOut != nil {
-                       fdOut = append(fdOut, result.fdOut.([]byte)...)
+                       fdOut = append(fdOut, result.fdOut...)
                }
        }
        return &ProcessResponse{
@@ -184,11 +173,11 @@ func StreamProcess(cmd *exec.Cmd, opts 
*StreamProcessOptions) (*ProcessStream, e
        }
        receiveStream := make(chan *ProcessResponse, 32)
        wg := &sync.WaitGroup{}
-       stdScanner := scanOutputPipe(pipes.stdout, wg, opts.OnStdout, 
func(result any) *ProcessResponse {
+       stdScanner := scanOutputPipe(pipes.stdout, wg, opts.OnStdout, 
func(result []byte) *ProcessResponse {
                return &ProcessResponse{stdout: result}
        }, receiveStream)
        errScanner, remoteErrorMsg := scanErrorPipe(pipes.stderr, receiveStream)
-       fdOutScanner := scanOutputPipe(pipes.fdOut, wg, opts.OnFdOut, 
func(result any) *ProcessResponse {
+       fdOutScanner := scanOutputPipe(pipes.fdOut, wg, opts.OnFdOut, 
func(result []byte) *ProcessResponse {
                return &ProcessResponse{fdOut: result}
        }, receiveStream)
        wg.Add(2)
@@ -245,8 +234,8 @@ func getPipes(cmd *exec.Cmd, opts *StreamProcessOptions) 
(*processPipes, errors.
        }, nil
 }
 
-func scanOutputPipe(pipe io.ReadCloser, wg *sync.WaitGroup, onReceive 
func([]byte) (any, errors.Error),
-       responseCreator func(any) *ProcessResponse, outboundChannel chan<- 
*ProcessResponse) func() {
+func scanOutputPipe(pipe io.ReadCloser, wg *sync.WaitGroup, onReceive 
func([]byte),
+       responseCreator func([]byte) *ProcessResponse, outboundChannel chan<- 
*ProcessResponse) func() {
        return func() {
                scanner := bufio.NewScanner(pipe)
                scanner.Split(bufio.ScanLines)
@@ -254,11 +243,8 @@ func scanOutputPipe(pipe io.ReadCloser, wg 
*sync.WaitGroup, onReceive func([]byt
                        src := scanner.Bytes()
                        data := make([]byte, len(src))
                        copy(data, src)
-                       if result, err := onReceive(data); err != nil {
-                               outboundChannel <- &ProcessResponse{err: err}
-                       } else {
-                               outboundChannel <- responseCreator(result)
-                       }
+                       onReceive(data)
+                       outboundChannel <- responseCreator(data)
                }
                wg.Done()
        }
diff --git a/backend/helpers/pluginhelper/tap/singer_tap_impl.go 
b/backend/helpers/pluginhelper/tap/singer_tap_impl.go
index b86856b74..fde5ada78 100644
--- a/backend/helpers/pluginhelper/tap/singer_tap_impl.go
+++ b/backend/helpers/pluginhelper/tap/singer_tap_impl.go
@@ -21,12 +21,13 @@ import (
        "bufio"
        "context"
        "encoding/json"
+       "os"
+       "path/filepath"
+
        "github.com/apache/incubator-devlake/core/config"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/utils"
        "github.com/mitchellh/hashstructure"
-       "os"
-       "path/filepath"
 )
 
 const singerPropertiesDir = "TAP_PROPERTIES_DIR"
@@ -137,16 +138,7 @@ func (t *SingerTap) Run(ctx context.Context) (<-chan 
*Response, errors.Error) {
                t.propertiesFile.path,
                ifElse(t.stateFile.path != "", "--state "+t.stateFile.path, ""),
        )
-       rawStream, err := utils.StreamProcess(cmd, &utils.StreamProcessOptions{
-               OnStdout: func(b []byte) (any, errors.Error) {
-                       var output Output[json.RawMessage]
-                       output, err := NewSingerTapOutput(b)
-                       if err != nil {
-                               return nil, err
-                       }
-                       return output, nil //data is expected to be JSON
-               },
-       })
+       rawStream, err := utils.StreamProcess(cmd, nil)
        if err != nil {
                return nil, errors.Default.Wrap(err, "error starting process 
stream from singer-tap")
        }
@@ -159,7 +151,11 @@ func (t *SingerTap) Run(ctx context.Context) (<-chan 
*Response, errors.Error) {
                        }
                        out := result.GetStdout()
                        if out != nil {
-                               stream <- &Response{Out: 
out.(Output[json.RawMessage])}
+                               tap_output, err := NewSingerTapOutput(out)
+                               if err != nil {
+                                       stream <- &Response{Err: err}
+                               }
+                               stream <- &Response{Out: tap_output}
                        }
                }
        }()
diff --git a/backend/python/plugins/azure/README.md 
b/backend/python/plugins/azuredevops/README.md
similarity index 100%
rename from backend/python/plugins/azure/README.md
rename to backend/python/plugins/azuredevops/README.md
diff --git a/backend/python/plugins/azure/azure/__init__.py 
b/backend/python/plugins/azuredevops/azuredevops/__init__.py
similarity index 100%
rename from backend/python/plugins/azure/azure/__init__.py
rename to backend/python/plugins/azuredevops/azuredevops/__init__.py
diff --git a/backend/python/plugins/azure/azure/api.py 
b/backend/python/plugins/azuredevops/azuredevops/api.py
similarity index 80%
rename from backend/python/plugins/azure/azure/api.py
rename to backend/python/plugins/azuredevops/azuredevops/api.py
index 67e8fee39..825f9b392 100644
--- a/backend/python/plugins/azure/azure/api.py
+++ b/backend/python/plugins/azuredevops/azuredevops/api.py
@@ -59,23 +59,13 @@ class AzureDevOpsAPI(API):
         req = Request('https://app.vssps.visualstudio.com/_apis/accounts', 
query_args={"memberId": member_id})
         return self.send(req)
 
-    def orgs(self) -> list[str]:
-        response = self.accounts()
-        return [acct["AccountName"] for acct in response.json]
-
     def projects(self, org: str):
         return self.get(org, '_apis/projects')
 
-    # Get a project
-    def project(self, org: str, project: str):
-        return self.get(org, '_apis/projects', project)
-
-    # List repos under an org
     def git_repos(self, org: str, project: str):
         return self.get(org, project, '_apis/git/repositories')
 
     def git_repo_pull_requests(self, org: str, project: str, repo_id: str):
-        # see 
https://learn.microsoft.com/en-us/rest/api/azure/devops/git/pull-requests/get-pull-requests?view=azure-devops-rest-7.1&tabs=HTTP
         return self.get(org, project, '_apis/git/repositories', repo_id, 
'pullrequests')
 
     def git_repo_pull_request_commits(self, org: str, project: str, repo_id: 
str, pull_request_id: int):
@@ -84,20 +74,11 @@ class AzureDevOpsAPI(API):
     def git_repo_pull_request_comments(self, org: str, project: str, repo_id: 
str, pull_request_id: int):
         return self.get(org, project, '_apis/git/repositories', repo_id, 
'pullRequests', pull_request_id, 'threads')
 
-    # not needed
     def commits(self, org: str, project: str, repo_id: str):
         return self.get(org, project, '_apis/git/repositories', repo_id, 
'commits')
 
-    def builds(self, org: str, project: str):
-        return self.get(org, project, '_apis/build/builds')
+    def builds(self, org: str, project: str, repository_id: str, provider: 
str):
+        return self.get(org, project, '_apis/build/builds', 
repositoryId=repository_id, repositoryType=provider)
 
     def jobs(self, org: str, project: str, build_id: int):
         return self.get(org, project, '_apis/build/builds', build_id, 
'timeline')
-
-    # unused
-    def deployments(self, org: str, project: str):
-        return self.get(org, project, '_apis/release/deployments')
-
-    # unused
-    def releases(self, org: str, project: str):
-        return self.get(org, project, '_apis/release/releases')
diff --git a/backend/python/plugins/azure/azure/helper/db.py 
b/backend/python/plugins/azuredevops/azuredevops/helper/db.py
similarity index 100%
rename from backend/python/plugins/azure/azure/helper/db.py
rename to backend/python/plugins/azuredevops/azuredevops/helper/db.py
diff --git a/backend/python/plugins/azure/azure/main.py 
b/backend/python/plugins/azuredevops/azuredevops/main.py
similarity index 88%
rename from backend/python/plugins/azure/azure/main.py
rename to backend/python/plugins/azuredevops/azuredevops/main.py
index 86a2644df..ec03ef345 100644
--- a/backend/python/plugins/azure/azure/main.py
+++ b/backend/python/plugins/azuredevops/azuredevops/main.py
@@ -13,13 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from azure.api import AzureDevOpsAPI
-from azure.models import AzureDevOpsConnection, GitRepository
-from azure.streams.builds import Builds
-from azure.streams.commits import GitCommits
-from azure.streams.jobs import Jobs
-from azure.streams.pull_request_commits import GitPullRequestCommits
-from azure.streams.pull_requests import GitPullRequests
+from azuredevops.api import AzureDevOpsAPI
+from azuredevops.models import AzureDevOpsConnection, GitRepository
+from azuredevops.streams.builds import Builds
+from azuredevops.streams.commits import GitCommits
+from azuredevops.streams.jobs import Jobs
+from azuredevops.streams.pull_request_commits import GitPullRequestCommits
+from azuredevops.streams.pull_requests import GitPullRequests
 
 from pydevlake import Plugin, RemoteScopeGroup
 from pydevlake.domain_layer.code import Repo
@@ -73,10 +73,6 @@ class AzureDevOpsPlugin(Plugin):
                 repo.parentRepositoryUrl = raw_repo["parentRepository"]["url"]
             yield repo
 
-    @property
-    def name(self) -> str:
-        return "azure"
-
     def test_connection(self, connection: AzureDevOpsConnection):
         resp = AzureDevOpsAPI(connection.base_url, 
connection.pat).projects(connection.org)
         if resp.status != 200:
diff --git a/backend/python/plugins/azure/azure/models.py 
b/backend/python/plugins/azuredevops/azuredevops/models.py
similarity index 100%
rename from backend/python/plugins/azure/azure/models.py
rename to backend/python/plugins/azuredevops/azuredevops/models.py
diff --git a/backend/python/plugins/azure/azure/streams/builds.py 
b/backend/python/plugins/azuredevops/azuredevops/streams/builds.py
similarity index 78%
rename from backend/python/plugins/azure/azure/streams/builds.py
rename to backend/python/plugins/azuredevops/azuredevops/streams/builds.py
index c47a6597f..3996e4be9 100644
--- a/backend/python/plugins/azure/azure/streams/builds.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/builds.py
@@ -13,15 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import typing
 from typing import Iterable
 
 import iso8601 as iso8601
 
-from azure.api import AzureDevOpsAPI
-from azure.helper import db
-from azure.models import AzureDevOpsConnection, GitRepository
-from azure.models import Build
+from azuredevops.api import AzureDevOpsAPI
+from azuredevops.models import AzureDevOpsConnection, GitRepository
+from azuredevops.models import Build
 from pydevlake import Context, DomainType, Stream, logger
 import pydevlake.domain_layer.devops as devops
 
@@ -33,12 +31,10 @@ class Builds(Stream):
     def collect(self, state, context) -> Iterable[tuple[object, dict]]:
         connection: AzureDevOpsConnection = context.connection
         repo: GitRepository = context.scope
-        azure_api = AzureDevOpsAPI(connection.base_url, connection.pat)
-        cached_repos = dict()
-        response = azure_api.builds(repo.org_id, repo.project_id)
+        azuredevops_api = AzureDevOpsAPI(connection.base_url, connection.pat)
+        response = azuredevops_api.builds(repo.org_id, repo.project_id, 
repo.id, repo.provider)
         for raw_build in response:
-            if self.validate_repo(context, raw_build, cached_repos):
-                yield raw_build, state
+            yield raw_build, state
 
     def extract(self, raw_data: dict) -> Build:
         build: Build = self.tool_model(**raw_data)
@@ -112,17 +108,3 @@ class Builds(Stream):
             repo_id=b.repo_id,
             repo=repo_url,
         )
-
-    # workaround because azure also returns builds for unmanaged repos (we 
don't want them)
-    @classmethod
-    def validate_repo(cls, context: Context, raw_build: dict, cached_repos: 
typing.Dict[str, GitRepository]) -> bool:
-        repo_id = raw_build["repository"]["id"]
-        if repo_id not in cached_repos:
-            repo: GitRepository = db.get(context, GitRepository, 
GitRepository.id == repo_id)
-            if repo is None:
-                logger.warn(f"no Azure repo associated with {repo_id}")
-            cached_repos[repo_id] = repo
-        if cached_repos[repo_id] is None:
-            return False
-        raw_build["repository"]["url"] = cached_repos[repo_id].url
-        return True
diff --git a/backend/python/plugins/azure/azure/streams/commits.py 
b/backend/python/plugins/azuredevops/azuredevops/streams/commits.py
similarity index 92%
rename from backend/python/plugins/azure/azure/streams/commits.py
rename to backend/python/plugins/azuredevops/azuredevops/streams/commits.py
index 088344300..3e02ef2f2 100644
--- a/backend/python/plugins/azure/azure/streams/commits.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/commits.py
@@ -17,8 +17,8 @@ from typing import Iterable
 
 import iso8601 as iso8601
 
-from azure.api import AzureDevOpsAPI
-from azure.models import GitRepository, GitCommit
+from azuredevops.api import AzureDevOpsAPI
+from azuredevops.models import GitRepository, GitCommit
 from pydevlake import Stream, DomainType, Context
 from pydevlake.domain_layer.code import Commit as DomainCommit
 from pydevlake.domain_layer.code import RepoCommit as DomainRepoCommit
@@ -31,8 +31,8 @@ class GitCommits(Stream):
     def collect(self, state, context) -> Iterable[tuple[object, dict]]:
         connection = context.connection
         repo: GitRepository = context.scope
-        azure_api = AzureDevOpsAPI(connection.base_url, connection.pat)
-        response = azure_api.commits(repo.org_id, repo.project_id, repo.id)
+        azuredevops_api = AzureDevOpsAPI(connection.base_url, connection.pat)
+        response = azuredevops_api.commits(repo.org_id, repo.project_id, 
repo.id)
         for raw_commit in response:
             raw_commit["repo_id"] = repo.id
             yield raw_commit, state
diff --git a/backend/python/plugins/azure/azure/streams/jobs.py 
b/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py
similarity index 89%
rename from backend/python/plugins/azure/azure/streams/jobs.py
rename to backend/python/plugins/azuredevops/azuredevops/streams/jobs.py
index b6f11c410..ef6997f7a 100644
--- a/backend/python/plugins/azure/azure/streams/jobs.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py
@@ -15,9 +15,9 @@
 
 from typing import Iterable
 
-from azure.api import AzureDevOpsAPI
-from azure.models import AzureDevOpsConnection, Job, Build, GitRepository
-from azure.streams.builds import Builds
+from azuredevops.api import AzureDevOpsAPI
+from azuredevops.models import AzureDevOpsConnection, Job, Build, GitRepository
+from azuredevops.streams.builds import Builds
 from pydevlake import Context, Substream, DomainType
 import pydevlake.domain_layer.devops as devops
 
@@ -30,8 +30,8 @@ class Jobs(Substream):
     def collect(self, state, context, parent: Build) -> Iterable[tuple[object, 
dict]]:
         connection: AzureDevOpsConnection = context.connection
         repo: GitRepository = context.scope
-        azure_api = AzureDevOpsAPI(connection.base_url, connection.pat)
-        response = azure_api.jobs(repo.org_id, repo.project_id, parent.id)
+        azuredevops_api = AzureDevOpsAPI(connection.base_url, connection.pat)
+        response = azuredevops_api.jobs(repo.org_id, repo.project_id, 
parent.id)
         if response.status != 200:
             yield None, state
         else:
diff --git a/backend/python/plugins/azure/azure/streams/pull_request_commits.py 
b/backend/python/plugins/azuredevops/azuredevops/streams/pull_request_commits.py
similarity index 79%
rename from backend/python/plugins/azure/azure/streams/pull_request_commits.py
rename to 
backend/python/plugins/azuredevops/azuredevops/streams/pull_request_commits.py
index 661fa7f4c..560833c9b 100644
--- a/backend/python/plugins/azure/azure/streams/pull_request_commits.py
+++ 
b/backend/python/plugins/azuredevops/azuredevops/streams/pull_request_commits.py
@@ -15,10 +15,10 @@
 
 from typing import Iterable
 
-from azure.api import AzureDevOpsAPI
-from azure.models import GitPullRequest, GitCommit, GitRepository
-from azure.streams.commits import extract_raw_commit
-from azure.streams.pull_requests import GitPullRequests
+from azuredevops.api import AzureDevOpsAPI
+from azuredevops.models import GitPullRequest, GitCommit, GitRepository
+from azuredevops.streams.commits import extract_raw_commit
+from azuredevops.streams.pull_requests import GitPullRequests
 from pydevlake import Substream, DomainType
 from pydevlake.domain_layer.code import PullRequestCommit as 
DomainPullRequestCommit
 
@@ -31,8 +31,8 @@ class GitPullRequestCommits(Substream):
     def collect(self, state, context, parent: GitPullRequest) -> 
Iterable[tuple[object, dict]]:
         connection = context.connection
         repo: GitRepository = context.scope
-        azure_api = AzureDevOpsAPI(connection.base_url, connection.pat)
-        response = azure_api.git_repo_pull_request_commits(repo.org_id, 
repo.project_id, parent.repo_id, parent.id)
+        azuredevops_api = AzureDevOpsAPI(connection.base_url, connection.pat)
+        response = azuredevops_api.git_repo_pull_request_commits(repo.org_id, 
repo.project_id, parent.repo_id, parent.id)
         for raw_commit in response:
             raw_commit["repo_id"] = parent.repo_id
             yield raw_commit, state
diff --git a/backend/python/plugins/azure/azure/streams/pull_requests.py 
b/backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py
similarity index 92%
rename from backend/python/plugins/azure/azure/streams/pull_requests.py
rename to 
backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py
index 5c2384ee8..1267de21d 100644
--- a/backend/python/plugins/azure/azure/streams/pull_requests.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py
@@ -18,9 +18,9 @@ from typing import Iterable
 
 import iso8601 as iso8601
 
-from azure.api import AzureDevOpsAPI
-from azure.helper import db
-from azure.models import GitRepository, GitPullRequest, GitCommit
+from azuredevops.api import AzureDevOpsAPI
+from azuredevops.helper import db
+from azuredevops.models import GitRepository, GitPullRequest, GitCommit
 from pydevlake import Stream, Context, DomainType
 from pydevlake.domain_layer.code import PullRequest as DomainPullRequest
 
@@ -31,9 +31,9 @@ class GitPullRequests(Stream):
 
     def collect(self, state, context) -> Iterable[tuple[object, dict]]:
         connection = context.connection
-        azure_api = AzureDevOpsAPI(connection.base_url, connection.pat)
+        azuredevops_api = AzureDevOpsAPI(connection.base_url, connection.pat)
         repo: GitRepository = context.scope
-        response = azure_api.git_repo_pull_requests(repo.org_id, 
repo.project_id, repo.id)
+        response = azuredevops_api.git_repo_pull_requests(repo.org_id, 
repo.project_id, repo.id)
         for raw_pr in response:
             yield raw_pr, state
 
diff --git a/backend/python/plugins/azure/poetry.lock 
b/backend/python/plugins/azuredevops/poetry.lock
similarity index 100%
rename from backend/python/plugins/azure/poetry.lock
rename to backend/python/plugins/azuredevops/poetry.lock
diff --git a/backend/python/plugins/azure/pyproject.toml 
b/backend/python/plugins/azuredevops/pyproject.toml
similarity index 98%
rename from backend/python/plugins/azure/pyproject.toml
rename to backend/python/plugins/azuredevops/pyproject.toml
index 97fe442ec..ad160b9b7 100644
--- a/backend/python/plugins/azure/pyproject.toml
+++ b/backend/python/plugins/azuredevops/pyproject.toml
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 [tool.poetry]
-name = "azure"
+name = "azuredevops"
 version = "0.1.0"
 description = ""
 authors = ["Hezheng Yin <[email protected]>"]
diff --git a/backend/python/plugins/azure/tests/test_streams.py 
b/backend/python/plugins/azuredevops/tests/test_streams.py
similarity index 99%
rename from backend/python/plugins/azure/tests/test_streams.py
rename to backend/python/plugins/azuredevops/tests/test_streams.py
index 467d0e76a..a56ba1bfa 100644
--- a/backend/python/plugins/azure/tests/test_streams.py
+++ b/backend/python/plugins/azuredevops/tests/test_streams.py
@@ -19,7 +19,7 @@ from pydevlake.testing import assert_convert
 import pydevlake.domain_layer.code as code
 import pydevlake.domain_layer.devops as devops
 
-from azure.main import AzureDevOpsPlugin
+from azuredevops.main import AzureDevOpsPlugin
 
 
 def test_builds_stream():
diff --git a/backend/python/pydevlake/pydevlake/ipc.py 
b/backend/python/pydevlake/pydevlake/ipc.py
index a105d5047..b55ce1147 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -29,7 +29,7 @@ def plugin_method(func):
 
     def send_output(send_ch: TextIO, obj: object):
         if not isinstance(obj, Message):
-            return
+            raise Exception(f"Not a message: {obj}")
         send_ch.write(obj.json(exclude_unset=True))
         send_ch.write('\n')
         send_ch.flush()
@@ -84,9 +84,9 @@ class PluginCommands:
         return self._plugin.plugin_info()
 
     @plugin_method
-    def remote_scopes(self, connection: dict, group_id: Optional[str]):
+    def remote_scopes(self, connection: dict, group_id: Optional[str] = None):
         c = self._plugin.connection_type(**connection)
-        return self._plugin.remote_scopes(c, group_id)
+        return self._plugin.make_remote_scopes(c, group_id)
 
     def startup(self, endpoint: str):
         self._plugin.startup(endpoint)
diff --git a/backend/python/pydevlake/pydevlake/message.py 
b/backend/python/pydevlake/pydevlake/message.py
index 43eef850f..32f70ac22 100644
--- a/backend/python/pydevlake/pydevlake/message.py
+++ b/backend/python/pydevlake/pydevlake/message.py
@@ -107,3 +107,7 @@ class RemoteScopeGroup(RemoteScopeTreeNode):
 class RemoteScope(RemoteScopeTreeNode):
     type: str = Field("scope", const=True)
     scope: ToolScope
+
+
+class RemoteScopes(Message):
+    __root__: list[RemoteScopeTreeNode]
diff --git a/backend/python/pydevlake/pydevlake/plugin.py 
b/backend/python/pydevlake/pydevlake/plugin.py
index 14af2000a..2b1bc4acf 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -104,9 +104,9 @@ class Plugin(ABC):
         # TODO: Create tables
         pass
 
-    def make_remote_scopes(self, connection: Connection, group_id: 
Optional[str]) -> list[msg.RemoteScopeTreeNode]:
+    def make_remote_scopes(self, connection: Connection, group_id: 
Optional[str] = None) -> msg.RemoteScopes:
         if group_id:
-            return [
+            scopes = [
                 msg.RemoteScope(
                     id=tool_scope.id,
                     name=tool_scope.name,
@@ -116,7 +116,8 @@ class Plugin(ABC):
                 in self.remote_scopes(connection, group_id)
             ]
         else:
-            return self.remote_scope_groups(connection)
+            scopes = self.remote_scope_groups(connection)
+        return msg.RemoteScopes(__root__=scopes)
 
     def make_pipeline(self, tool_scopes: list[ToolScope], connection_id: int):
         """
diff --git a/backend/python/pydevlake/pydevlake/subtasks.py 
b/backend/python/pydevlake/pydevlake/subtasks.py
index e6ff57fa0..23e3f1077 100644
--- a/backend/python/pydevlake/pydevlake/subtasks.py
+++ b/backend/python/pydevlake/pydevlake/subtasks.py
@@ -70,7 +70,7 @@ class Subtask:
                             current=i
                         )
             except Exception as e:
-                logger.error(e)
+                logger.error(f'{type(e).__name__}: {e}')
 
             subtask_run.state = json.dumps(state)
             subtask_run.completed = datetime.now()
diff --git a/backend/server/services/remote/bridge/bridge.go 
b/backend/server/services/remote/bridge/bridge.go
index 2d80685dd..d526494d8 100644
--- a/backend/server/services/remote/bridge/bridge.go
+++ b/backend/server/services/remote/bridge/bridge.go
@@ -45,8 +45,8 @@ func (b *Bridge) RemoteSubtaskEntrypointHandler(subtaskMeta 
models.SubtaskMeta)
                }
                stream := b.invoker.Stream(subtaskMeta.EntryPointName, 
NewChildRemoteContext(ctx), args...)
                for recv := range stream.Receive() {
-                       if recv.err != nil {
-                               return recv.err
+                       if recv.Err != nil {
+                               return recv.Err
                        }
                        progress := RemoteProgress{}
                        err := recv.Get(&progress)
diff --git a/backend/server/services/remote/bridge/cmd.go 
b/backend/server/services/remote/bridge/cmd.go
index b3a14371a..edcb5ccd6 100644
--- a/backend/server/services/remote/bridge/cmd.go
+++ b/backend/server/services/remote/bridge/cmd.go
@@ -18,6 +18,7 @@ limitations under the License.
 package bridge
 
 import (
+       "encoding/json"
        "fmt"
        "os/exec"
 
@@ -44,7 +45,7 @@ func (c *CmdInvoker) Call(methodName string, ctx 
plugin.ExecContext, args ...any
        serializedArgs, err := serialize(args...)
        if err != nil {
                return &CallResult{
-                       err: err,
+                       Err: err,
                }
        }
        executable, inputArgs := c.resolveCmd(methodName, serializedArgs...)
@@ -70,11 +71,10 @@ func (c *CmdInvoker) Call(methodName string, ctx 
plugin.ExecContext, args ...any
        err = response.GetError()
        if err != nil {
                return &CallResult{
-                       err: errors.Default.Wrap(err, fmt.Sprintf("failed to 
invoke remote function \"%s\"", methodName)),
+                       Err: errors.Default.Wrap(err, fmt.Sprintf("failed to 
invoke remote function \"%s\"", methodName)),
                }
        }
-       results, err := deserialize(response.GetFdOut().([]byte))
-       return NewCallResult(results, errors.Convert(err))
+       return NewCallResult(response.GetFdOut(), nil)
 }
 
 func (c *CmdInvoker) Stream(methodName string, ctx plugin.ExecContext, args 
...any) *MethodStream {
@@ -95,18 +95,15 @@ func (c *CmdInvoker) Stream(methodName string, ctx 
plugin.ExecContext, args ...a
                cmd.Dir = c.workingPath
        }
        processHandle, err := utils.StreamProcess(cmd, 
&utils.StreamProcessOptions{
-               OnStdout: func(b []byte) (any, errors.Error) {
+               OnStdout: func(b []byte) {
                        msg := string(b)
                        c.logRemoteMessage(ctx.GetLogger(), msg)
-                       return b, nil
                },
-               OnStderr: func(b []byte) (any, errors.Error) {
+               OnStderr: func(b []byte) {
                        msg := string(b)
                        c.logRemoteError(ctx.GetLogger(), msg)
-                       return b, nil
                },
                UseFdOut: true,
-               OnFdOut:  utils.NoopConverter,
        })
        if err != nil {
                recvChannel <- NewStreamResult(nil, err)
@@ -133,18 +130,25 @@ func (c *CmdInvoker) Stream(methodName string, ctx 
plugin.ExecContext, args ...a
                        }
                        response := msg.GetFdOut()
                        if response != nil {
-                               results, err := deserialize(response.([]byte))
-                               if err != nil {
-                                       recvChannel <- NewStreamResult(nil, err)
-                               } else {
-                                       recvChannel <- NewStreamResult(results, 
nil)
-                               }
+                               recvChannel <- NewStreamResult(response, nil)
                        }
                }
        }()
        return stream
 }
 
+func serialize(args ...any) ([]string, errors.Error) {
+       var serializedArgs []string
+       for _, arg := range args {
+               serializedArg, err := json.Marshal(arg)
+               if err != nil {
+                       return nil, errors.Convert(err)
+               }
+               serializedArgs = append(serializedArgs, string(serializedArg))
+       }
+       return serializedArgs, nil
+}
+
 func (c *CmdInvoker) logRemoteMessage(logger log.Logger, msg string) {
        logger.Info(msg)
 }
diff --git a/backend/server/services/remote/bridge/returns.go 
b/backend/server/services/remote/bridge/returns.go
index b7aa46fb6..44eed2c0c 100644
--- a/backend/server/services/remote/bridge/returns.go
+++ b/backend/server/services/remote/bridge/returns.go
@@ -18,14 +18,15 @@ limitations under the License.
 package bridge
 
 import (
+       "encoding/json"
+
        "github.com/apache/incubator-devlake/core/errors"
-       "github.com/mitchellh/mapstructure"
 )
 
 type (
        CallResult struct {
-               results []map[string]any
-               err     errors.Error
+               Results []byte
+               Err     errors.Error
        }
        StreamResult = CallResult
        MethodStream struct {
@@ -36,50 +37,28 @@ type (
        }
 )
 
-func NewCallResult(results []map[string]any, err errors.Error) *CallResult {
+func NewCallResult(results []byte, err errors.Error) *CallResult {
        return &CallResult{
-               results: results,
-               err:     err,
+               Results: results,
+               Err:     err,
        }
 }
 
-func (m *CallResult) Get(targets ...any) errors.Error {
-       if m.err != nil {
-               return m.err
-       }
-       if len(targets) != len(m.results) {
-               // if everything came back as nil, consider it good
-               for _, result := range m.results {
-                       if result != nil {
-                               return errors.Default.New("unequal results and 
targets length")
-                       }
-               }
-               return nil
+func (m *CallResult) Get(target any) errors.Error {
+       if m.Err != nil {
+               return m.Err
        }
-
-       for i, target := range targets {
-               config := &mapstructure.DecoderConfig{
-                       TagName: "json",
-                       Result:  target,
-               }
-
-               decoder, err := mapstructure.NewDecoder(config)
-               if err != nil {
-                       return errors.Convert(err)
-               }
-
-               err = decoder.Decode(m.results[i])
-               if err != nil {
-                       return errors.Convert(err)
-               }
+       err := json.Unmarshal(m.Results, &target)
+       if err != nil {
+               return errors.Convert(err)
        }
        return nil
 }
 
-func NewStreamResult(results []map[string]any, err errors.Error) *StreamResult 
{
+func NewStreamResult(results []byte, err errors.Error) *StreamResult {
        return &StreamResult{
-               results: results,
-               err:     err,
+               Results: results,
+               Err:     err,
        }
 }
 
diff --git a/backend/server/services/remote/bridge/utils.go 
b/backend/server/services/remote/bridge/utils.go
deleted file mode 100644
index 542074e87..000000000
--- a/backend/server/services/remote/bridge/utils.go
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package bridge
-
-import (
-       "encoding/json"
-
-       "github.com/apache/incubator-devlake/core/errors"
-)
-
-func serialize(args ...any) ([]string, errors.Error) {
-       var serializedArgs []string
-       for _, arg := range args {
-               serializedArg, err := json.Marshal(arg)
-               if err != nil {
-                       return nil, errors.Convert(err)
-               }
-               serializedArgs = append(serializedArgs, string(serializedArg))
-       }
-       return serializedArgs, nil
-}
-
-func deserialize(bytes json.RawMessage) ([]map[string]any, errors.Error) {
-       if len(bytes) == 0 {
-               return nil, nil
-       }
-       var result []map[string]any
-       if bytes[0] == '{' {
-               single := make(map[string]any)
-               if err := json.Unmarshal(bytes, &single); err != nil {
-                       return nil, errors.Convert(err)
-               }
-               result = append(result, single)
-       } else if bytes[0] == '[' {
-               if err := json.Unmarshal(bytes, &result); err != nil {
-                       return nil, errors.Convert(err)
-               }
-       } else {
-               return nil, errors.Default.New("malformed JSON from remote 
call")
-       }
-       return result, nil
-}
diff --git a/backend/server/services/remote/plugin/connection_api.go 
b/backend/server/services/remote/plugin/connection_api.go
index 7be545cba..a5db952fb 100644
--- a/backend/server/services/remote/plugin/connection_api.go
+++ b/backend/server/services/remote/plugin/connection_api.go
@@ -26,7 +26,7 @@ import (
 )
 
 func (pa *pluginAPI) TestConnection(input *plugin.ApiResourceInput) 
(*plugin.ApiResourceOutput, errors.Error) {
-       err := pa.invoker.Call("test-connection", bridge.DefaultContext, 
input.Body).Get()
+       err := pa.invoker.Call("test-connection", bridge.DefaultContext, 
input.Body).Err
        if err != nil {
                return nil, err
        }
diff --git a/backend/server/services/remote/plugin/plugin_impl.go 
b/backend/server/services/remote/plugin/plugin_impl.go
index e22b32817..fb8feb3b4 100644
--- a/backend/server/services/remote/plugin/plugin_impl.go
+++ b/backend/server/services/remote/plugin/plugin_impl.go
@@ -174,17 +174,17 @@ func (p *remotePluginImpl) RunMigrations(forceMigrate 
bool) errors.Error {
        if err != nil {
                return err
        }
+       err = api.CallDB(basicRes.GetDal().AutoMigrate, p.scopeTabler.New())
+       if err != nil {
+               return err
+       }
        if p.transformationRuleTabler != nil {
                err = api.CallDB(basicRes.GetDal().AutoMigrate, 
p.transformationRuleTabler.New())
                if err != nil {
                        return err
                }
        }
-       err = api.CallDB(basicRes.GetDal().AutoMigrate, p.scopeTabler.New())
-       if err != nil {
-               return err
-       }
-       err = p.invoker.Call("run-migrations", bridge.DefaultContext, 
forceMigrate).Get()
+       err = p.invoker.Call("run-migrations", bridge.DefaultContext, 
forceMigrate).Err
        return err
 }
 
diff --git a/backend/server/services/remote/plugin/remote_scope_api.go 
b/backend/server/services/remote/plugin/remote_scope_api.go
index 341e0d52d..0e5485739 100644
--- a/backend/server/services/remote/plugin/remote_scope_api.go
+++ b/backend/server/services/remote/plugin/remote_scope_api.go
@@ -49,8 +49,10 @@ func (pa *pluginAPI) GetRemoteScopes(input 
*plugin.ApiResourceInput) (*plugin.Ap
                return nil, err
        }
 
-       var remoteScopes []RemoteScopesTreeNode
-       err = pa.invoker.Call("remote-scopes", bridge.DefaultContext, 
connection.Unwrap()).Get(&remoteScopes)
+       groupId := input.Query.Get("groupId")
+
+       remoteScopes := make([]RemoteScopesTreeNode, 0)
+       err = pa.invoker.Call("remote-scopes", bridge.DefaultContext, 
connection.Unwrap(), groupId).Get(&remoteScopes)
        if err != nil {
                return nil, err
        }
diff --git a/backend/test/helper/api.go b/backend/test/helper/api.go
index c751f51f6..1c46ebb56 100644
--- a/backend/test/helper/api.go
+++ b/backend/test/helper/api.go
@@ -21,6 +21,7 @@ import (
        "fmt"
        "net/http"
        "reflect"
+       "strings"
        "time"
 
        "github.com/apache/incubator-devlake/core/models"
@@ -179,17 +180,27 @@ func (d *DevlakeClient) ListTransformRules(pluginName 
string) []any {
 }
 
 func (d *DevlakeClient) RemoteScopes(query RemoteScopesQuery) 
RemoteScopesOutput {
-       return sendHttpRequest[RemoteScopesOutput](d.testCtx, d.timeout, 
debugInfo{
-               print:      true,
-               inlineJson: false,
-       }, http.MethodGet, 
fmt.Sprintf("%s/plugins/%s/connections/%d/remote-scopes?groupId=%s?pageToken=%s&%s",
+       url := fmt.Sprintf("%s/plugins/%s/connections/%d/remote-scopes",
                d.Endpoint,
                query.PluginName,
                query.ConnectionId,
-               query.GroupId,
-               query.PageToken,
-               mapToQueryString(query.Params)),
-               nil)
+       )
+       if query.Params == nil {
+               query.Params = make(map[string]string)
+       }
+       if query.GroupId != "" {
+               query.Params["groupId"] = query.GroupId
+       }
+       if query.PageToken != "" {
+               query.Params["pageToken"] = query.PageToken
+       }
+       if len(query.Params) > 0 {
+               url = url + "?" + mapToQueryString(query.Params)
+       }
+       return sendHttpRequest[RemoteScopesOutput](d.testCtx, d.timeout, 
debugInfo{
+               print:      true,
+               inlineJson: false,
+       }, http.MethodGet, url, nil)
 }
 
 // SearchRemoteScopes makes calls to the "scope API" indirectly. "Search" is 
the remote endpoint to hit.
@@ -252,11 +263,11 @@ func (d *DevlakeClient) RunPipeline(pipeline 
models.NewPipeline) models.Pipeline
 }
 
 func mapToQueryString(queryParams map[string]string) string {
-       q := ""
+       params := make([]string, 0)
        for k, v := range queryParams {
-               q = q + "&" + k + "=" + v
+               params = append(params, k+"="+v)
        }
-       return q
+       return strings.Join(params, "&")
 }
 
 // MonitorPipeline FIXME
diff --git a/backend/test/remote/fakeplugin/fakeplugin/main.py 
b/backend/test/remote/fakeplugin/fakeplugin/main.py
index 7130530f9..45d0c4c6b 100644
--- a/backend/test/remote/fakeplugin/fakeplugin/main.py
+++ b/backend/test/remote/fakeplugin/fakeplugin/main.py
@@ -24,7 +24,6 @@ from pydevlake.domain_layer.devops import CicdScope, 
CICDPipeline
 
 
 VALID_TOKEN = "this_is_a_valid_token"
-VALID_PROJECT = "this_is_a_valid_project"
 
 
 class FakePipeline(ToolModel, table=True):
@@ -35,7 +34,6 @@ class FakePipeline(ToolModel, table=True):
         SUCCESS = "success"
 
     id: str = Field(primary_key=True)
-    project: str
     started_at: Optional[datetime]
     finished_at: Optional[datetime]
     state: State
@@ -46,16 +44,14 @@ class FakeStream(Stream):
     domain_types = [DomainType.CICD]
 
     fake_pipelines = [
-        FakePipeline(id=1, project=VALID_PROJECT, 
state=FakePipeline.State.SUCCESS, started_at=datetime(2023, 1, 10, 11, 0, 0), 
finished_at=datetime(2023, 1, 10, 11, 3, 0)),
-        FakePipeline(id=2, project=VALID_PROJECT, 
state=FakePipeline.State.FAILURE, started_at=datetime(2023, 1, 10, 12, 0, 0), 
finished_at=datetime(2023, 1, 10, 12, 1, 30)),
-        FakePipeline(id=1, project=VALID_PROJECT, 
state=FakePipeline.State.PENDING),
+        FakePipeline(id=1, state=FakePipeline.State.SUCCESS, 
started_at=datetime(2023, 1, 10, 11, 0, 0), finished_at=datetime(2023, 1, 10, 
11, 3, 0)),
+        FakePipeline(id=2, state=FakePipeline.State.FAILURE, 
started_at=datetime(2023, 1, 10, 12, 0, 0), finished_at=datetime(2023, 1, 10, 
12, 1, 30)),
+        FakePipeline(id=1, state=FakePipeline.State.PENDING),
     ]
 
     def collect(self, state, context):
-        project = context.options['project']
-        if project == VALID_PROJECT:
-            for p in self.fake_pipelines:
-                yield dict(p)
+        for p in self.fake_pipelines:
+            yield p.json(), {}
 
     def convert(self, pipeline: FakePipeline, ctx):
         if ctx.transformationRule:
diff --git a/backend/test/remote/remote_test.go 
b/backend/test/remote/remote_test.go
index fae7079c6..599ab9235 100644
--- a/backend/test/remote/remote_test.go
+++ b/backend/test/remote/remote_test.go
@@ -95,7 +95,13 @@ func connectLocalServer(t *testing.T) *helper.DevlakeClient {
        return client
 }
 
-func CreateTestConnection(client *helper.DevlakeClient) *helper.Connection {
+func createClient(t *testing.T) *helper.DevlakeClient {
+       setupEnv()
+       buildPython(t)
+       return connectLocalServer(t)
+}
+
+func createTestConnection(client *helper.DevlakeClient) *helper.Connection {
        connection := client.CreateConnection(PLUGIN_NAME,
                FakePluginConnection{
                        Name:  "Test connection",
@@ -107,7 +113,7 @@ func CreateTestConnection(client *helper.DevlakeClient) 
*helper.Connection {
        return connection
 }
 
-func CreateTestScope(client *helper.DevlakeClient, connectionId uint64) any {
+func createTestScope(client *helper.DevlakeClient, connectionId uint64) any {
        res := client.CreateTransformRule(PLUGIN_NAME, FakeTxRule{Name: "Tx 
rule", Env: "test env"})
        rule, ok := res.(map[string]interface{})
        if !ok {
@@ -130,36 +136,63 @@ func CreateTestScope(client *helper.DevlakeClient, 
connectionId uint64) any {
 }
 
 func TestCreateConnection(t *testing.T) {
-       setupEnv()
-       buildPython(t)
-       client := connectLocalServer(t)
+       client := createClient(t)
 
-       CreateTestConnection(client)
+       createTestConnection(client)
 
        conns := client.ListConnections(PLUGIN_NAME)
        require.Equal(t, 1, len(conns))
        require.Equal(t, TOKEN, conns[0].Token)
 }
 
+func TestRemoteScopeGroups(t *testing.T) {
+       client := createClient(t)
+       connection := createTestConnection(client)
+
+       output := client.RemoteScopes(helper.RemoteScopesQuery{
+               PluginName:   PLUGIN_NAME,
+               ConnectionId: connection.ID,
+       })
+
+       scopeGroups := output.Children
+       require.Equal(t, 1, len(scopeGroups))
+       scope := scopeGroups[0]
+       require.Equal(t, "Group 1", scope.Name)
+       require.Equal(t, "group1", scope.Id)
+}
+
+func TestRemoteScopes(t *testing.T) {
+       client := createClient(t)
+       connection := createTestConnection(client)
+
+       output := client.RemoteScopes(helper.RemoteScopesQuery{
+               PluginName:   PLUGIN_NAME,
+               ConnectionId: connection.ID,
+               GroupId:      "group1",
+       })
+
+       scopes := output.Children
+       require.Equal(t, 1, len(scopes))
+       scope := scopes[0]
+       require.Equal(t, "Project 1", scope.Name)
+       require.Equal(t, "p1", scope.Id)
+}
+
 func TestCreateScope(t *testing.T) {
-       setupEnv()
-       buildPython(t)
-       client := connectLocalServer(t)
+       client := createClient(t)
        var connectionId uint64 = 1
 
-       CreateTestScope(client, connectionId)
+       createTestScope(client, connectionId)
 
        scopes := client.ListScopes(PLUGIN_NAME, connectionId)
        require.Equal(t, 1, len(scopes))
 }
 
 func TestRunPipeline(t *testing.T) {
-       setupEnv()
-       buildPython(t)
-       client := connectLocalServer(t)
-       conn := CreateTestConnection(client)
+       client := createClient(t)
+       conn := createTestConnection(client)
 
-       CreateTestScope(client, conn.ID)
+       createTestScope(client, conn.ID)
 
        pipeline := client.RunPipeline(models.NewPipeline{
                Name: "remote_test",
@@ -183,17 +216,13 @@ func TestRunPipeline(t *testing.T) {
 }
 
 func TestBlueprintV200(t *testing.T) {
-       setupEnv()
-       buildPython(t)
-       client := connectLocalServer(t)
-       connection := CreateTestConnection(client)
+       client := createClient(t)
+       connection := createTestConnection(client)
        projectName := "Test project"
-
        client.CreateProject(&helper.ProjectConfig{
                ProjectName: projectName,
        })
-
-       CreateTestScope(client, connection.ID)
+       createTestScope(client, connection.ID)
 
        blueprint := client.CreateBasicBlueprintV2(
                "Test blueprint",
@@ -218,6 +247,5 @@ func TestBlueprintV200(t *testing.T) {
 
        project := client.GetProject(projectName)
        require.Equal(t, blueprint.Name, project.Blueprint.Name)
-
        client.TriggerBlueprint(blueprint.ID)
 }


Reply via email to