This is an automated email from the ASF dual-hosted git repository.

hez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 40779ea43 Fix Azuredevops UI pipeline run  (#4831)
40779ea43 is described below

commit 40779ea43994808f68360e080d83b8a3e2aff6b3
Author: Camille Teruel <[email protected]>
AuthorDate: Mon Apr 3 18:11:10 2023 +0200

    Fix Azuredevops UI pipeline run  (#4831)
    
    * test: Fix skipped test
    
    * fix: Get most recent state from SubtaskRun
    
    Not oldest state.
    
    * fix: Limit extraction and conversion to same connection and scope
    
    * style: Fix type hints
    
    * fix: Fix JSON arg parsing
    
    * fix: Always use tfsgit source provider
    
    * feat: Show response body in APIException
    
    * fix: ToolModel should not inherit from Model
    
    The primary keys of a tool model are specific to each devops tool API.
    So ToolModel should not impose to have an int `id` primary key.
    
    * feat: Support domain models that don't inherit from DomainModel
    
    Some domain models have multiple primary keys, like CICDPipelineCommit, and 
don't inherit from DomainModel, but from NoPKModel like their counterparts do on 
the Go side.
    The convertor now accepts saving such models but won't set their id to the 
domain id generated from the source tool model.
    
    * feat: Materialize entity types as DomainType enum values
    
    ... instead of strings.
    
    * fix: Make FakeProject url look correct
    
    * test: Improve scope tests
    
    * fix: Continue in case repository has no default branch
    
    * fix: Support no tx rule in make pipeline
    
    * fix: Add missing attributes in make-pipeline parameters
    
    * fix: Do not expose token in repo url
    
    Build repo url with token just for gitextractor task.
    
    * feat: Populate domain models raw data origin
    
    * fix: Remove user name from repo url
    
    This user name comes from the token, but leaving it in the URL makes it 
non-canonical.
    
    * fix: Unmarshalled dynamic scope misses id
    
    Use `json.Unmarshal` instead of `mapstructure.Decode` to unmarshal dynamic 
scope json data.
    
    * fix: Fix created_at and updated_at fields
    
    For some reason sqlalchemy `func.now()` randomly fails to set fields.
    Use `datetime.utcnow` instead.
    
    * fix: Improve cicd streams
    
    ---------
    
    Co-authored-by: Camille Teruel <[email protected]>
---
 .../python/plugins/azuredevops/azuredevops/main.py | 23 ++++++------
 .../plugins/azuredevops/azuredevops/models.py      |  4 +-
 .../azuredevops/azuredevops/streams/builds.py      | 28 ++++++++------
 .../azuredevops/azuredevops/streams/jobs.py        |  9 ++---
 .../plugins/azuredevops/tests/streams_test.py      | 42 +++++++++++----------
 backend/python/pydevlake/pydevlake/api.py          |  3 +-
 backend/python/pydevlake/pydevlake/context.py      |  6 +--
 backend/python/pydevlake/pydevlake/ipc.py          | 43 ++++++++++------------
 backend/python/pydevlake/pydevlake/message.py      |  2 +-
 backend/python/pydevlake/pydevlake/model.py        | 25 ++++++++-----
 backend/python/pydevlake/pydevlake/plugin.py       | 16 ++++----
 backend/python/pydevlake/pydevlake/subtasks.py     | 36 ++++++++----------
 .../python/pydevlake/pydevlake/testing/testing.py  |  4 ++
 backend/python/pydevlake/tests/stream_test.py      |  7 ++--
 backend/python/test/fakeplugin/fakeplugin/main.py  |  8 ++--
 .../remote/models/dynamic_domain_scopes.go         | 15 ++------
 backend/server/services/remote/models/models.go    |  4 +-
 .../services/remote/plugin/plugin_extensions.go    |  2 +-
 backend/test/e2e/remote/helper.go                  |  6 ++-
 backend/test/e2e/remote/python_plugin_test.go      | 18 ++++++++-
 20 files changed, 162 insertions(+), 139 deletions(-)

diff --git a/backend/python/plugins/azuredevops/azuredevops/main.py 
b/backend/python/plugins/azuredevops/azuredevops/main.py
index da81a34bf..17a27a815 100644
--- a/backend/python/plugins/azuredevops/azuredevops/main.py
+++ b/backend/python/plugins/azuredevops/azuredevops/main.py
@@ -78,14 +78,15 @@ class AzureDevOpsPlugin(Plugin):
         org, proj = group_id.split('/')
         api = AzureDevOpsAPI(connection)
         for raw_repo in api.git_repos(org, proj):
-            url = urlparse(raw_repo['remoteUrl'])
-            url = 
url._replace(netloc=f'{url.username}:{connection.token}@{url.hostname}')
-            raw_repo['url'] = url.geturl()
             raw_repo['project_id'] = proj
             raw_repo['org_id'] = org
+            # remove username from url
+            url = urlparse(raw_repo['remoteUrl'])
+            url = url._replace(netloc=url.hostname)
+            raw_repo['url'] = url.geturl()
             repo = GitRepository(**raw_repo)
             if not repo.defaultBranch:
-                return None
+                continue
             if "parentRepository" in raw_repo:
                 repo.parentRepositoryUrl = raw_repo["parentRepository"]["url"]
             yield repo
@@ -105,17 +106,17 @@ class AzureDevOpsPlugin(Plugin):
             except APIException as e:
                 raise Exception(f"Invalid token: {e}")
 
-    def extra_tasks(self, scope: GitRepository, tx_rule: 
AzureDevOpsTransformationRule, entity_types: list[str], connection: 
AzureDevOpsConnection):
+    def extra_tasks(self, scope: GitRepository, tx_rule: 
AzureDevOpsTransformationRule, entity_types: list[DomainType], connection: 
AzureDevOpsConnection):
         if DomainType.CODE in entity_types:
-            return [gitextractor(scope.url, scope.id, connection.proxy)]
-        else:
-            return []
+            url = urlparse(scope.remoteUrl)
+            url = 
url._replace(netloc=f'{url.username}:{connection.token}@{url.hostname}')
+            yield gitextractor(url.geturl(), scope.domain_id(), 
connection.proxy)
 
-    def extra_stages(self, scope_tx_rule_pairs: list[ScopeTxRulePair], 
entity_types: list[str], _):
+    def extra_stages(self, scope_tx_rule_pairs: list[ScopeTxRulePair], 
entity_types: list[DomainType], _):
         if DomainType.CODE in entity_types:
             for scope, tx_rule in scope_tx_rule_pairs:
-                options = tx_rule.refdiff_options if tx_rule else None
-                yield refdiff(scope.id, options)
+                options = tx_rule.refdiff if tx_rule else None
+                yield [refdiff(scope.id, options)]
 
     @property
     def streams(self):
diff --git a/backend/python/plugins/azuredevops/azuredevops/models.py 
b/backend/python/plugins/azuredevops/azuredevops/models.py
index 82b2dc134..c374754e7 100644
--- a/backend/python/plugins/azuredevops/azuredevops/models.py
+++ b/backend/python/plugins/azuredevops/azuredevops/models.py
@@ -147,6 +147,7 @@ class Build(ToolModel, table=True):
         Succeeded = "succeeded"
 
     id: int = Field(primary_key=True)
+    name: str
     project_id: str
     repo_id: str
     repo_type: str
@@ -187,8 +188,7 @@ class Job(ToolModel, table=True):
         SucceededWithIssues = "succeededWithIssues"
 
     id: str = Field(primary_key=True)
-    build_id: int
-    repo_id: str
+    build_id: str
     parentId: Optional[str]
     type: Optional[Type]
     name: str
diff --git a/backend/python/plugins/azuredevops/azuredevops/streams/builds.py 
b/backend/python/plugins/azuredevops/azuredevops/streams/builds.py
index a5b9b9424..966538a23 100644
--- a/backend/python/plugins/azuredevops/azuredevops/streams/builds.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/builds.py
@@ -31,12 +31,13 @@ class Builds(Stream):
     def collect(self, state, context) -> Iterable[tuple[object, dict]]:
         repo: GitRepository = context.scope
         api = AzureDevOpsAPI(context.connection)
-        response = api.builds(repo.org_id, repo.project_id, repo.id, 
repo.provider)
+        response = api.builds(repo.org_id, repo.project_id, repo.id, 'tfsgit')
         for raw_build in response:
             yield raw_build, state
 
     def extract(self, raw_data: dict) -> Build:
         build: Build = self.tool_model(**raw_data)
+        build.name = raw_data["definition"]["name"]
         build.project_id = raw_data["project"]["id"]
         build.repo_id = raw_data["repository"]["id"]
         build.repo_type = raw_data["repository"]["type"]
@@ -75,26 +76,29 @@ class Builds(Stream):
             case Build.Status.Postponed:
                 status = devops.CICDStatus.IN_PROGRESS
 
+        type = devops.CICDType.BUILD
+        if ctx.transformation_rule and 
ctx.transformation_rule.deployment_pattern.search(b.name):
+            type = devops.CICDType.DEPLOYMENT
+        environment = devops.CICDEnvironment.TESTING
+        if ctx.transformation_rule and 
ctx.transformation_rule.production_pattern.search(b.name):
+            environment = devops.CICDEnvironment.PRODUCTION
+
         yield devops.CICDPipeline(
-            name=b.id,
+            name=b.name,
             status=status,
             created_date=b.start_time,
             finished_date=b.finish_time,
             result=result,
             duration_sec=abs(b.finish_time.second-b.start_time.second),
-            environment=devops.CICDEnvironment.PRODUCTION,
-            type=devops.CICDType.DEPLOYMENT,
-            cicd_scope_id=b.repo_id,
+            environment=environment,
+            type=type,
+            cicd_scope_id=ctx.scope.domain_id(),
         )
 
-        repo_url = None
-        if b.repo_type == 'GitHub':
-            repo_url = f'https://github.com/{b.repo_id}'
-
         yield devops.CiCDPipelineCommit(
-            pipeline_id=b.id,
+            pipeline_id=b.domain_id(),
             commit_sha=b.source_version,
             branch=b.source_branch,
-            repo_id=b.repo_id,
-            repo=repo_url,
+            repo_id=ctx.scope.domain_id(),
+            repo=ctx.scope.url,
         )
diff --git a/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py 
b/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py
index 980514312..c6d34a7fd 100644
--- a/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py
@@ -35,8 +35,7 @@ class Jobs(Substream):
             yield None, state
         else:
             for raw_job in response.json["records"]:
-                raw_job["build_id"] = parent.id
-                raw_job["repo_id"] = parent.repo_id
+                raw_job["build_id"] = parent.domain_id()
                 yield raw_job, state
 
 
@@ -66,10 +65,10 @@ class Jobs(Substream):
                 status = devops.CICDStatus.IN_PROGRESS
 
         type = devops.CICDType.BUILD
-        if ctx.transformation_rule.deployment_pattern.search(j.name):
+        if ctx.transformation_rule and 
ctx.transformation_rule.deployment_pattern.search(j.name):
             type = devops.CICDType.DEPLOYMENT
         environment = devops.CICDEnvironment.TESTING
-        if ctx.transformation_rule.production_pattern.search(j.name):
+        if ctx.transformation_rule and 
ctx.transformation_rule.production_pattern.search(j.name):
             environment = devops.CICDEnvironment.PRODUCTION
 
         yield devops.CICDTask(
@@ -83,5 +82,5 @@ class Jobs(Substream):
             type=type,
             duration_sec=abs(j.finishTime.second-j.startTime.second),
             environment=environment,
-            cicd_scope_id=j.repo_id
+            cicd_scope_id=ctx.scope.domain_id()
         )
diff --git a/backend/python/plugins/azuredevops/tests/streams_test.py 
b/backend/python/plugins/azuredevops/tests/streams_test.py
index f76584174..c4584abf9 100644
--- a/backend/python/plugins/azuredevops/tests/streams_test.py
+++ b/backend/python/plugins/azuredevops/tests/streams_test.py
@@ -22,7 +22,18 @@ import pydevlake.domain_layer.devops as devops
 from azuredevops.main import AzureDevOpsPlugin
 
 
-def test_builds_stream():
[email protected]
+def context():
+    return (
+        ContextBuilder(AzureDevOpsPlugin())
+        .with_connection(token='token')
+        .with_transformation_rule(deployment_pattern='deploy',
+                                  production_pattern='prod')
+        .with_scope('johndoe/test-repo', 
url='https://github.com/johndoe/test-repo')
+        .build()
+    )
+
+def test_builds_stream(context):
     raw = {
         'properties': {},
         'tags': [],
@@ -40,7 +51,7 @@ def test_builds_stream():
         'definition': {
             'drafts': [],
             'id': 5,
-            'name': 'johndoe.test-repo',
+            'name': 'deploy_to_prod',
             'url': 
'https://dev.azure.com/testorg/7a3fd40e-2aed-4fac-bac9-511bf1a70206/_apis/build/Definitions/5?revision=1',
             'uri': 'vstfs:///Build/Definition/5',
             'path': '\\',
@@ -114,7 +125,7 @@ def test_builds_stream():
 
     expected = [
         devops.CICDPipeline(
-            name=12,
+            name='deploy_to_prod',
             status=devops.CICDStatus.DONE,
             created_date='2023-02-25T06:22:32.8097789Z',
             finished_date='2023-02-25T06:23:04.0061884Z',
@@ -122,35 +133,28 @@ def test_builds_stream():
             duration_sec=28,
             environment=devops.CICDEnvironment.PRODUCTION,
             type=devops.CICDType.DEPLOYMENT,
-            cicd_scope_id='johndoe/test-repo'
+            cicd_scope_id=context.scope.domain_id()
         ),
         devops.CiCDPipelineCommit(
-            pipeline_id=12,
+            pipeline_id='azuredevops:Build:1:12',
             commit_sha='40c59264e73fc5e1a6cab192f1622d26b7bd5c2a',
             branch='refs/heads/main',
-            repo_id='johndoe/test-repo',
+            repo_id=context.scope.domain_id(),
             repo='https://github.com/johndoe/test-repo'
         )
     ]
 
-    assert_stream_convert(AzureDevOpsPlugin, 'builds', raw, expected)
+    assert_stream_convert(AzureDevOpsPlugin, 'builds', raw, expected, context)
 
 
-def test_jobs_stream():
-    ctx = (
-        ContextBuilder(AzureDevOpsPlugin)
-        .with_transformation_rule(deployment_pattern='deploy',
-                                  production_pattern='prod')
-        .build()
-    )
+def test_jobs_stream(context):
     raw = {
         'previousAttempts': [],
         'id': 'cfa20e98-6997-523c-4233-f0a7302c929f',
         'parentId': '9ecf18fe-987d-5811-7c63-300aecae35da',
         'type': 'Job',
         'name': 'deploy production',
-        'build_id': 12,  # Added by collector,
-        'repo_id': 'johndoe/test-repo',  # Added by collector,
+        'build_id': 'azuredevops:Build:1:12',  # Added by collector,
         'startTime': '2023-02-25T06:22:36.8066667Z',
         'finishTime': '2023-02-25T06:22:43.2333333Z',
         'currentOperation': None,
@@ -180,7 +184,7 @@ def test_jobs_stream():
     expected = devops.CICDTask(
         id='cfa20e98-6997-523c-4233-f0a7302c929f',
         name='deploy production',
-        pipeline_id=12,
+        pipeline_id='azuredevops:Build:1:12',
         status=devops.CICDStatus.DONE,
         created_date='2023-02-25T06:22:36.8066667Z',
         finished_date='2023-02-25T06:22:43.2333333Z',
@@ -188,9 +192,9 @@ def test_jobs_stream():
         type=devops.CICDType.DEPLOYMENT,
         duration_sec=7,
         environment=devops.CICDEnvironment.PRODUCTION,
-        cicd_scope_id='johndoe/test-repo'
+        cicd_scope_id=context.scope.domain_id()
     )
-    assert_stream_convert(AzureDevOpsPlugin, 'jobs', raw, expected, ctx)
+    assert_stream_convert(AzureDevOpsPlugin, 'jobs', raw, expected, context)
 
 
 def test_pull_requests_stream():
diff --git a/backend/python/pydevlake/pydevlake/api.py 
b/backend/python/pydevlake/pydevlake/api.py
index b7f642cf0..69deb5c18 100644
--- a/backend/python/pydevlake/pydevlake/api.py
+++ b/backend/python/pydevlake/pydevlake/api.py
@@ -289,7 +289,8 @@ class APIException(Exception):
         self.response = response
 
     def __str__(self):
-        return f'APIException: {self.response}'
+        body = self.response.body or 'no body'
+        return f'APIException: {self.response} body: {body}'
 
 
 class API(APIBase):
diff --git a/backend/python/pydevlake/pydevlake/context.py 
b/backend/python/pydevlake/pydevlake/context.py
index 42a1de377..660ff8847 100644
--- a/backend/python/pydevlake/pydevlake/context.py
+++ b/backend/python/pydevlake/pydevlake/context.py
@@ -19,13 +19,13 @@ from urllib.parse import urlparse, parse_qsl
 from sqlalchemy.engine import Engine
 from sqlmodel import SQLModel, create_engine
 
-from pydevlake.model import Connection, TransformationRule
+from pydevlake.model import Connection, TransformationRule, ToolScope
 
 
 class Context:
     def __init__(self,
                  db_url: str,
-                 scope: str,
+                 scope: ToolScope,
                  connection: Connection,
                  transformation_rule: TransformationRule = None,
                  options: dict = None):
@@ -51,7 +51,7 @@ class Context:
     def incremental(self) -> bool:
         return self.options.get('incremental') is True
 
-    def get_engine_db_url(self) -> [str, dict[str, any]]:
+    def get_engine_db_url(self) -> tuple[str, dict[str, any]]:
         db_url = self.db_url
         if not db_url:
             raise Exception("Missing db_url setting")
diff --git a/backend/python/pydevlake/pydevlake/ipc.py 
b/backend/python/pydevlake/pydevlake/ipc.py
index b75895ce1..7e5ccc56c 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -17,10 +17,13 @@
 import os
 import json
 from functools import wraps
-from typing import Generator, TextIO, Optional, Union
+from typing import Generator, TextIO, Optional
+
+from fire.decorators import SetParseFn
 
 from pydevlake.context import Context
 from pydevlake.message import Message
+from pydevlake.stream import DomainType
 
 
 def plugin_method(func):
@@ -36,7 +39,14 @@ def plugin_method(func):
         send_ch.write('\n')
         send_ch.flush()
 
+    def parse_arg(arg):
+        try:
+            return json.loads(arg)
+        except json.JSONDecodeError as e:
+            raise Exception(f"Invalid JSON {arg}: {e.msg}")
+
     @wraps(func)
+    @SetParseFn(parse_arg)
     def wrapper(self, *args):
         ret = func(self, *args)
         if ret is not None:
@@ -69,7 +79,6 @@ class PluginCommands:
 
     @plugin_method
     def test_connection(self, connection: dict):
-        connection = self._parse(connection)
         if "name" not in connection:
             connection["name"] = "Test connection"
         connection = self._plugin.connection_type(**connection)
@@ -77,15 +86,15 @@ class PluginCommands:
 
     @plugin_method
     def make_pipeline(self, scope_tx_rule_pairs: list[tuple[dict, dict]], 
entities: list[str], connection: dict):
-        connection = self._plugin.connection_type(**self._parse(connection))
-        entities = self._parse(entities)
+        connection = self._plugin.connection_type(**connection)
         scope_tx_rule_pairs = [
             (
-                self._plugin.tool_scope_type(**self._parse(raw_scope)),
-                
self._plugin.transformation_rule_type(**self._parse(raw_tx_rule)) if 
raw_tx_rule else None
+                self._plugin.tool_scope_type(**raw_scope),
+                self._plugin.transformation_rule_type(**raw_tx_rule) if 
raw_tx_rule else None
             )
             for raw_scope, raw_tx_rule in scope_tx_rule_pairs
         ]
+        entities = [DomainType(e) for e in entities]
         return self._plugin.make_pipeline(scope_tx_rule_pairs, entities, 
connection)
 
     @plugin_method
@@ -98,7 +107,6 @@ class PluginCommands:
 
     @plugin_method
     def remote_scopes(self, connection: dict, group_id: Optional[str] = None):
-        connection = self._parse(connection)
         c = self._plugin.connection_type(**connection)
         return self._plugin.make_remote_scopes(c, group_id)
 
@@ -106,26 +114,15 @@ class PluginCommands:
         self._plugin.startup(endpoint)
 
     def _mk_context(self, data: dict):
-        data = self._parse(data)
         db_url = data['db_url']
-        scope_dict = self._parse(data['scope'])
+        scope_dict = data['scope']
         scope = self._plugin.tool_scope_type(**scope_dict)
-        connection_dict = self._parse(data['connection'])
+        connection_dict = data['connection']
         connection = self._plugin.connection_type(**connection_dict)
-        if self._plugin.transformation_rule_type:
-            transformation_rule_dict = self._parse(data['transformation_rule'])
-            transformation_rule = 
self._plugin.transformation_rule_type(**transformation_rule_dict)
+        raw_tx_rule = data.get('transformation_rule')
+        if self._plugin.transformation_rule_type and raw_tx_rule:
+            transformation_rule = 
self._plugin.transformation_rule_type(**raw_tx_rule)
         else:
             transformation_rule = None
         options = data.get('options', {})
         return Context(db_url, scope, connection, transformation_rule, options)
-
-    def _parse(self, data: Union[str, dict, list]) -> Union[dict, list]:
-        if isinstance(data, (dict, list)):
-            return data
-        if isinstance(data, str):
-            try:
-                return json.loads(data)
-            except json.JSONDecodeError as e:
-                raise Exception(f"Invalid JSON: {e.msg}")
-        raise Exception(f"Invalid argument type: {type(data)}")
diff --git a/backend/python/pydevlake/pydevlake/message.py 
b/backend/python/pydevlake/pydevlake/message.py
index e3e72b912..a0d82eece 100644
--- a/backend/python/pydevlake/pydevlake/message.py
+++ b/backend/python/pydevlake/pydevlake/message.py
@@ -92,7 +92,7 @@ class PipelineTask(Message):
 
 class DynamicDomainScope(Message):
        type_name: str
-       data: dict
+       data: bytes
 
 
 class PipelineData(Message):
diff --git a/backend/python/pydevlake/pydevlake/model.py 
b/backend/python/pydevlake/pydevlake/model.py
index 7d49bcf67..e45dd893e 100644
--- a/backend/python/pydevlake/pydevlake/model.py
+++ b/backend/python/pydevlake/pydevlake/model.py
@@ -33,13 +33,13 @@ inflect_engine = inflect.engine()
 class Model(SQLModel):
     id: Optional[int] = Field(primary_key=True)
     created_at: Optional[datetime] = Field(
-        sa_column=Column(DateTime(), default=func.now())
+        sa_column=Column(DateTime(), default=datetime.utcnow)
     )
     updated_at: Optional[datetime] = Field(
-        sa_column=Column(DateTime(), default=func.now(), onupdate=func.now())
+        sa_column=Column(DateTime(), default=datetime.utcnow, 
onupdate=datetime.utcnow)
     )
 
-class ToolTable(Model):
+class ToolTable(SQLModel):
     @declared_attr
     def __tablename__(cls) -> str:
         plugin_name = _get_plugin_name(cls)
@@ -47,7 +47,7 @@ class ToolTable(Model):
         return f'_tool_{plugin_name}_{plural_entity}'
 
 
-class Connection(ToolTable):
+class Connection(ToolTable, Model):
     name: str
     proxy: Optional[AnyUrl]
 
@@ -58,7 +58,7 @@ class Connection(ToolTable):
         return proxy
 
 
-class TransformationRule(ToolTable):
+class TransformationRule(ToolTable, Model):
     name: str
 
 
@@ -79,18 +79,23 @@ class RawDataOrigin(SQLModel):
     raw_data_id: Optional[str] = 
Field(sa_column_kwargs={'name':'_raw_data_id'})
     raw_data_remark: Optional[str] = 
Field(sa_column_kwargs={'name':'_raw_data_remark'})
 
-    def set_origin(self, raw: RawModel):
+    def set_raw_origin(self, raw: RawModel):
         self.raw_data_id = raw.id
         self.raw_data_params = raw.params
         self.raw_data_table = raw.__tablename__
 
+    def set_tool_origin(self, tool_model: 'ToolModel'):
+        self.raw_data_id = tool_model.raw_data_id
+        self.raw_data_params = tool_model.raw_data_params
+        self.raw_data_table = tool_model.raw_data_table
+
 
 class NoPKModel(RawDataOrigin):
     created_at: Optional[datetime] = Field(
-        sa_column=Column(DateTime(), default=func.now())
+        sa_column=Column(DateTime(), default=datetime.utcnow)
     )
     updated_at: Optional[datetime] = Field(
-        sa_column=Column(DateTime(), default=func.now(), onupdate=func.now())
+        sa_column=Column(DateTime(), default=datetime.utcnow, 
onupdate=datetime.utcnow)
     )
 
 
@@ -103,10 +108,12 @@ class ToolModel(ToolTable, NoPKModel):
         originates from self.
         """
         model_type = type(self)
-        segments = [_get_plugin_name(model_type), model_type.__name__]
+        segments = [_get_plugin_name(model_type), model_type.__name__, 
str(self.connection_id)]
         mapper = inspect(model_type)
         for primary_key_column in mapper.primary_key:
             prop = mapper.get_property_by_column(primary_key_column)
+            if prop.key == 'connection_id':
+                continue
             attr_val = getattr(self, prop.key)
             segments.append(str(attr_val))
         return ':'.join(segments)
diff --git a/backend/python/pydevlake/pydevlake/plugin.py 
b/backend/python/pydevlake/pydevlake/plugin.py
index c227ad50b..f16ff94f3 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -25,7 +25,7 @@ from pydevlake.subtasks import Subtask
 from pydevlake.docgen import generate_doc
 from pydevlake.ipc import PluginCommands
 from pydevlake.context import Context
-from pydevlake.stream import Stream
+from pydevlake.stream import Stream, DomainType
 from pydevlake.model import ToolScope, DomainScope, Connection, 
TransformationRule
 
 
@@ -124,7 +124,7 @@ class Plugin(ABC):
         return msg.RemoteScopes(__root__=remote_scopes)
 
     def make_pipeline(self, scope_tx_rule_pairs: list[ScopeTxRulePair],
-                      entity_types: list[str], connection: Connection):
+                      entity_types: list[DomainType], connection: Connection):
         """
         Make a simple pipeline using the scopes declared by the plugin.
         """
@@ -136,7 +136,7 @@ class Plugin(ABC):
                 domain_scopes.append(
                     msg.DynamicDomainScope(
                         type_name=type(scope).__name__,
-                        data=scope.dict(exclude_unset=True)
+                        data=scope.json(exclude_unset=True)
                     )
                 )
         return msg.PipelineData(
@@ -145,7 +145,7 @@ class Plugin(ABC):
         )
 
     def make_pipeline_plan(self, scope_tx_rule_pairs: list[ScopeTxRulePair],
-                           entity_types: list[str], connection: Connection) -> 
list[list[msg.PipelineTask]]:
+                           entity_types: list[DomainType], connection: 
Connection) -> list[list[msg.PipelineTask]]:
         """
         Generate a pipeline plan with one stage per scope, plus optional 
additional stages.
         Redefine `extra_stages` to add stages at the end of this pipeline.
@@ -156,12 +156,12 @@ class Plugin(ABC):
         ]
 
     def extra_stages(self, scope_tx_rule_pairs: list[ScopeTxRulePair],
-                     entity_types: list[str], connection: Connection) -> 
list[list[msg.PipelineTask]]:
+                     entity_types: list[DomainType], connection: Connection) 
-> list[list[msg.PipelineTask]]:
         """Override this method to add extra stages to the pipeline plan"""
         return []
 
     def make_pipeline_stage(self, scope: ToolScope, tx_rule: 
Optional[TransformationRule],
-                            entity_types: list[str], connection: Connection) 
-> list[msg.PipelineTask]:
+                            entity_types: list[DomainType], connection: 
Connection) -> list[msg.PipelineTask]:
         """
         Generate a pipeline stage for the given scope, plus optional 
additional tasks.
         Subtasks are selected from `entity_types` via `select_subtasks`.
@@ -182,11 +182,11 @@ class Plugin(ABC):
         ]
 
     def extra_tasks(self, scope: ToolScope, tx_rule: 
Optional[TransformationRule],
-                    entity_types: list[str], connection: Connection) -> 
list[msg.PipelineTask]:
+                    entity_types: list[DomainType], connection: Connection) -> 
list[msg.PipelineTask]:
         """Override this method to add tasks to the given scope stage"""
         return []
 
-    def select_subtasks(self, scope: ToolScope, entity_types: list[str]) -> 
list[str]:
+    def select_subtasks(self, scope: ToolScope, entity_types: 
list[DomainType]) -> list[str]:
         """
         Returns the list of subtasks names that should be run for given scope 
and entity types.
         """
diff --git a/backend/python/pydevlake/pydevlake/subtasks.py 
b/backend/python/pydevlake/pydevlake/subtasks.py
index 15b84e92f..fb5b6c4fa 100644
--- a/backend/python/pydevlake/pydevlake/subtasks.py
+++ b/backend/python/pydevlake/pydevlake/subtasks.py
@@ -119,20 +119,22 @@ class Subtask:
             .where(SubtaskRun.subtask_name == self.name)
             .where(SubtaskRun.connection_id == connection_id)
             .where(SubtaskRun.completed != None)
-            .order_by(SubtaskRun.started)
+            .order_by(sql.desc(SubtaskRun.started))
         )
         subtask_run = session.exec(stmt).first()
         if subtask_run is not None:
             return json.loads(subtask_run.state)
         return {}
 
+    def _params(self, ctx: Context) -> str:
+        return json.dumps({
+            "connection_id": ctx.connection.id,
+            "scope_id": ctx.scope.id
+        })
 
 class SubtaskRun(SQLModel, table=True):
     """
     Table storing information about the execution of subtasks.
-
-    #TODO: rework id uniqueness:
-    # sync with Keon about the table he created for Singer MR
     """
     id: Optional[int] = Field(primary_key=True)
     subtask_name: str
@@ -158,12 +160,6 @@ class Collector(Subtask):
         )
         session.add(raw_model)
 
-    def _params(self, ctx: Context) -> str:
-        return json.dumps({
-            "connection_id": ctx.connection.id,
-            "scope_id": ctx.scope.id
-        })
-
     def delete(self, session, ctx):
         raw_model = self.stream.raw_model(session)
         stmt = sql.delete(raw_model).where(raw_model.params == 
self._params(ctx))
@@ -183,13 +179,13 @@ class Extractor(Subtask):
 
     def fetch(self, state: Dict, session: Session, ctx: Context) -> 
Iterable[Tuple[object, dict]]:
         raw_model = self.stream.raw_model(session)
-        # TODO: Should filter for same options?
-        for raw in session.query(raw_model).all():
+        query = session.query(raw_model).where(raw_model.params == 
self._params(ctx))
+        for raw in query.all():
             yield raw, state
 
     def process(self, raw: RawModel, session: Session, ctx: Context):
         tool_model = self.stream.extract(json.loads(raw.data))
-        tool_model.set_origin(raw)
+        tool_model.set_raw_origin(raw)
         tool_model.connection_id = ctx.connection.id
         session.merge(tool_model)
 
@@ -201,8 +197,10 @@ class Convertor(Subtask):
     def verb(self):
         return 'convert'
 
-    def fetch(self, state: Dict, session: Session, _) -> 
Iterable[Tuple[ToolModel, Dict]]:
-        for item in session.query(self.stream.tool_model).all():
+    def fetch(self, state: Dict, session: Session, ctx: Context) -> 
Iterable[Tuple[ToolModel, Dict]]:
+        model = self.stream.tool_model
+        query = session.query(model).where(model.raw_data_params == 
self._params(ctx))
+        for item in query.all():
             yield item, state
 
     def process(self, tool_model: ToolModel, session: Session, ctx: Context):
@@ -214,11 +212,9 @@ class Convertor(Subtask):
             self._save(tool_model, res, session, ctx.connection.id)
 
     def _save(self, tool_model: ToolModel, domain_model: DomainModel, session: 
Session, connection_id: int):
-        if not isinstance(domain_model, DomainModel):
-            logger.error(f'Expected a DomainModel but got a 
{type(domain_model)}: {domain_model}')
-            return
-
-        domain_model.id = tool_model.domain_id()
+        domain_model.set_tool_origin(tool_model)
+        if isinstance(domain_model, DomainModel):
+            domain_model.id = tool_model.domain_id()
         session.merge(domain_model)
 
     def delete(self, session, ctx):
diff --git a/backend/python/pydevlake/pydevlake/testing/testing.py 
b/backend/python/pydevlake/pydevlake/testing/testing.py
index 4c36ddb51..c690e95ed 100644
--- a/backend/python/pydevlake/pydevlake/testing/testing.py
+++ b/backend/python/pydevlake/pydevlake/testing/testing.py
@@ -39,6 +39,8 @@ class ContextBuilder:
 
     def with_scope(self, id='s', name='test_scope', **kwargs):
         self.scope = self.plugin.tool_scope_type(id=id, name=name, **kwargs)
+        if self.connection:
+            self.scope.connection_id = self.connection.id
         return self
 
     def with_transformation_rule(self, id=1, name='test_rule', **kwargs):
@@ -61,6 +63,8 @@ def assert_stream_convert(plugin: Union[Plugin, 
Type[Plugin]], stream_name: str,
         plugin = plugin()
     stream = plugin.get_stream(stream_name)
     tool_model = stream.extract(raw)
+    if ctx and ctx.connection:
+        tool_model.connection_id = ctx.connection.id
     domain_models = stream.convert(tool_model, ctx)
     if not isinstance(expected, list):
         expected = [expected]
diff --git a/backend/python/pydevlake/tests/stream_test.py 
b/backend/python/pydevlake/tests/stream_test.py
index 10195016a..6fe8f8669 100644
--- a/backend/python/pydevlake/tests/stream_test.py
+++ b/backend/python/pydevlake/tests/stream_test.py
@@ -105,6 +105,7 @@ def test_extract_data(stream, raw_data, ctx):
     with Session(ctx.engine) as session:
         for each in raw_data:
             raw_model = stream.raw_model(session)
+            raw_model.params = json.dumps({"connection_id": ctx.connection.id, 
"scope_id": ctx.scope.id})
             session.add(raw_model(data=json.dumps(each)))
         session.commit()
 
@@ -121,13 +122,13 @@ def test_extract_data(stream, raw_data, ctx):
     assert bob.id == 2
 
 
[email protected]  # TODO fix this test
 def test_convert_data(stream, raw_data, ctx):
     with Session(ctx.engine) as session:
         for each in raw_data:
             session.add(
                 DummyToolModel(
                     id=each["i"],
+                    connection_id=ctx.connection.id,
                     name=each["n"],
                     raw_data_table="_raw_dummy_model",
                     raw_data_params=json.dumps({"connection_id": 
ctx.connection.id, "scope_id": ctx.scope.id})
@@ -142,7 +143,7 @@ def test_convert_data(stream, raw_data, ctx):
     alice = tool_models[0]
     bob = tool_models[1]
     assert alice.Name == 'alice'
-    assert alice.id == 'test:DummyToolModel:11:1'
+    assert alice.id == 'tests:DummyToolModel:11:1'
 
     assert bob.Name == 'bob'
-    assert bob.id == 'test:DummyToolModel:11:2'
+    assert bob.id == 'tests:DummyToolModel:11:2'
diff --git a/backend/python/test/fakeplugin/fakeplugin/main.py 
b/backend/python/test/fakeplugin/fakeplugin/main.py
index ac6ac5308..a14c85d68 100644
--- a/backend/python/test/fakeplugin/fakeplugin/main.py
+++ b/backend/python/test/fakeplugin/fakeplugin/main.py
@@ -102,7 +102,7 @@ class FakeConnection(Connection):
 
 
 class FakeProject(ToolScope, table=True):
-    pass
+    url: str
 
 
 class FakeTransformationRule(TransformationRule):
@@ -123,10 +123,11 @@ class FakePlugin(Plugin):
         return FakeTransformationRule
 
     def domain_scopes(self, project: FakeProject):
+        project_name = "_".join(project.name.lower().split(" "))
         yield CicdScope(
             id=1,
             name=project.name,
-            url=f"http://fake.org/api/project/{project.name}"
+            url=f"http://fake.org/api/project/{project_name}"
         )
 
     def remote_scopes(self, connection: FakeConnection, group_id: str):
@@ -134,7 +135,8 @@ class FakePlugin(Plugin):
             return [
                 FakeProject(
                     id='p1',
-                    name='Project 1'
+                    name='Project 1',
+                    url='http://fake.org/api/project/p1'
                 )
             ]
         else:
diff --git a/backend/server/services/remote/models/dynamic_domain_scopes.go 
b/backend/server/services/remote/models/dynamic_domain_scopes.go
index 527c577c1..232c89ba7 100644
--- a/backend/server/services/remote/models/dynamic_domain_scopes.go
+++ b/backend/server/services/remote/models/dynamic_domain_scopes.go
@@ -18,10 +18,9 @@ limitations under the License.
 package models
 
 import (
+       "encoding/json"
        "fmt"
 
-       "github.com/mitchellh/mapstructure"
-
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/models/domainlayer/code"
        
"github.com/apache/incubator-devlake/core/models/domainlayer/codequality"
@@ -50,17 +49,9 @@ func (d DynamicDomainScope) Load() (plugin.Scope, 
errors.Error) {
        if type_err != nil {
                return nil, type_err
        }
-       config := &mapstructure.DecoderConfig{
-               TagName: "json",
-               Result:  scope,
-       }
-       decoder, err := mapstructure.NewDecoder(config)
-       if err != nil {
-               return nil, errors.Convert(err)
-       }
-       err = decoder.Decode(d.Data)
+       err := errors.Convert(json.Unmarshal([]byte(d.Data), &scope))
        if err != nil {
-               return nil, errors.Convert(err)
+               return nil, err
        }
        return scope, nil
 }
diff --git a/backend/server/services/remote/models/models.go 
b/backend/server/services/remote/models/models.go
index 85571a3ea..a12553ed6 100644
--- a/backend/server/services/remote/models/models.go
+++ b/backend/server/services/remote/models/models.go
@@ -87,8 +87,8 @@ type SubtaskMeta struct {
 }
 
 type DynamicDomainScope struct {
-       TypeName string                 `json:"type_name"`
-       Data     map[string]interface{} `json:"data"`
+       TypeName string `json:"type_name"`
+       Data     string `json:"data"`
 }
 
 type PipelineData struct {
diff --git a/backend/server/services/remote/plugin/plugin_extensions.go 
b/backend/server/services/remote/plugin/plugin_extensions.go
index caedce548..17c241a3a 100644
--- a/backend/server/services/remote/plugin/plugin_extensions.go
+++ b/backend/server/services/remote/plugin/plugin_extensions.go
@@ -65,7 +65,7 @@ func (p remoteDatasourcePlugin) 
MakeDataSourcePipelinePlanV200(connectionId uint
                if err != nil {
                        return nil, nil, err
                }
-               toolScopeTxRulePairs[i] = []interface{}{toolScope, txRule}
+               toolScopeTxRulePairs[i] = 
[]interface{}{wrappedToolScope.Unwrap(), txRule}
        }
 
        entities := bpScopes[0].Entities
diff --git a/backend/test/e2e/remote/helper.go 
b/backend/test/e2e/remote/helper.go
index 30586c0e6..e7521cc9c 100644
--- a/backend/test/e2e/remote/helper.go
+++ b/backend/test/e2e/remote/helper.go
@@ -44,6 +44,7 @@ type (
                Name                 string `json:"name"`
                ConnectionId         uint64 `json:"connectionId"`
                TransformationRuleId uint64 `json:"transformationRuleId"`
+               Url                  string `json:"url"`
        }
        FakeTxRule struct {
                Id   uint64 `json:"id"`
@@ -98,9 +99,10 @@ func CreateTestScope(client *helper.DevlakeClient, 
connectionId uint64) any {
        scope := client.CreateScope(PLUGIN_NAME,
                connectionId,
                FakeProject{
-                       Id:                   "12345",
-                       Name:                 "Test project",
+                       Id:                   "p1",
+                       Name:                 "Project 1",
                        ConnectionId:         connectionId,
+                       Url:                  "http://fake.org/api/project/p1",
                        TransformationRuleId: ruleId,
                },
        )
diff --git a/backend/test/e2e/remote/python_plugin_test.go 
b/backend/test/e2e/remote/python_plugin_test.go
index 203149e85..d8d74e56e 100644
--- a/backend/test/e2e/remote/python_plugin_test.go
+++ b/backend/test/e2e/remote/python_plugin_test.go
@@ -73,6 +73,11 @@ func TestRemoteScopes(t *testing.T) {
        require.Equal(t, "group1", *scope.ParentId)
        require.Equal(t, "scope", scope.Type)
        require.NotNil(t, scope.Data)
+       data := scope.Data.(map[string]interface{})
+       require.Equal(t, float64(connection.ID), data["connectionId"])
+       require.Equal(t, "p1", data["id"])
+       require.Equal(t, "Project 1", data["name"])
+       require.Equal(t, "http://fake.org/api/project/p1", data["url"])
 }
 
 func TestCreateScope(t *testing.T) {
@@ -83,6 +88,11 @@ func TestCreateScope(t *testing.T) {
 
        scopes := client.ListScopes(PLUGIN_NAME, connectionId)
        require.Equal(t, 1, len(scopes))
+       cicd_scope := scopes[0].(map[string]interface{})
+       require.Equal(t, float64(connectionId), cicd_scope["connectionId"])
+       require.Equal(t, "p1", cicd_scope["id"])
+       require.Equal(t, "Project 1", cicd_scope["name"])
+       require.Equal(t, "http://fake.org/api/project/p1", cicd_scope["url"])
 }
 
 func TestRunPipeline(t *testing.T) {
@@ -100,7 +110,7 @@ func TestRunPipeline(t *testing.T) {
                                        Subtasks: nil,
                                        Options: map[string]interface{}{
                                                "connectionId": conn.ID,
-                                               "scopeId":      "12345",
+                                               "scopeId":      "p1",
                                        },
                                },
                        },
@@ -129,7 +139,7 @@ func TestBlueprintV200(t *testing.T) {
                                ConnectionId: connection.ID,
                                Scopes: []*plugin.BlueprintScopeV200{
                                        {
-                                               Id:   "12345",
+                                               Id:   "p1",
                                                Name: "Test scope",
                                                Entities: []string{
                                                        plugin.DOMAIN_TYPE_CICD,
@@ -142,6 +152,10 @@ func TestBlueprintV200(t *testing.T) {
                },
        )
 
+       plan, err := blueprint.UnmarshalPlan()
+       require.NoError(t, err)
+       _ = plan
+
        project := client.GetProject(projectName)
        require.Equal(t, blueprint.Name, project.Blueprint.Name)
        client.TriggerBlueprint(blueprint.ID)


Reply via email to