This is an automated email from the ASF dual-hosted git repository.

hez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d573063a feat(pyplugins): tx rule scope ap is py (#4438)
0d573063a is described below

commit 0d573063acfb1d3fe7649a2ff375badcec2a0dd5
Author: Camille Teruel <[email protected]>
AuthorDate: Sat Feb 18 00:11:14 2023 +0100

    feat(pyplugins): tx rule scope ap is py (#4438)
    
    * docs: Document hook execution order
    
    * fix: Add missing initialization of remote plugins
    
    * fix: Fix swagger body parameter for test connection endpoint
    
    * feat: Implement txRule and scope plugin APIs
    
    Provide a generic implementation for the transformation rule and scope APIs.
    
    * feat: Pass scopeId and transformation rule to python plugin
    
    ---------
    
    Co-authored-by: Camille Teruel <[email protected]>
---
 backend/core/runner/loader.go                      |  14 +-
 backend/python/pydevlake/README.md                 |   3 +
 backend/python/pydevlake/pydevlake/__init__.py     |   2 +-
 backend/python/pydevlake/pydevlake/context.py      |  14 +-
 .../python/pydevlake/pydevlake/doc.template.json   | 281 ++++++++++++++++++++-
 backend/python/pydevlake/pydevlake/docgen.py       |  12 +-
 backend/python/pydevlake/pydevlake/ipc.py          |   4 +-
 backend/python/pydevlake/pydevlake/message.py      |   5 +
 backend/python/pydevlake/pydevlake/plugin.py       |   7 +-
 backend/python/pydevlake/pydevlake/subtasks.py     |   2 +-
 backend/python/pydevlake/test/remote_test.go       |   3 +-
 backend/python/pydevlake/test/stream_test.py       |   5 +-
 backend/server/services/remote/models/models.go    |  17 +-
 .../server/services/remote/plugin/default_api.go   |  30 ++-
 backend/server/services/remote/plugin/init.go      |   3 +-
 .../server/services/remote/plugin/plugin_impl.go   |  77 ++++--
 backend/server/services/remote/plugin/scope_api.go | 222 ++++++++++++++++
 .../remote/plugin/transformation_rule_api.go       |  94 +++++++
 backend/test/remote/fakeplugin/fakeplugin/main.py  |   6 +-
 19 files changed, 747 insertions(+), 54 deletions(-)

diff --git a/backend/core/runner/loader.go b/backend/core/runner/loader.go
index 171e4c4f0..5c1a7c7a4 100644
--- a/backend/core/runner/loader.go
+++ b/backend/core/runner/loader.go
@@ -19,17 +19,25 @@ package runner
 
 import (
        "fmt"
-       "github.com/apache/incubator-devlake/core/context"
-       "github.com/apache/incubator-devlake/core/errors"
-       "github.com/apache/incubator-devlake/core/plugin"
        "io/fs"
        "path/filepath"
        goplugin "plugin"
+       "strconv"
        "strings"
+
+       "github.com/apache/incubator-devlake/core/context"
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/server/services/remote"
 )
 
 // LoadPlugins load plugins from local directory
 func LoadPlugins(basicRes context.BasicRes) errors.Error {
+       remote_plugins_enabled, err := strconv.ParseBool(basicRes.GetConfig("ENABLE_REMOTE_PLUGINS"))
+       if err == nil && remote_plugins_enabled {
+               remote.Init(basicRes)
+       }
+
        pluginsDir := basicRes.GetConfig("PLUGIN_DIR")
        walkErr := filepath.WalkDir(pluginsDir, func(path string, d fs.DirEntry, err error) error {
                if err != nil {
diff --git a/backend/python/pydevlake/README.md b/backend/python/pydevlake/README.md
index ef675d307..fbfc976d2 100644
--- a/backend/python/pydevlake/README.md
+++ b/backend/python/pydevlake/README.md
@@ -223,6 +223,9 @@ class MyAPI(API):
 
 Here the method `authenticate` is a hook that is run on each request.
 Similarly you can declare response hooks with `@response_hook`.
+Multiple hooks are executed in the order of their declaration.
+The `API` base class declares some hooks that are executed first.
+
 
 #### Pagination
 
diff --git a/backend/python/pydevlake/pydevlake/__init__.py b/backend/python/pydevlake/pydevlake/__init__.py
index edabf45f0..d45d1ba7d 100644
--- a/backend/python/pydevlake/pydevlake/__init__.py
+++ b/backend/python/pydevlake/pydevlake/__init__.py
@@ -16,7 +16,7 @@
 
 from .model import ToolModel
 from .logger import logger
-from .message import Connection
+from .message import Connection, TransformationRule
 from .plugin import Plugin
 from .stream import Stream, Substream
 from .context import Context
diff --git a/backend/python/pydevlake/pydevlake/context.py b/backend/python/pydevlake/pydevlake/context.py
index a5c97b5ff..85eb9fb7c 100644
--- a/backend/python/pydevlake/pydevlake/context.py
+++ b/backend/python/pydevlake/pydevlake/context.py
@@ -17,15 +17,23 @@
 from urllib.parse import urlparse, parse_qsl
 from sqlmodel import SQLModel, create_engine
 
-from pydevlake.message import Connection
+from pydevlake.message import Connection, TransformationRule
 
 
 class Context:
-    def __init__(self, db_url: str, connection_id: int, connection: Connection, options: dict):
+    def __init__(self,
+                 db_url: str,
+                 scope_id: str,
+                 connection_id: int,
+                 connection: Connection,
+                 transformation_rule: TransformationRule = None,
+                 options: dict = None):
         self.db_url = db_url
+        self.scope_id = scope_id
         self.connection_id = connection_id
         self.connection = connection
-        self.options = options
+        self.transformation_rule = transformation_rule
+        self.options = options or {}
         self._engine = None
 
     @property
diff --git a/backend/python/pydevlake/pydevlake/doc.template.json b/backend/python/pydevlake/pydevlake/doc.template.json
index b152a5cf6..a565187cd 100644
--- a/backend/python/pydevlake/pydevlake/doc.template.json
+++ b/backend/python/pydevlake/pydevlake/doc.template.json
@@ -118,13 +118,16 @@
         "/plugins/$plugin_name/test": {
             "post": {
                 "description": "Test if a connection is valid",
-                "body": {
-                    "application/json": {
+                "parameters": [
+                    {
+                        "name": "connection",
+                        "required": true,
+                        "in": "body",
                         "schema": {
                             "$$ref": "#/components/schemas/connection"
                         }
                     }
-                }
+                ]
             },
             "response": {
                 "200": {
@@ -134,11 +137,245 @@
                     "description": "The connection is not valid"
                 }
             }
+        },
+        "/plugins/$plugin_name/connections/{connectionId}/scopes/{scopeId}": {
+            "get": {
+                "description": "Get a scope",
+                "parameters": [
+                    {
+                        "$$ref": "#/components/parameters/connectionId"
+                    },
+                    {
+                        "$$ref": "#/components/parameters/scopeId"
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "content": {
+                            "application/json": {
+                                "schema": {
+                                    "$$ref": "#/components/schemas/scope"
+                                }
+                            }
+                        }
+                    }
+                }
+            },
+            "patch": {
+                "description": "Update a scope",
+                "parameters": [
+                    {
+                        "$$ref": "#/components/parameters/connectionId"
+                    },
+                    {
+                        "$$ref": "#/components/parameters/scopeId"
+                    },
+                    {
+                        "name": "scope",
+                        "required": true,
+                        "in": "body",
+                        "schema": {
+                            "$$ref": "#/components/schemas/scope"
+                        }
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "content": {
+                            "application/json": {
+                                "schema": {
+                                    "$$ref": "#/components/schemas/scope"
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        },
+        "/plugins/$plugin_name/connections/{connectionId}/scopes": {
+            "get": {
+                "description": "Get all scopes",
+                "parameters": [
+                    {
+                        "$$ref": "#/components/parameters/pageSize"
+                    },
+                    {
+                        "$$ref": "#/components/parameters/page"
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "content": {
+                            "application/json": {
+                                "schema": {
+                                    "type": "array",
+                                    "items": {
+                                        "$$ref": "#/components/schemas/scope"
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            },
+            "put": {
+                "description": "Create a list of scopes",
+                "parameters": [
+                    {
+                        "name": "scopes",
+                        "required": true,
+                        "in": "body",
+                        "schema": {
+                            "type": "array",
+                            "items": {
+                                "$$ref": "#/components/schemas/scope"
+                            }
+                        }
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "content": {
+                            "application/json": {
+                                "schema": {
+                                    "type": "array",
+                                    "items": {
+                                        "$$ref": "#/components/schemas/scope"
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        },
+        "/plugins/$plugin_name/transformation_rules": {
+            "get": {
+                "description": "Get all transformation rules",
+                "parameters": [
+                    {
+                        "$$ref": "#/components/parameters/pageSize"
+                    },
+                    {
+                        "$$ref": "#/components/parameters/page"
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "content": {
+                            "application/json": {
+                                "schema": {
+                                    "type": "array",
+                                    "items": {
+                                        "$$ref": "#/components/schemas/transformationRule"
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            },
+            "post": {
+                "description": "Create a transformation rule",
+                "parameters": [
+                    {
+                        "name": "rules",
+                        "required": true,
+                        "in": "body",
+                        "schema": {
+                            "$$ref": "#/components/schemas/transformationRule"
+                        }
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "content": {
+                            "application/json": {
+                                "schema": {
+                                    "$$ref": "#/components/schemas/transformationRule"
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        },
+        "/plugins/$plugin_name/transformation_rules/{ruleId}": {
+            "get": {
+                "description": "Get a transformation rule",
+                "parameters": [
+                    {
+                        "$$ref": "#/components/parameters/ruleId"
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "content": {
+                            "application/json": {
+                                "schema": {
+                                    "$$ref": "#/components/schemas/transformationRule"
+                                }
+                            }
+                        }
+                    }
+                }
+            },
+            "patch": {
+                "description": "Update a transformation rule",
+                "parameters": [
+                    {
+                        "$$ref": "#/components/parameters/ruleId"
+                    },
+                    {
+                        "name": "rule",
+                        "required": true,
+                        "in": "body",
+                        "schema": {
+                            "$$ref": "#/components/schemas/transformationRule"
+                        }
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "content": {
+                            "application/json": {
+                                "schema": {
+                                    "$$ref": "#/components/schemas/transformationRule"
+                                }
+                            }
+                        }
+                    }
+                }
+            }
         }
     },
     "components": {
         "schemas": {
-            "connection": $connection_schema
+            "connection": $connection_schema,
+            "transformationRule": $transformation_rule_schema,
+            "scope": {
+                "title": "Scope",
+                "type": "object",
+                "properties": {
+                    "scopeId": {
+                        "title": "scope id",
+                        "type": "string"
+                    },
+                    "scopeName": {
+                        "title": "scope name",
+                        "type": "string"
+                    },
+                    "connectionId": {
+                        "title": "connection id",
+                        "type": "integer"
+                    },
+                    "transformationRuleId": {
+                        "title": "Transformation rule id",
+                        "type": "integer"
+                    }
+                },
+                "required": ["scopeId", "scopeName", "connectionId", "transformationRuleId"]
+            }
         },
         "parameters": {
             "connectionId": {
@@ -149,6 +386,42 @@
                 "schema": {
                     "type": "int"
                 }
+            },
+            "scopeId": {
+                "name": "scopeId",
+                "description": "Id of the scope",
+                "in": "path",
+                "required": true,
+                "schema": {
+                    "type": "string"
+                }
+            },
+            "ruleId": {
+                "name": "ruleId",
+                "description": "Id of the transformation rule",
+                "in": "path",
+                "required": true,
+                "schema": {
+                    "type": "int"
+                }
+            },
+            "pageSize": {
+                "name": "pageSize",
+                "description": "Number of items per page",
+                "in": "query",
+                "required": false,
+                "schema": {
+                    "type": "integer"
+                }
+            },
+            "page": {
+                "name": "page",
+                "description": "Page number",
+                "in": "query",
+                "required": false,
+                "schema": {
+                    "type": "integer"
+                }
             }
         }
     }
diff --git a/backend/python/pydevlake/pydevlake/docgen.py b/backend/python/pydevlake/pydevlake/docgen.py
index acce0a39f..9d91197cd 100644
--- a/backend/python/pydevlake/pydevlake/docgen.py
+++ b/backend/python/pydevlake/pydevlake/docgen.py
@@ -19,15 +19,21 @@ from pathlib import Path
 from string import Template
 import json
 
-from pydevlake.message import Connection
+from pydevlake.message import Connection, TransformationRule
 
 
 # TODO: Move swagger documentation generation to GO side along with API implementation
 TEMPLATE_PATH = str(Path(__file__).parent / 'doc.template.json')
 
-def generate_doc(plugin_name: str, connection_type: Type[Connection]):
+def generate_doc(plugin_name: str, 
+                 connection_type: Type[Connection], 
+                 transformation_rule_type: Type[TransformationRule]):
     with open(TEMPLATE_PATH, 'r') as f:
         doc_template = Template(f.read())
         connection_schema = connection_type.schema_json()
-        doc = doc_template.substitute(plugin_name=plugin_name, connection_schema=connection_schema)
+        transformation_rule_schema = transformation_rule_type.schema_json()
+        doc = doc_template.substitute(
+            plugin_name=plugin_name, 
+            connection_schema=connection_schema,
+            transformation_rule_schema=transformation_rule_schema)
         return json.loads(doc)
diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py
index a457b9794..0b8d664e3 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -87,7 +87,9 @@ class PluginCommands:
 
     def _mk_context(self, data: dict):
         db_url = data['db_url']
+        scope_id = data['scope_id']
         connection_id = data['connection_id']
         connection = self._plugin.connection_type(**data['connection'])
+        transformation_rule = self._plugin.transformation_rule_type(**data['transformation_rule'])
         options = data.get('options', {})
-        return Context(db_url, connection_id, connection, options)
+        return Context(db_url, scope_id, connection_id, connection, transformation_rule, options)
diff --git a/backend/python/pydevlake/pydevlake/message.py b/backend/python/pydevlake/pydevlake/message.py
index de09167c4..aad4690d0 100644
--- a/backend/python/pydevlake/pydevlake/message.py
+++ b/backend/python/pydevlake/pydevlake/message.py
@@ -35,6 +35,7 @@ class PluginInfo(Message):
     name: str
     description: str
     connection_schema: dict
+    transformation_rule_schema: dict
     plugin_path: str
     subtask_metas: list[SubtaskMeta]
     extension: str = "datasource"
@@ -62,6 +63,10 @@ class Connection(Message):
     pass
 
 
+class TransformationRule(Message):
+    pass
+
+
 class PipelineTask(Message):
     plugin: str
     # Do not snake_case this attribute,
diff --git a/backend/python/pydevlake/pydevlake/plugin.py b/backend/python/pydevlake/pydevlake/plugin.py
index 0e3c0a166..3f81016d8 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -54,6 +54,10 @@ class Plugin:
     def connection_type(self) -> Type[msg.Connection]:
         pass
 
+    @property
+    def transformation_rule_type(self) -> Type[msg.TransformationRule]:
+        return msg.TransformationRule
+
     @abstractmethod
     def test_connection(self, connection: msg.Connection):
         """
@@ -133,7 +137,7 @@ class Plugin:
             swagger=msg.SwaggerDoc(
                 name=self.name,
                 resource=self.name,
-                spec=generate_doc(self.name, self.connection_type)
+                spec=generate_doc(self.name, self.connection_type, self.transformation_rule_type)
             )
         )
         resp = requests.post(f"{endpoint}/plugins/register", data=details.json())
@@ -160,6 +164,7 @@ class Plugin:
             plugin_path=self._plugin_path(),
             extension="datasource",
             connection_schema=self.connection_type.schema(),
+            transformation_rule_schema=self.transformation_rule_type.schema(),
             subtask_metas=subtask_metas
         )
 
diff --git a/backend/python/pydevlake/pydevlake/subtasks.py b/backend/python/pydevlake/pydevlake/subtasks.py
index 98591d331..343c29ac5 100644
--- a/backend/python/pydevlake/pydevlake/subtasks.py
+++ b/backend/python/pydevlake/pydevlake/subtasks.py
@@ -152,7 +152,7 @@ class Collector(Subtask):
     def _params(self, ctx: Context) -> str:
         return json.dumps({
             "connection_id": ctx.connection_id,
-            "scope_id": ctx.options['scopeId']
+            "scope_id": ctx.scope_id
         })
 
     def delete(self, session, ctx):
diff --git a/backend/python/pydevlake/test/remote_test.go b/backend/python/pydevlake/test/remote_test.go
index 9aaeec877..3c11ba971 100644
--- a/backend/python/pydevlake/test/remote_test.go
+++ b/backend/python/pydevlake/test/remote_test.go
@@ -64,10 +64,9 @@ func TestRunSubTask(t *testing.T) {
        dataflowTester := e2ehelper.NewDataFlowTester(t, "circleci", remotePlugin)
        subtask := remotePlugin.SubTaskMetas()[0]
        options := make(map[string]interface{})
-       options["project_slug"] = "gh/circleci/bond"
-       options["scopeId"] = "1"
        taskData := plg.RemotePluginTaskData{
                DbUrl:      bridge.DefaultContext.GetConfig("db_url"),
+               ScopeId:    "gh/circleci/bond",
                Connection: CircleCIConnection{ID: 1},
                Options:    options,
        }
diff --git a/backend/python/pydevlake/test/stream_test.py b/backend/python/pydevlake/test/stream_test.py
index db24465a2..234c7e79a 100644
--- a/backend/python/pydevlake/test/stream_test.py
+++ b/backend/python/pydevlake/test/stream_test.py
@@ -75,9 +75,10 @@ def connection(raw_data):
 def ctx(connection):
     return Context(
         db_url="sqlite+pysqlite:///:memory:",
+        scope_id="1",
         connection_id=11,
         connection=connection,
-        options={"scopeId": 1, "scopeName": "foo"}
+        options={}
     )
 
 
@@ -123,7 +124,7 @@ def test_convert_data(stream, raw_data, ctx):
                     id=each["i"],
                     name=each["n"],
                     raw_data_table="_raw_dummy_model",
-                    raw_data_params=json.dumps({"connection_id": ctx.connection_id, "scope_id": ctx.options.scopeId})
+                    raw_data_params=json.dumps({"connection_id": ctx.connection_id, "scope_id": ctx.scope_id})
                 )
             )
         session.commit()
diff --git a/backend/server/services/remote/models/models.go b/backend/server/services/remote/models/models.go
index d19a26fe7..c82783f32 100644
--- a/backend/server/services/remote/models/models.go
+++ b/backend/server/services/remote/models/models.go
@@ -31,14 +31,15 @@ type (
 )
 
 type PluginInfo struct {
-       Type             PluginType      `json:"type" validate:"required"`
-       Name             string          `json:"name" validate:"required"`
-       Extension        PluginExtension `json:"extension"`
-       ConnectionSchema map[string]any  `json:"connection_schema" validate:"required"`
-       Description      string          `json:"description"`
-       PluginPath       string          `json:"plugin_path" validate:"required"`
-       ApiEndpoints     []Endpoint      `json:"api_endpoints" validate:"dive"`
-       SubtaskMetas     []SubtaskMeta   `json:"subtask_metas" validate:"dive"`
+       Type                     PluginType      `json:"type" validate:"required"`
+       Name                     string          `json:"name" validate:"required"`
+       Extension                PluginExtension `json:"extension"`
+       ConnectionSchema         map[string]any  `json:"connection_schema" validate:"required"`
+       TransformationRuleSchema map[string]any  `json:"transformation_rule_schema" validate:"required"`
+       Description              string          `json:"description"`
+       PluginPath               string          `json:"plugin_path" validate:"required"`
+       ApiEndpoints             []Endpoint      `json:"api_endpoints" validate:"dive"`
+       SubtaskMetas             []SubtaskMeta   `json:"subtask_metas" validate:"dive"`
 }
 
 type SubtaskMeta struct {
diff --git a/backend/server/services/remote/plugin/default_api.go b/backend/server/services/remote/plugin/default_api.go
index a2d500b28..b1fa1879c 100644
--- a/backend/server/services/remote/plugin/default_api.go
+++ b/backend/server/services/remote/plugin/default_api.go
@@ -24,12 +24,24 @@ import (
        "github.com/apache/incubator-devlake/server/services/remote/bridge"
 )
 
-func GetDefaultAPI(invoker bridge.Invoker, connType *models.DynamicTabler, helper *api.ConnectionApiHelper) map[string]map[string]plugin.ApiResourceHandler {
+func GetDefaultAPI(
+       invoker bridge.Invoker,
+       connType *models.DynamicTabler,
+       txRuleType *models.DynamicTabler,
+       helper *api.ConnectionApiHelper) map[string]map[string]plugin.ApiResourceHandler {
        connectionApi := &ConnectionAPI{
                invoker:  invoker,
                connType: connType,
                helper:   helper,
        }
+
+       scopeApi := &ScopeAPI{
+               txRuleType: txRuleType,
+       }
+       txruleApi := &TransformationRuleAPI{
+               txRuleType: txRuleType,
+       }
+
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
                        "POST": connectionApi.TestConnection,
@@ -43,5 +55,21 @@ func GetDefaultAPI(invoker bridge.Invoker, connType *models.DynamicTabler, helpe
                        "PATCH":  connectionApi.PatchConnection,
                        "DELETE": connectionApi.DeleteConnection,
                },
+               "connections/:connectionId/scopes": {
+                       "PUT": scopeApi.PutScope,
+                       "GET": scopeApi.ListScopes,
+               },
+               "connections/:connectionId/scopes/*scopeId": {
+                       "GET":   scopeApi.GetScope,
+                       "PATCH": scopeApi.PatchScope,
+               },
+               "transformation_rules": {
+                       "POST": txruleApi.PostTransformationRules,
+                       "GET":  txruleApi.ListTransformationRules,
+               },
+               "transformation_rules/:id": {
+                       "GET":   txruleApi.GetTransformationRule,
+                       "PATCH": txruleApi.PatchTransformationRule,
+               },
        }
 }
diff --git a/backend/server/services/remote/plugin/init.go b/backend/server/services/remote/plugin/init.go
index 1ddd03001..676fa14d9 100644
--- a/backend/server/services/remote/plugin/init.go
+++ b/backend/server/services/remote/plugin/init.go
@@ -29,10 +29,11 @@ import (
 var (
        connectionHelper *api.ConnectionApiHelper
        basicRes         context.BasicRes
+       vld              *validator.Validate
 )
 
 func Init(br context.BasicRes) {
-       vld := validator.New()
+       vld = validator.New()
        basicRes = br
        connectionHelper = api.NewConnectionHelper(
                br,
diff --git a/backend/server/services/remote/plugin/plugin_impl.go b/backend/server/services/remote/plugin/plugin_impl.go
index 7e611b283..8b34b60d7 100644
--- a/backend/server/services/remote/plugin/plugin_impl.go
+++ b/backend/server/services/remote/plugin/plugin_impl.go
@@ -20,6 +20,7 @@ package plugin
 import (
        "fmt"
 
+       "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        coreModels "github.com/apache/incubator-devlake/core/models"
        "github.com/apache/incubator-devlake/core/plugin"
@@ -30,35 +31,46 @@ import (
 
 type (
        remotePluginImpl struct {
-               name             string
-               subtaskMetas     []plugin.SubTaskMeta
-               pluginPath       string
-               description      string
-               invoker          bridge.Invoker
-               connectionTabler *coreModels.DynamicTabler
-               resources        map[string]map[string]plugin.ApiResourceHandler
+               name                     string
+               subtaskMetas             []plugin.SubTaskMeta
+               pluginPath               string
+               description              string
+               invoker                  bridge.Invoker
+               connectionTabler         *coreModels.DynamicTabler
+               transformationRuleTabler *coreModels.DynamicTabler
+               resources                map[string]map[string]plugin.ApiResourceHandler
        }
        RemotePluginTaskData struct {
-               DbUrl        string                 `json:"db_url"`
-               ConnectionId uint64                 `json:"connection_id"`
-               Connection   interface{}            `json:"connection"`
-               Options      map[string]interface{} `json:"options"`
+               DbUrl              string                 `json:"db_url"`
+               ScopeId            string                 `json:"scope_id"`
+               ConnectionId       uint64                 `json:"connection_id"`
+               Connection         interface{}            `json:"connection"`
+               TransformationRule interface{}            `json:"transformation_rule"`
+               Options            map[string]interface{} `json:"options"`
        }
 )
 
 func newPlugin(info *models.PluginInfo, invoker bridge.Invoker) (*remotePluginImpl, errors.Error) {
        connectionTableName := fmt.Sprintf("_tool_%s_connections", info.Name)
-       dynamicTabler, err := models.LoadTableModel(connectionTableName, info.ConnectionSchema)
+       connectionTabler, err := models.LoadTableModel(connectionTableName, info.ConnectionSchema)
        if err != nil {
                return nil, err
        }
+
+       txRuleTableName := fmt.Sprintf("_tool_%s_transformation_rules", info.Name)
+       txRuleTabler, err := models.LoadTableModel(txRuleTableName, info.TransformationRuleSchema)
+       if err != nil {
+               return nil, err
+       }
+
        p := remotePluginImpl{
-               name:             info.Name,
-               invoker:          invoker,
-               pluginPath:       info.PluginPath,
-               description:      info.Description,
-               connectionTabler: dynamicTabler,
-               resources:        GetDefaultAPI(invoker, dynamicTabler, connectionHelper),
+               name:                     info.Name,
+               invoker:                  invoker,
+               pluginPath:               info.PluginPath,
+               description:              info.Description,
+               connectionTabler:         connectionTabler,
+               transformationRuleTabler: txRuleTabler,
+               resources:                GetDefaultAPI(invoker, connectionTabler, txRuleTabler, connectionHelper),
        }
        remoteBridge := bridge.NewBridge(invoker)
        for _, subtask := range info.SubtaskMetas {
@@ -93,11 +105,27 @@ func (p *remotePluginImpl) PrepareTaskData(taskCtx 
plugin.TaskContext, options m
                return nil, errors.Convert(err)
        }
 
+       scopeId, ok := options["scopeId"].(string)
+       if !ok {
+               return nil, errors.BadInput.New("missing scopeId")
+       }
+
+       txRule := p.transformationRuleTabler.New()
+       txRuleId, ok := options["transformation_rule_id"].(uint64)
+       if ok {
+               db := taskCtx.GetDal()
+               err = db.First(&txRule, dal.Where("id = ?", txRuleId))
+               if err != nil {
+                       return nil, errors.BadInput.New("invalid transformation 
rule id")
+               }
+       }
+
        return RemotePluginTaskData{
-               DbUrl:        dbUrl,
-               ConnectionId: connectionId,
-               Connection:   connection.Unwrap(),
-               Options:      options,
+               DbUrl:              dbUrl,
+               ScopeId:            scopeId,
+               ConnectionId:       connectionId,
+               Connection:         connection.Unwrap(),
+               TransformationRule: txRule,
        }, nil
 }
 
@@ -123,6 +151,11 @@ func (p *remotePluginImpl) RunMigrations(forceMigrate 
bool) errors.Error {
                return err
        }
 
+       err = api.CallDB(basicRes.GetDal().AutoMigrate, 
p.transformationRuleTabler.New())
+       if err != nil {
+               return err
+       }
+
        err = p.invoker.Call("run-migrations", bridge.DefaultContext, 
forceMigrate).Get()
        return err
 }
diff --git a/backend/server/services/remote/plugin/scope_api.go 
b/backend/server/services/remote/plugin/scope_api.go
new file mode 100644
index 000000000..9bbdbaaf8
--- /dev/null
+++ b/backend/server/services/remote/plugin/scope_api.go
@@ -0,0 +1,222 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package plugin
+
+import (
+       "net/http"
+       "strconv"
+
+       "github.com/mitchellh/mapstructure"
+
+       "github.com/apache/incubator-devlake/core/dal"
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/core/models"
+       "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+)
+
+// ScopeItem is the scope record read and written by the ScopeAPI handlers.
+// The struct tags define the field names used in JSON payloads.
+type ScopeItem struct {
+       // ScopeId identifies the scope; verifyScope rejects an empty value.
+       ScopeId              string `json:"scopeId"`
+       ScopeName            string `json:"scopeName"`
+       // ConnectionId is the owning connection; PutScope overwrites it with
+       // the "connectionId" path parameter before saving.
+       ConnectionId         uint64 `json:"connectionId"`
+       // TransformationRuleId references a transformation rule; the handlers
+       // only look up a rule name when this value is greater than zero.
+       TransformationRuleId uint64 `json:"transformationRuleId,omitempty"`
+}
+
+// apiScopeResponse is the DTO returned by GetScope and ListScopes: a scope
+// together with the name of the transformation rule it references.
+type apiScopeResponse struct {
+       Scope ScopeItem
+       // TransformationRuleName holds the rule's name, so it is published as
+       // "transformationRuleName" rather than under the id key, which is
+       // already used by ScopeItem.TransformationRuleId.
+       TransformationRuleName string `json:"transformationRuleName,omitempty"`
+}
+
+// request is the body envelope decoded by PutScope: the scopes to save are
+// listed under the "data" key, so one PUT can carry several scopes.
+// NOTE(review): the original comment asked "Why a batch PUT?" - the reason
+// for batching is not visible in this file; confirm with the API design.
+type request struct {
+       Data []*ScopeItem `json:"data"`
+}
+
+// ScopeAPI groups the scope HTTP handlers (PutScope, PatchScope, ListScopes,
+// GetScope) of a remote plugin.
+type ScopeAPI struct {
+       // txRuleType supplies, through TableName(), the table from which the
+       // handlers read transformation rule names.
+       txRuleType *models.DynamicTabler
+}
+
+// PutScope saves, in one batch, every scope listed under the "data" key of
+// the request body.
+//
+// The "connectionId" path parameter is mandatory and is forced onto every
+// item. The request is rejected with a BadInput error when the connection id
+// is missing or zero, when the body cannot be decoded, when an item is nil or
+// fails verifyScope, or when two items share the same scope id. On success
+// the saved items are returned with status 200.
+func (s *ScopeAPI) PutScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+       connectionId, _ := extractParam(input.Params)
+       if connectionId == 0 {
+               return nil, errors.BadInput.New("invalid connectionId")
+       }
+
+       var scopes request
+       err := errors.Convert(mapstructure.Decode(input.Body, &scopes))
+       if err != nil {
+               return nil, errors.BadInput.Wrap(err, "decoding scope error")
+       }
+
+       // seen tracks the scope ids already met so duplicates are refused.
+       seen := make(map[string]struct{}, len(scopes.Data))
+       for _, scope := range scopes.Data {
+               // Data holds pointers: a nil entry would panic on the field
+               // accesses below, so refuse it as bad input instead.
+               if scope == nil {
+                       return nil, errors.BadInput.New("invalid scope item")
+               }
+               if _, ok := seen[scope.ScopeId]; ok {
+                       return nil, errors.BadInput.New("duplicated item")
+               }
+               seen[scope.ScopeId] = struct{}{}
+
+               // The path parameter wins over whatever the body carried.
+               scope.ConnectionId = connectionId
+
+               err = verifyScope(scope)
+               if err != nil {
+                       return nil, err
+               }
+       }
+
+       err = basicRes.GetDal().CreateOrUpdate(scopes.Data)
+       if err != nil {
+               return nil, errors.Default.Wrap(err, "error on saving scope")
+       }
+
+       return &plugin.ApiResourceOutput{Body: scopes.Data, Status: http.StatusOK}, nil
+}
+
+// PatchScope loads the scope identified by the "connectionId" and "scopeId"
+// path parameters, applies the request body on top of it, verifies the
+// result and updates the record.
+//
+// It returns a BadInput error for a missing or zero connection id or an
+// undecodable body, a NotFound error when no such scope exists (matching
+// GetScope), and a Default error for other database failures. On success the
+// updated scope is returned with status 200.
+func (s *ScopeAPI) PatchScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+       connectionId, scopeId := extractParam(input.Params)
+       if connectionId == 0 {
+               return nil, errors.BadInput.New("invalid connectionId")
+       }
+
+       db := basicRes.GetDal()
+       scope := ScopeItem{}
+       err := db.First(&scope, dal.Where("connection_id = ? AND scope_id = ?", connectionId, scopeId))
+       if db.IsErrorNotFound(err) {
+               return nil, errors.NotFound.New("record not found")
+       }
+       if err != nil {
+               return nil, errors.Default.Wrap(err, "scope not found")
+       }
+
+       // A body that cannot be decoded is a client error, as in PutScope.
+       err = api.DecodeMapStruct(input.Body, &scope)
+       if err != nil {
+               return nil, errors.BadInput.Wrap(err, "patch scope error")
+       }
+
+       // The record is addressed by the path parameters; do not let the body
+       // change which connection or scope is being updated.
+       scope.ConnectionId = connectionId
+       scope.ScopeId = scopeId
+
+       err = verifyScope(&scope)
+       if err != nil {
+               return nil, err
+       }
+
+       err = db.Update(&scope)
+       if err != nil {
+               return nil, errors.Default.Wrap(err, "error on saving scope")
+       }
+       return &plugin.ApiResourceOutput{Body: scope, Status: http.StatusOK}, nil
+}
+
+// ListScopes returns one page of the scopes belonging to the connection
+// named by the "connectionId" path parameter, each paired with the name of
+// the transformation rule it references (empty when it references none).
+//
+// Paging is read from the "pageSize" and "page" query parameters; a page
+// size above 100 is rejected. A missing or zero connection id yields a
+// BadInput error. The response body is always a list, possibly empty.
+func (s *ScopeAPI) ListScopes(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+       connectionId, _ := extractParam(input.Params)
+       if connectionId == 0 {
+               return nil, errors.BadInput.New("invalid connectionId")
+       }
+
+       limit, offset := api.GetLimitOffset(input.Query, "pageSize", "page")
+       if limit > 100 {
+               return nil, errors.BadInput.New("Page limit cannot exceed 100")
+       }
+
+       db := basicRes.GetDal()
+       var scopes []ScopeItem
+       err := db.All(&scopes, dal.Where("connection_id = ?", connectionId), dal.Limit(limit), dal.Offset(offset))
+       if err != nil {
+               return nil, err
+       }
+
+       // Collect the rule ids actually referenced so the names can be fetched
+       // with a single query.
+       var ruleIds []uint64
+       for _, scope := range scopes {
+               if scope.TransformationRuleId > 0 {
+                       ruleIds = append(ruleIds, scope.TransformationRuleId)
+               }
+       }
+
+       // The fields must be exported: unexported fields cannot be set through
+       // reflection, so the query results would never be stored in them.
+       var txRules []struct {
+               Id   uint64
+               Name string
+       }
+       if len(ruleIds) > 0 {
+               err = db.All(&txRules,
+                       dal.Select("id, name"),
+                       dal.From(s.txRuleType.TableName()),
+                       dal.Where("id IN (?)", ruleIds))
+               if err != nil {
+                       return nil, err
+               }
+       }
+
+       names := make(map[uint64]string, len(txRules))
+       for _, r := range txRules {
+               names[r.Id] = r.Name
+       }
+
+       // Allocate a non-nil slice so an empty page is a list, not null.
+       apiScopes := make([]apiScopeResponse, 0, len(scopes))
+       for _, scope := range scopes {
+               apiScopes = append(apiScopes, apiScopeResponse{
+                       Scope:                  scope,
+                       TransformationRuleName: names[scope.TransformationRuleId],
+               })
+       }
+
+       return &plugin.ApiResourceOutput{Body: apiScopes, Status: http.StatusOK}, nil
+}
+
+// GetScope returns the scope addressed by the "connectionId" and "scopeId"
+// path parameters, together with the name of its transformation rule when
+// the scope references one.
+//
+// A missing or zero connection id yields a BadInput error and an unknown
+// scope a NotFound error; any other database error is returned as is.
+func (s *ScopeAPI) GetScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+       connectionId, scopeId := extractParam(input.Params)
+       if connectionId == 0 {
+               return nil, errors.BadInput.New("invalid path params")
+       }
+
+       db := basicRes.GetDal()
+       var found ScopeItem
+       err := db.First(&found, dal.Where("connection_id = ? AND scope_id = ?", connectionId, scopeId))
+       switch {
+       case db.IsErrorNotFound(err):
+               return nil, errors.NotFound.New("record not found")
+       case err != nil:
+               return nil, err
+       }
+
+       // The rule name stays empty when the scope has no rule attached.
+       var txRuleName string
+       if found.TransformationRuleId > 0 {
+               err = db.First(&txRuleName, dal.Select("name"), dal.From(s.txRuleType.TableName()), dal.Where("id = ?", found.TransformationRuleId))
+               if err != nil {
+                       return nil, err
+               }
+       }
+
+       body := apiScopeResponse{Scope: found, TransformationRuleName: txRuleName}
+       return &plugin.ApiResourceOutput{Body: body, Status: http.StatusOK}, nil
+}
+
+// extractParam reads the "connectionId" and "scopeId" entries of params.
+//
+// The connection id is returned as 0 whenever it is absent or is not a valid
+// base-10 unsigned 64-bit integer; callers treat 0 as "invalid". The scope
+// id is returned verbatim and is empty when absent.
+func extractParam(params map[string]string) (uint64, string) {
+       scopeId := params["scopeId"]
+       connectionId, err := strconv.ParseUint(params["connectionId"], 10, 64)
+       if err != nil {
+               // On a range error ParseUint returns the maximum uint64, not
+               // 0, so the error must be checked to keep the 0 contract.
+               return 0, scopeId
+       }
+       return connectionId, scopeId
+}
+
+// verifyScope checks the fields every scope must carry: a non-zero
+// connection id and a non-empty scope id. It returns a BadInput error naming
+// the first failing field, or nil when both are present.
+func verifyScope(scope *ScopeItem) errors.Error {
+       switch {
+       case scope.ConnectionId == 0:
+               return errors.BadInput.New("invalid connectionId")
+       case scope.ScopeId == "":
+               return errors.BadInput.New("invalid scope ID")
+       default:
+               return nil
+       }
+}
diff --git a/backend/server/services/remote/plugin/transformation_rule_api.go 
b/backend/server/services/remote/plugin/transformation_rule_api.go
new file mode 100644
index 000000000..8a97bfc16
--- /dev/null
+++ b/backend/server/services/remote/plugin/transformation_rule_api.go
@@ -0,0 +1,94 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package plugin
+
+import (
+       "net/http"
+       "strconv"
+
+       "github.com/apache/incubator-devlake/core/dal"
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/core/models"
+       "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+)
+
+// TransformationRuleAPI groups the transformation rule HTTP handlers
+// (create, patch, get, list) of a remote plugin.
+type TransformationRuleAPI struct {
+       // txRuleType is used by the handlers to allocate rule values (New)
+       // and rule slices (NewSlice) passed to the database layer.
+       txRuleType *models.DynamicTabler
+}
+
+func (t *TransformationRuleAPI) PostTransformationRules(input 
*plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+       txRule := t.txRuleType.New()
+       err := api.Decode(input.Body, txRule, vld)
+       if err != nil {
+               return nil, errors.BadInput.Wrap(err, "error in decoding 
transformation rule")
+       }
+       db := basicRes.GetDal()
+       err = api.CallDB(db.Create, txRule)
+       if err != nil {
+               return nil, err
+       }
+       return &plugin.ApiResourceOutput{Body: txRule.Unwrap(), Status: 
http.StatusOK}, nil
+}
+
+func (t *TransformationRuleAPI) PatchTransformationRule(input 
*plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+       id, err := strconv.ParseUint(input.Params["id"], 10, 64)
+       if err != nil {
+               return nil, errors.Default.Wrap(err, "id should be an integer")
+       }
+
+       txRule := t.txRuleType.New()
+       db := basicRes.GetDal()
+       err = api.CallDB(db.First, txRule, dal.Where("id = ?", id))
+       if err != nil {
+               return nil, errors.Default.Wrap(err, "no transformation rule 
with given id")
+       }
+
+       err = api.Decode(input.Body, txRule, vld)
+       if err != nil {
+               return nil, errors.Default.Wrap(err, "decoding error")
+       }
+
+       return &plugin.ApiResourceOutput{Body: txRule.Unwrap(), Status: 
http.StatusOK}, nil
+}
+
+func (t *TransformationRuleAPI) GetTransformationRule(input 
*plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+       txRule := t.txRuleType.New()
+       db := basicRes.GetDal()
+       err := api.CallDB(db.First, txRule, dal.Where("id = ?", input.Params))
+       if err != nil {
+               return nil, errors.Default.Wrap(err, "no transformation rule 
with given id")
+       }
+
+       return &plugin.ApiResourceOutput{Body: txRule.Unwrap()}, nil
+}
+
+// ListTransformationRules returns one page of transformation rules.
+//
+// Paging is read from the "pageSize" and "page" query parameters; a page
+// size above 100 is rejected with a BadInput error. Database errors are
+// returned as is.
+func (t *TransformationRuleAPI) ListTransformationRules(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+       rules := t.txRuleType.NewSlice()
+
+       pageSize, skip := api.GetLimitOffset(input.Query, "pageSize", "page")
+       if pageSize > 100 {
+               return nil, errors.BadInput.New("pageSize cannot exceed 100")
+       }
+
+       if err := api.CallDB(basicRes.GetDal().All, rules, dal.Limit(pageSize), dal.Offset(skip)); err != nil {
+               return nil, err
+       }
+
+       output := &plugin.ApiResourceOutput{Body: rules.Unwrap()}
+       return output, nil
+}
diff --git a/backend/test/remote/fakeplugin/fakeplugin/main.py 
b/backend/test/remote/fakeplugin/fakeplugin/main.py
index 0c7c2db8e..51c13bb9b 100644
--- a/backend/test/remote/fakeplugin/fakeplugin/main.py
+++ b/backend/test/remote/fakeplugin/fakeplugin/main.py
@@ -19,7 +19,7 @@ from typing import Optional
 
 from sqlmodel import Field
 
-from pydevlake import Plugin, Connection, Stream, ToolModel
+from pydevlake import Plugin, Connection, TransformationRule, Stream, ToolModel
 from pydevlake.domain_layer.devops import CICDScope, CICDPipeline
 
 
@@ -94,6 +94,10 @@ class FakeConnection(Connection):
     token: str
 
 
+class FakeTransformationRule(TransformationRule):
+    tx1: str
+
+
 class FakePlugin(Plugin):
     @property
     def connection_type(self):


Reply via email to