This is an automated email from the ASF dual-hosted git repository.

hez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 4092092bb 5250 define migration script on python side (#5286)
4092092bb is described below

commit 4092092bba27ff3c258bdadd4177904f4701300f
Author: Camille Teruel <[email protected]>
AuthorDate: Wed May 31 19:45:44 2023 +0200

    5250 define migration script on python side (#5286)
    
    * feat: Improve dynamic struct generation from JSON schema
    
    * Support "date-time" string format: time.Time (or *time.Time if not 
required)
    * Support "number" property type: float64
    * Generate gorm `primaryKey` tag for Field(primary_key=True, ...)
    * Generate gorm `type` tag for string fields: `varchar(n)` if n <= 255, 
`text` otherwise
    * Generate gorm `serializer:encdec` tag for string format "password": Use 
SecretStr on python side
    * Generate `validate:"required"` for required properties
    
    * feat: AutoMigrate all tool models
    
    * feat: MigrationScripts for python plugins
    
    Support definition of migration scripts on python side.
    
    ---------
    
    Co-authored-by: Camille Teruel <[email protected]>
    Co-authored-by: Hezheng Yin <[email protected]>
---
 backend/core/utils/json.go                         |  74 ++++++++++
 backend/core/utils/json_test.go                    | 111 +++++++++++++++
 backend/python/README.md                           |  41 ++++--
 .../python/plugins/azuredevops/azuredevops/api.py  |   2 +-
 .../python/plugins/azuredevops/azuredevops/main.py |   2 +-
 .../plugins/azuredevops/azuredevops/models.py      |  36 ++---
 backend/python/pydevlake/pydevlake/__init__.py     |   8 +-
 backend/python/pydevlake/pydevlake/ipc.py          |   4 -
 backend/python/pydevlake/pydevlake/message.py      |   9 +-
 backend/python/pydevlake/pydevlake/migration.py    | 113 +++++++++++++++
 backend/python/pydevlake/pydevlake/model.py        |  12 +-
 backend/python/pydevlake/pydevlake/plugin.py       |  29 +---
 backend/python/pydevlake/tests/migration_test.py   |  65 +++++++++
 backend/python/test/fakeplugin/fakeplugin/main.py  |   5 +-
 backend/server/services/remote/init.go             |   5 +-
 .../server/services/remote/models/conversion.go    | 127 +++++++++++++----
 .../services/remote/models/conversion_test.go      | 153 +++++++++++++++++++++
 backend/server/services/remote/models/migration.go | 140 +++++++++++++++++++
 .../services/remote/models/migration_test.go       |  67 +++++++++
 backend/server/services/remote/models/models.go    |  39 +++---
 .../server/services/remote/models/plugin_remote.go |   3 +-
 .../server/services/remote/plugin/plugin_impl.go   |  59 ++++++--
 22 files changed, 955 insertions(+), 149 deletions(-)

diff --git a/backend/core/utils/json.go b/backend/core/utils/json.go
new file mode 100644
index 000000000..557f64cec
--- /dev/null
+++ b/backend/core/utils/json.go
@@ -0,0 +1,74 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+       "fmt"
+       "reflect"
+
+       "github.com/apache/incubator-devlake/core/errors"
+)
+
+type JsonObject = map[string]any
+type JsonArray = []any
+
+func GetProperty[T any](object JsonObject, key string) (T, errors.Error) {
+       property, ok := object[key]
+       if !ok {
+               return *new(T), errors.Default.New(fmt.Sprintf("Missing 
property \"%s\"", key))
+       }
+       return Convert[T](property)
+}
+
+func GetItem[T any](array JsonArray, index int) (T, errors.Error) {
+       if index < 0 || index >= len(array) {
+               return *new(T), errors.Default.New(fmt.Sprintf("Index %d out of 
range", index))
+       }
+       return Convert[T](array[index])
+}
+
+// Convert converts value to type T. If value is a slice, it converts each 
element of the slice to type T.
+// Does not support nested slices.
+func Convert[T any](value any) (T, errors.Error) {
+       var t T
+       tType := reflect.TypeOf(t)
+       if tType.Kind() == reflect.Slice {
+               valueSlice, ok := value.([]any)
+               if !ok {
+                       return t, errors.Default.New("Value is not a slice")
+               }
+               elemType := tType.Elem()
+               result := reflect.MakeSlice(tType, 0, len(valueSlice))
+               for i, v := range valueSlice {
+                       value := reflect.ValueOf(v)
+                       if elemType.AssignableTo(reflect.TypeOf(v)) {
+                               elem := value.Convert(elemType)
+                               result = reflect.Append(result, elem)
+                       } else {
+                               return t, 
errors.Default.New(fmt.Sprintf("Element %d is not of type %s", i, 
elemType.Name()))
+                       }
+               }
+               return result.Interface().(T), nil
+       } else {
+               result, ok := value.(T)
+               if !ok {
+                       return t, errors.Default.New(fmt.Sprintf("Value is not 
of type %T", t))
+               }
+               return result, nil
+       }
+}
diff --git a/backend/core/utils/json_test.go b/backend/core/utils/json_test.go
new file mode 100644
index 000000000..ae573b932
--- /dev/null
+++ b/backend/core/utils/json_test.go
@@ -0,0 +1,111 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestExistingProperty(t *testing.T) {
+       object := map[string]interface{}{
+               "id": 1,
+       }
+
+       res, err := GetProperty[int](object, "id")
+
+       assert.NoError(t, err)
+       assert.Equal(t, res, 1)
+}
+
+func TestMissingProperty(t *testing.T) {
+       object := map[string]interface{}{
+               "id": 1,
+       }
+
+       _, err := GetProperty[int](object, "name")
+
+       assert.Error(t, err)
+       assert.Equal(t, "Missing property \"name\"", err.Error())
+}
+
+func TestInvalidPropertyType(t *testing.T) {
+       object := map[string]interface{}{
+               "id": 1,
+       }
+
+       _, err := GetProperty[string](object, "id")
+
+       assert.Error(t, err)
+       assert.Equal(t, "Value is not of type string", err.Error())
+}
+
+func TestGetItemInRange(t *testing.T) {
+       array := []any{1, 2, 3}
+
+       res, err := GetItem[int](array, 1)
+
+       assert.NoError(t, err)
+       assert.Equal(t, 2, res)
+}
+
+func TestGetItemOutOfRange(t *testing.T) {
+       array := []any{1, 2, 3}
+
+       _, err := GetItem[int](array, 3)
+
+       assert.Error(t, err)
+       assert.Equal(t, "Index 3 out of range", err.Error())
+}
+
+func TestConvertSlice(t *testing.T) {
+       value := []any{1, 2, 3}
+
+       res, err := Convert[[]int](value)
+
+       assert.NoError(t, err)
+       assert.Equal(t, []int{1, 2, 3}, res)
+}
+
+func TestConvertSliceInvalidType(t *testing.T) {
+       value := []any{1, 2, 3}
+
+       val, err := Convert[[]string](value)
+       _ = val
+       assert.Error(t, err)
+       assert.Equal(t, "Element 0 is not of type string", err.Error())
+}
+
+func TestConvertSliceInvalidValue(t *testing.T) {
+       value := []any{1, "2", 3}
+
+       _, err := Convert[[]int](value)
+
+       assert.Error(t, err)
+       assert.Equal(t, "Element 1 is not of type int", err.Error())
+}
+
+func TestConvertSliceInvalidSlice(t *testing.T) {
+       value := 1
+
+       _, err := Convert[[]int](value)
+
+       assert.Error(t, err)
+       assert.Equal(t, "Value is not a slice", err.Error())
+}
diff --git a/backend/python/README.md b/backend/python/README.md
index 192798e58..1e52053d4 100644
--- a/backend/python/README.md
+++ b/backend/python/README.md
@@ -100,11 +100,15 @@ and those that are used to customize conversion to domain 
models that are groupe
 For example, to add `url` and `token` parameter, edit `MyPluginConnection` as 
follow:
 
 ```python
+from pydantic import SecretStr
+
 class MyPluginConnection(Connection):
     url: str
-    token: str
+    token: SecretStr
 ```
 
+Using type `SecretStr` instead of `str` will encode the value in the database.
+To get the `str` value, you need to call `get_secret_value()`: 
`connection.token.get_secret_value()`.
 All plugin methods that have a connection parameter will be called with an 
instance of this class.
 Note that you should not define `__init__`.
 
@@ -235,20 +239,28 @@ To facilitate or even eliminate extraction, your tool 
models should be close to
 #### Migration of tool models
 
 Tool models, connection, scope and transformation rule types are stored in the 
DevLake database.
-When you change the definition of one of those types, you need to migrate the 
database.
-You should implement the migration logic in the model class by defining a 
`migrate` class method. This method takes a sqlalchemy session as argument that 
you can use to
-execute SQL `ALTER TABLE` statements.
+When you change the definition of one of those types, the database needs to be 
migrated.
+Automatic migration takes care of most modifications, but some changes require 
manual migration. For example, automatic migration never drops columns. Another 
example is adding a column to the primary key of a table: you need to write a 
script that removes the primary key constraint and adds a new compound primary 
key.
 
-```python
-class User(ToolModel, table=True):
-    id: str = Field(primary_key=True)
-    name: str
-    email: str
-    age: int
+To declare a new migration script, decorate a function with the `migration` 
decorator. The function name should describe what the script does. The 
`migration` decorator takes a version number, which must be a 14-digit 
timestamp in the format `YYYYMMDDhhmmss`. The decorated function takes a 
`MigrationScriptBuilder` as a parameter; this builder exposes methods that 
record migration operations.
+
+##### Migration operations
+
+The `MigrationScriptBuilder` exposes the following methods:
+- `execute(sql: str, dialect: Optional[Dialect])`: execute a raw SQL 
statement. The `dialect` parameter is used to execute the SQL statement only if 
the database is of the given dialect. If `dialect` is `None`, the statement is 
executed unconditionally.
+- `drop_column(table: str, column: str)`: drop a column from a table
+- `drop_table(table: str)`: drop a table
 
-    @classmethod
-    def migrate(cls, session):
-        session.execute(f"ALTER TABLE {cls.__tablename__} ADD COLUMN age INT")
+
+```python
+from pydevlake.migration import MigrationScriptBuilder, migration, Dialect
+
+@migration(20230524181430)
+def add_build_id_as_job_primary_key(b: MigrationScriptBuilder):
+    table = Job.__tablename__
+    b.execute(f'ALTER TABLE {table} DROP PRIMARY KEY', Dialect.MYSQL)
+    b.execute(f'ALTER TABLE {table} DROP CONSTRAINT {table}_pkey', 
Dialect.POSTGRESQL)
+    b.execute(f'ALTER TABLE {table} ADD PRIMARY KEY (id, build_id)')
 ```
 
 
@@ -481,7 +493,8 @@ class UserComments(Substream):
         """
         This method will be called for each user collected from parent stream 
Users.
         """
-        for json in 
MyPluginAPI(context.connection.token).user_comments(user.id):
+        api = MyPluginAPI(context.connection.token.get_secret_value())
+        for json in api.user_comments(user.id):
             yield json, state
     ...
 ```
diff --git a/backend/python/plugins/azuredevops/azuredevops/api.py 
b/backend/python/plugins/azuredevops/azuredevops/api.py
index 4c63e507a..9461d0890 100644
--- a/backend/python/plugins/azuredevops/azuredevops/api.py
+++ b/backend/python/plugins/azuredevops/azuredevops/api.py
@@ -36,7 +36,7 @@ class AzureDevOpsAPI(API):
 
     @request_hook
     def authenticate(self, request: Request):
-        token_b64 = base64.b64encode((':' + 
self.connection.token).encode()).decode()
+        token_b64 = base64.b64encode((':' + 
self.connection.token.get_secret_value()).encode()).decode()
         request.headers['Authorization'] = 'Basic ' + token_b64
 
     @request_hook
diff --git a/backend/python/plugins/azuredevops/azuredevops/main.py 
b/backend/python/plugins/azuredevops/azuredevops/main.py
index fabc53a63..3a96a1964 100644
--- a/backend/python/plugins/azuredevops/azuredevops/main.py
+++ b/backend/python/plugins/azuredevops/azuredevops/main.py
@@ -109,7 +109,7 @@ class AzureDevOpsPlugin(Plugin):
     def extra_tasks(self, scope: GitRepository, tx_rule: 
AzureDevOpsTransformationRule, entity_types: list[DomainType], connection: 
AzureDevOpsConnection):
         if DomainType.CODE in entity_types:
             url = urlparse(scope.remote_url)
-            url = 
url._replace(netloc=f'{url.username}:{connection.token}@{url.hostname}')
+            url = 
url._replace(netloc=f'{url.username}:{connection.token.get_secret_value()}@{url.hostname}')
             yield gitextractor(url.geturl(), scope.domain_id(), 
connection.proxy)
 
     def extra_stages(self, scope_tx_rule_pairs: list[ScopeTxRulePair], 
entity_types: list[DomainType], _):
diff --git a/backend/python/plugins/azuredevops/azuredevops/models.py 
b/backend/python/plugins/azuredevops/azuredevops/models.py
index 5c656afd1..3e08ef68b 100644
--- a/backend/python/plugins/azuredevops/azuredevops/models.py
+++ b/backend/python/plugins/azuredevops/azuredevops/models.py
@@ -18,15 +18,16 @@ from enum import Enum
 from typing import Optional
 import re
 
-from sqlmodel import Session, Column, Text
+from pydantic import SecretStr
 
 from pydevlake import Field, Connection, TransformationRule
 from pydevlake.model import ToolModel, ToolScope
 from pydevlake.pipeline_tasks import RefDiffOptions
+from pydevlake.migration import migration, MigrationScriptBuilder, Dialect
 
 
 class AzureDevOpsConnection(Connection):
-    token: str
+    token: SecretStr
     organization: Optional[str]
 
 
@@ -52,7 +53,7 @@ class GitPullRequest(ToolModel, table=True):
         Completed = "completed"
 
     pull_request_id: int = Field(primary_key=True)
-    description: Optional[str] = Field(sa_column=Column(Text))
+    description: Optional[str]
     status: PRStatus
     created_by_id: str = Field(source='/createdBy/id')
     created_by_name: str = Field(source='/createdBy/displayName')
@@ -68,14 +69,6 @@ class GitPullRequest(ToolModel, table=True):
     source_ref_name: Optional[str]
     fork_repo_id: Optional[str] = Field(source='/forkSource/repository/id')
 
-    @classmethod
-    def migrate(self, session: Session):
-        dialect = session.bind.dialect.name
-        if dialect == 'mysql':
-            session.execute(f'ALTER TABLE {self.__tablename__} MODIFY COLUMN 
description TEXT')
-        elif dialect == 'postgresql':
-            session.execute(f'ALTER TABLE {self.__tablename__} ALTER COLUMN 
description TYPE TEXT')
-
 
 class GitPullRequestCommit(ToolModel, table=True):
     commit_id: str = Field(primary_key=True)
@@ -132,13 +125,14 @@ class Job(ToolModel, table=True):
     state: JobState
     result: JobResult
 
-    @classmethod
-    def migrate(self, session: Session):
-        dialect = session.bind.dialect.name
-        if dialect == 'mysql':
-            session.execute(f'ALTER TABLE {self.__tablename__} DROP PRIMARY 
KEY')
-        elif dialect == 'postgresql':
-            session.execute(f'ALTER TABLE {self.__tablename__} DROP CONSTRAINT 
{self.__tablename__}_pkey')
-        else:
-            raise Exception(f'Unsupported dialect {dialect}')
-        session.execute(f'ALTER TABLE {self.__tablename__} ADD PRIMARY KEY 
(id, build_id)')
+
@migration(20230524181430)
def add_build_id_as_job_primary_key(b: MigrationScriptBuilder):
    """Make (id, build_id) the composite primary key of the jobs table."""
    # A column cannot be added to an existing primary key directly:
    # the old constraint must be dropped first (the syntax differs between
    # MySQL and PostgreSQL), then the new composite key is created.
    table = Job.__tablename__
    b.execute(f'ALTER TABLE {table} DROP PRIMARY KEY', Dialect.MYSQL)
    b.execute(f'ALTER TABLE {table} DROP CONSTRAINT {table}_pkey', Dialect.POSTGRESQL)
    b.execute(f'ALTER TABLE {table} ADD PRIMARY KEY (id, build_id)')
diff --git a/backend/python/pydevlake/pydevlake/__init__.py 
b/backend/python/pydevlake/pydevlake/__init__.py
index d645c0749..0c10c3e9f 100644
--- a/backend/python/pydevlake/pydevlake/__init__.py
+++ b/backend/python/pydevlake/pydevlake/__init__.py
@@ -21,14 +21,16 @@ pytest.register_assert_rewrite('pydevlake.testing')
 from sqlmodel import Field as _Field
 
 
-def Field(*args, schema_extra: Optional[dict[str, Any]]=None, source: 
Optional[str]=None, **kwargs):
def Field(*args, primary_key: bool = False, source: Optional[str] = None, **kwargs):
    """
    A wrapper around sqlmodel.Field that adds a source parameter and
    records primary keys in the generated JSON schema.
    """
    # pop, not get: schema_extra is forwarded explicitly below, so it must
    # be removed from kwargs — otherwise a caller passing schema_extra would
    # trigger "got multiple values for keyword argument 'schema_extra'".
    schema_extra = kwargs.pop('schema_extra', None) or {}
    if source:
        schema_extra['source'] = source
    if primary_key:
        schema_extra['primaryKey'] = True
    return _Field(*args, **kwargs, primary_key=primary_key, schema_extra=schema_extra)
 
 
 from .model import ToolModel, ToolScope, DomainScope, Connection, 
TransformationRule, domain_id
diff --git a/backend/python/pydevlake/pydevlake/ipc.py 
b/backend/python/pydevlake/pydevlake/ipc.py
index e59065af0..d783800a7 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -100,10 +100,6 @@ class PluginCommands:
         entities = [DomainType(e) for e in entities]
         return self._plugin.make_pipeline(scope_tx_rule_pairs, entities, 
connection)
 
-    @plugin_method
-    def run_migrations(self, db_url, force: bool):
-        self._plugin.run_migrations(create_db_engine(db_url), force)
-
     @plugin_method
     def plugin_info(self):
         return self._plugin.plugin_info()
diff --git a/backend/python/pydevlake/pydevlake/message.py 
b/backend/python/pydevlake/pydevlake/message.py
index 0cb7c23cd..44228f56c 100644
--- a/backend/python/pydevlake/pydevlake/message.py
+++ b/backend/python/pydevlake/pydevlake/message.py
@@ -20,6 +20,7 @@ from pydantic import BaseModel, Field
 import jsonref
 
 from pydevlake.model import ToolScope
+from pydevlake.migration import MigrationScript
 
 
 class Message(BaseModel):
@@ -48,6 +49,10 @@ class DynamicModelInfo(Message):
             # Replace $ref with actual schema
             schema = jsonref.replace_refs(schema, proxies=False)
             del schema['definitions']
+        # Pydantic forgets to put type in enums
+        for prop in schema['properties'].values():
+            if 'type' not in prop and 'enum' in prop:
+                prop['type'] = 'string'
         return DynamicModelInfo(
             json_schema=schema,
             table_name=model_class.__tablename__
@@ -60,11 +65,11 @@ class PluginInfo(Message):
     connection_model_info: DynamicModelInfo
     transformation_rule_model_info: Optional[DynamicModelInfo]
     scope_model_info: DynamicModelInfo
+    tool_model_infos: list[DynamicModelInfo]
+    migration_scripts: list[MigrationScript]
     plugin_path: str
     subtask_metas: list[SubtaskMeta]
     extension: str = "datasource"
-    type: str = "python-poetry"
-    tables: list[str]
 
 
 class RemoteProgress(Message):
diff --git a/backend/python/pydevlake/pydevlake/migration.py 
b/backend/python/pydevlake/pydevlake/migration.py
new file mode 100644
index 000000000..e64eb9115
--- /dev/null
+++ b/backend/python/pydevlake/pydevlake/migration.py
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from typing import List, Literal, Optional, Union, Annotated
+from enum import Enum
+from datetime import datetime
+
+from pydantic import BaseModel, Field
+
+
# Registry of every migration script declared through the @migration decorator.
MIGRATION_SCRIPTS = []


class Dialect(Enum):
    """SQL dialects a migration operation can be restricted to."""
    MYSQL = "mysql"
    POSTGRESQL = "postgres"
+
+
class Execute(BaseModel):
    """Raw SQL operation, optionally restricted to a single SQL dialect."""
    type: Literal["execute"] = "execute"
    sql: str
    dialect: Optional[Dialect] = None


class DropColumn(BaseModel):
    """Operation that drops one column from a table."""
    type: Literal["drop_column"] = "drop_column"
    table: str
    column: str


class DropTable(BaseModel):
    """Operation that drops a whole table."""
    type: Literal["drop_table"] = "drop_table"
    table: str


# Tagged union of all operations; the "type" field is the discriminator so
# pydantic can parse the correct class back from serialized data.
Operation = Annotated[
    Union[Execute, DropColumn, DropTable],
    Field(discriminator="type"),
]
+
+
class MigrationScript(BaseModel):
    """A named, versioned list of migration operations, sent to the Go side."""
    operations: List[Operation]
    version: int
    name: str
+
+
class MigrationScriptBuilder:
    """
    Collects the operations declared inside a @migration function.

    Each method appends one operation to self.operations; the decorator
    later wraps the collected operations into a MigrationScript.
    """

    def __init__(self):
        self.operations = []

    def execute(self, sql: str, dialect: Optional[Dialect] = None):
        """
        Records a raw SQL statement.
        When dialect is given, the statement will run only if the target
        database matches that dialect.
        """
        self.operations.append(Execute(sql=sql, dialect=dialect))

    def drop_column(self, table: str, column: str):
        """Records the removal of a column from a table."""
        self.operations.append(DropColumn(table=table, column=column))

    def drop_table(self, table: str):
        """Records the removal of a table."""
        self.operations.append(DropTable(table=table))
+
+
def migration(version: int, name: Optional[str] = None):
    """
    Decorator that builds a migration script from a function.

    The decorated function receives a MigrationScriptBuilder and records its
    operations on it. The function itself is replaced by the resulting
    MigrationScript, which is also registered in MIGRATION_SCRIPTS.

    version must be a 14-digit YYYYMMDDhhmmss timestamp.

    Usage:

    @migration(20230511120000)
    def change_description_type(b: MigrationScriptBuilder):
        b.execute('ALTER TABLE my_table ...')
    """
    # NOTE: the previous docstring example used an 8-digit version (which
    # _validate_version rejects) and a nonexistent `b.exec` method.
    _validate_version(version)

    def wrapper(fn):
        builder = MigrationScriptBuilder()
        fn(builder)
        script = MigrationScript(operations=builder.operations, version=version, name=name or fn.__name__)
        MIGRATION_SCRIPTS.append(script)
        return script
    return wrapper
+
+
+def _validate_version(version: int):
+    str_version = str(version)
+    err = ValueError(f"Invalid version {version}, must be in YYYYMMDDhhmmss 
format")
+    if len(str_version) != 14:
+        raise err
+    try:
+        datetime.strptime(str_version, "%Y%m%d%H%M%S")
+    except ValueError:
+        raise  err
diff --git a/backend/python/pydevlake/pydevlake/model.py 
b/backend/python/pydevlake/pydevlake/model.py
index 995c13b3a..2c576a12e 100644
--- a/backend/python/pydevlake/pydevlake/model.py
+++ b/backend/python/pydevlake/pydevlake/model.py
@@ -20,7 +20,7 @@ from inspect import getmodule
 from datetime import datetime
 
 import inflect
-from pydantic import AnyUrl, validator
+from pydantic import AnyUrl, SecretStr, validator
 from sqlalchemy import Column, DateTime
 from sqlalchemy.orm import declared_attr, Session
 from sqlalchemy.inspection import inspect
@@ -48,6 +48,9 @@ class ToolTable(SQLModel):
 
     class Config:
         allow_population_by_field_name = True
+        json_encoders = {
+            SecretStr: lambda v: v.get_secret_value() if v else None
+        }
 
         @classmethod
         def alias_generator(cls, attr_name: str) -> str:
@@ -56,13 +59,6 @@ class ToolTable(SQLModel):
             parts = attr_name.split('_')
             return parts[0] + ''.join(word.capitalize() for word in parts[1:])
 
-    @classmethod
-    def migrate(cls, session: Session):
-        """
-        Redefine this method to perform migration on this tool model.
-        """
-        pass
-
 
 class Connection(ToolTable, Model):
     name: str
diff --git a/backend/python/pydevlake/pydevlake/plugin.py 
b/backend/python/pydevlake/pydevlake/plugin.py
index d543fe791..e94791895 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -20,16 +20,14 @@ import os
 import sys
 
 import fire
-from sqlmodel import SQLModel, Session
-from sqlalchemy.inspection import inspect
-from sqlalchemy.engine import Engine
 
 import pydevlake.message as msg
 from pydevlake.subtasks import Subtask
 from pydevlake.ipc import PluginCommands
 from pydevlake.context import Context
 from pydevlake.stream import Stream, DomainType
-from pydevlake.model import ToolScope, DomainScope, Connection, 
TransformationRule, SubtaskRun
+from pydevlake.model import ToolScope, DomainScope, Connection, 
TransformationRule
+from pydevlake.migration import MIGRATION_SCRIPTS
 
 
 ScopeTxRulePair = tuple[ToolScope, Optional[TransformationRule]]
@@ -105,24 +103,6 @@ class Plugin(ABC):
     def convert(self, ctx: Context, stream: str):
         yield from self.get_stream(stream).convertor.run(ctx)
 
-    def run_migrations(self, engine: Engine, force: bool = False):
-        # NOTE: Not sure what "force" is for
-        # TODO: Support migration for transformation rule and connection tables
-        # They are currently created on go-side.
-        tool_models = [stream.tool_model for stream in self._streams.values()]
-        tool_models.append(self.tool_scope_type)
-        inspector = inspect(engine)
-        tables = SQLModel.metadata.tables
-        with Session(engine) as session:
-            for model in tool_models:
-                if inspector.has_table(model.__tablename__):
-                    # TODO: Add version table and migrate if needed
-                    model.migrate(session)
-                else:
-                    tables[model.__tablename__].create(engine)
-            session.commit()
-        tables[SubtaskRun.__tablename__].create(engine, checkfirst=True)
-
     def make_remote_scopes(self, connection: Connection, group_id: 
Optional[str] = None) -> msg.RemoteScopes:
         if group_id:
             remote_scopes = []
@@ -237,8 +217,6 @@ class Plugin(ABC):
             tx_rule_model_info = 
msg.DynamicModelInfo.from_model(self.transformation_rule_type)
         else:
             tx_rule_model_info = None
-        plugin_tables = [stream(self.name).raw_model_table for stream in 
self.streams] + \
-                        [stream.tool_model.__tablename__ for stream in 
self.streams]
         return msg.PluginInfo(
             name=self.name,
             description=self.description,
@@ -247,8 +225,9 @@ class Plugin(ABC):
             
connection_model_info=msg.DynamicModelInfo.from_model(self.connection_type),
             transformation_rule_model_info=tx_rule_model_info,
             
scope_model_info=msg.DynamicModelInfo.from_model(self.tool_scope_type),
+            
tool_model_infos=[msg.DynamicModelInfo.from_model(stream.tool_model) for stream 
in self._streams.values()],
             subtask_metas=subtask_metas,
-            tables=plugin_tables,
+            migration_scripts=MIGRATION_SCRIPTS
         )
 
     def _plugin_path(self):
diff --git a/backend/python/pydevlake/tests/migration_test.py 
b/backend/python/pydevlake/tests/migration_test.py
new file mode 100644
index 000000000..6e19825f0
--- /dev/null
+++ b/backend/python/pydevlake/tests/migration_test.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pydevlake.migration import migration, MIGRATION_SCRIPTS
+
+
@migration(20230520174322)
def my_migration(b):
    """Sample migration exercising all three operation kinds."""
    b.execute("SOME SQL")
    b.drop_column("t", "c")
    b.drop_table("t")
+
+
def test_migration():
    """The decorator replaces the function with a populated MigrationScript."""
    script = my_migration
    assert script.version == 20230520174322
    assert script.name == "my_migration"
    assert len(script.operations) == 3

    execute_op, drop_col_op, drop_table_op = script.operations
    assert execute_op.sql == "SOME SQL"
    assert execute_op.dialect is None
    assert drop_col_op.table == "t"
    assert drop_col_op.column == "c"
    assert drop_table_op.table == "t"
+
+
def test_registration():
    """The migration decorator must register the script globally."""
    assert my_migration in MIGRATION_SCRIPTS
+
+
def test_serialization():
    """dict() serialization keeps versions, names and discriminator tags."""
    data = my_migration.dict()
    assert data["version"] == 20230520174322
    assert data["name"] == "my_migration"

    ops = data["operations"]
    assert len(ops) == 3

    execute_op, drop_col_op, drop_table_op = ops
    assert execute_op["type"] == "execute"
    assert execute_op["sql"] == "SOME SQL"
    assert execute_op.get("dialect") is None

    assert drop_col_op["type"] == "drop_column"
    assert drop_col_op["table"] == "t"
    assert drop_col_op["column"] == "c"

    assert drop_table_op["type"] == "drop_table"
    assert drop_table_op["table"] == "t"
diff --git a/backend/python/test/fakeplugin/fakeplugin/main.py 
b/backend/python/test/fakeplugin/fakeplugin/main.py
index 6a048ff22..55c548965 100644
--- a/backend/python/test/fakeplugin/fakeplugin/main.py
+++ b/backend/python/test/fakeplugin/fakeplugin/main.py
@@ -19,6 +19,7 @@ from typing import Optional
 import json
 
 from sqlmodel import Field
+from pydantic import SecretStr
 
 from pydevlake import Plugin, Connection, TransformationRule, Stream, 
ToolModel, ToolScope, RemoteScopeGroup, DomainType
 from pydevlake.domain_layer.devops import CicdScope, CICDPipeline, CICDStatus, 
CICDResult, CICDType
@@ -96,7 +97,7 @@ class FakePipelineStream(Stream):
 
 
 class FakeConnection(Connection):
-    token: str
+    token: SecretStr
 
 
 class FakeProject(ToolScope, table=True):
@@ -149,7 +150,7 @@ class FakePlugin(Plugin):
         ]
 
     def test_connection(self, connection: FakeConnection):
-        if connection.token != VALID_TOKEN:
+        if connection.token.get_secret_value() != VALID_TOKEN:
             raise Exception("Invalid token")
 
     @property
diff --git a/backend/server/services/remote/init.go 
b/backend/server/services/remote/init.go
index 3e505b96c..4c6dd8ff1 100644
--- a/backend/server/services/remote/init.go
+++ b/backend/server/services/remote/init.go
@@ -19,7 +19,7 @@ package remote
 
 import (
        "fmt"
-       "github.com/apache/incubator-devlake/core/config"
+
        "github.com/apache/incubator-devlake/core/context"
        "github.com/apache/incubator-devlake/core/errors"
        pluginCore "github.com/apache/incubator-devlake/core/plugin"
@@ -43,8 +43,7 @@ func NewRemotePlugin(info *models.PluginInfo) 
(models.RemotePlugin, errors.Error
        if err != nil {
                return nil, err
        }
-       forceMigration := config.GetConfig().GetBool("FORCE_MIGRATION")
-       err = plugin.RunMigrations(forceMigration)
+       err = plugin.RunAutoMigrations()
        if err != nil {
                return nil, err
        }
diff --git a/backend/server/services/remote/models/conversion.go 
b/backend/server/services/remote/models/conversion.go
index bd221ecaa..877597ab4 100644
--- a/backend/server/services/remote/models/conversion.go
+++ b/backend/server/services/remote/models/conversion.go
@@ -20,33 +20,35 @@ package models
 import (
        "encoding/json"
        "fmt"
-       "github.com/apache/incubator-devlake/impls/dalgorm"
        "reflect"
        "strings"
        "time"
 
+       "github.com/apache/incubator-devlake/impls/dalgorm"
+
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/models"
+       "github.com/apache/incubator-devlake/core/utils"
        "gorm.io/datatypes"
 )
 
-func LoadTableModel(tableName string, schema map[string]any, encrypt bool, 
parentModel any) (*models.DynamicTabler, errors.Error) {
-       structType, err := GenerateStructType(schema, encrypt, 
reflect.TypeOf(parentModel))
+func LoadTableModel(tableName string, schema utils.JsonObject, parentModel 
any) (*models.DynamicTabler, errors.Error) {
+       structType, err := GenerateStructType(schema, 
reflect.TypeOf(parentModel))
        if err != nil {
                return nil, err
        }
        return models.NewDynamicTabler(tableName, structType), nil
 }
 
-func GenerateStructType(schema map[string]any, encrypt bool, baseType 
reflect.Type) (reflect.Type, errors.Error) {
+func GenerateStructType(schema utils.JsonObject, baseType reflect.Type) 
(reflect.Type, errors.Error) {
        var structFields []reflect.StructField
-       propsRaw, ok := schema["properties"]
-       if !ok {
-               return nil, errors.BadInput.New("Missing properties in JSON 
schema")
+       props, err := utils.GetProperty[utils.JsonObject](schema, "properties")
+       if err != nil {
+               return nil, err
        }
-       props, ok := propsRaw.(map[string]any)
-       if !ok {
-               return nil, errors.BadInput.New("JSON schema properties must be 
an object")
+       required, err := utils.GetProperty[[]string](schema, "required")
+       if err != nil {
+               return nil, err
        }
        if baseType != nil {
                anonymousField := reflect.StructField{
@@ -61,8 +63,8 @@ func GenerateStructType(schema map[string]any, encrypt bool, 
baseType reflect.Ty
                if isBaseTypeField(k, baseType) {
                        continue
                }
-               spec := v.(map[string]any)
-               field, err := generateStructField(k, encrypt, spec)
+               spec := v.(utils.JsonObject)
+               field, err := generateStructField(k, spec, isRequired(k, 
required))
                if err != nil {
                        return nil, err
                }
@@ -98,6 +100,15 @@ func ToDatabaseMap(tableName string, ifc any, createdAt 
*time.Time, updatedAt *t
        return m, nil
 }
 
// isRequired reports whether fieldName is listed in the JSON schema's
// "required" array.
func isRequired(fieldName string, required []string) bool {
	for i := range required {
		if required[i] == fieldName {
			return true
		}
	}
	return false
}
+
 func isBaseTypeField(fieldName string, baseType reflect.Type) bool {
        fieldName = canonicalFieldName(fieldName)
        for i := 0; i < baseType.NumField(); i++ {
@@ -118,26 +129,33 @@ func canonicalFieldName(fieldName string) string {
        return strings.ToLower(strings.Replace(fieldName, "_", "", -1))
 }
 
-func generateStructField(name string, encrypt bool, schema map[string]any) 
(*reflect.StructField, errors.Error) {
-       goType, err := getGoType(schema)
// Reflected Go types used when mapping JSON-schema property types to struct
// field types; computed once so getGoType/getGormTag can compare cheaply.
var (
	int64Type   = reflect.TypeOf(int64(0))
	float64Type = reflect.TypeOf(float64(0))
	boolType    = reflect.TypeOf(false)
	stringType  = reflect.TypeOf("")
	timeType    = reflect.TypeOf(time.Time{})
	jsonMapType = reflect.TypeOf(datatypes.JSONMap{})
)
+
+func generateStructField(name string, schema utils.JsonObject, required bool) 
(*reflect.StructField, errors.Error) {
+       goType, err := getGoType(schema, required)
        if err != nil {
                return nil, errors.Default.Wrap(err, fmt.Sprintf("couldn't 
resolve type for field: \"%s\"", name))
        }
+       tag, err := getTag(name, schema, goType, required)
+       if err != nil {
+               return nil, err
+       }
        sf := &reflect.StructField{
                Name: strings.Title(name), //nolint:staticcheck
                Type: goType,
-               Tag:  reflect.StructTag(fmt.Sprintf("json:\"%s\"", name)),
-       }
-       if encrypt {
-               sf.Tag = reflect.StructTag(fmt.Sprintf("json:\"%s\" "+
-                       "gorm:\"serializer:encdec\"", //just encrypt everything 
for GORM operations - makes things easy
-                       name))
+               Tag:  tag,
        }
        return sf, nil
 }
 
-func getGoType(schema map[string]any) (reflect.Type, errors.Error) {
-       var goType reflect.Type
+func getGoType(schema utils.JsonObject, required bool) (reflect.Type, 
errors.Error) {
        jsonType, ok := schema["type"].(string)
        if !ok {
                return nil, errors.BadInput.New("\"type\" property must be a 
string")
@@ -145,15 +163,70 @@ func getGoType(schema map[string]any) (reflect.Type, 
errors.Error) {
        switch jsonType {
        //TODO: support more types
        case "integer":
-               goType = reflect.TypeOf(uint64(0))
+               return int64Type, nil
+       case "number":
+               return float64Type, nil
        case "boolean":
-               goType = reflect.TypeOf(false)
+               return boolType, nil
        case "string":
-               goType = reflect.TypeOf("")
+               format, err := utils.GetProperty[string](schema, "format")
+               if err == nil && format == "date-time" {
+                       if required {
+                               return timeType, nil
+                       } else {
+                               return reflect.PtrTo(timeType), nil
+                       }
+               } else {
+                       return stringType, nil
+               }
        case "object":
-               goType = reflect.TypeOf(datatypes.JSONMap{})
+               return jsonMapType, nil
        default:
                return nil, errors.BadInput.New(fmt.Sprintf("Unsupported type 
%s", jsonType))
        }
-       return goType, nil
+}
+
+func getTag(name string, schema utils.JsonObject, goType reflect.Type, 
required bool) (reflect.StructTag, errors.Error) {
+       tags := []string{}
+       tags = append(tags, fmt.Sprintf("json:\"%s\"", name))
+       gormTag := getGormTag(schema, goType)
+       if gormTag != "" {
+               tags = append(tags, gormTag)
+       }
+       if required {
+               tags = append(tags, "validate:\"required\"")
+       }
+       return reflect.StructTag(strings.Join(tags, " ")), nil
+}
+
+func getGormTag(schema utils.JsonObject, goType reflect.Type) string {
+       gormTags := []string{}
+       primaryKey, err := utils.GetProperty[bool](schema, "primaryKey")
+       if err == nil && primaryKey {
+               gormTags = append(gormTags, "primaryKey")
+       }
+       if goType == stringType {
+               maxLength, err := utils.GetProperty[float64](schema, 
"maxLength")
+               maxLengthInt := int(maxLength)
+               if err == nil {
+                       if maxLengthInt > 255 {
+                               gormTags = append(gormTags, "type:text")
+                       } else {
+                               gormTags = append(gormTags, 
fmt.Sprintf("type:varchar(%d)", maxLengthInt))
+                       }
+               } else if primaryKey {
+                       // primary keys must have a key length
+                       gormTags = append(gormTags, "type:varchar(255)")
+               } else {
+                       gormTags = append(gormTags, "type:text")
+               }
+       }
+       format, err := utils.GetProperty[string](schema, "format")
+       if err == nil && format == "password" {
+               gormTags = append(gormTags, "serializer:encdec")
+       }
+       if len(gormTags) == 0 {
+               return ""
+       }
+       return fmt.Sprintf("gorm:\"%s\"", strings.Join(gormTags, ";"))
 }
diff --git a/backend/server/services/remote/models/conversion_test.go 
b/backend/server/services/remote/models/conversion_test.go
new file mode 100644
index 000000000..1b0bc9c2c
--- /dev/null
+++ b/backend/server/services/remote/models/conversion_test.go
@@ -0,0 +1,153 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package models
+
+import (
+       "reflect"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestGenerateSimpleField(t *testing.T) {
+       schema := map[string]interface{}{
+               "type": "integer",
+       }
+       field, err := generateStructField("i", schema, true)
+       assert.NoError(t, err)
+       assert.Equal(t, int64Type, field.Type)
+       assert.Equal(t, "I", field.Name)
+       json, ok := field.Tag.Lookup("json")
+       assert.True(t, ok)
+       assert.Equal(t, "i", json)
+       validate, ok := field.Tag.Lookup("validate")
+       assert.True(t, ok)
+       assert.Equal(t, "required", validate)
+       _, ok = field.Tag.Lookup("gorm")
+       assert.False(t, ok)
+}
+
+func TestGetGoTypeInt64(t *testing.T) {
+       schema := map[string]interface{}{
+               "type": "integer",
+       }
+       typ, err := getGoType(schema, false)
+       assert.NoError(t, err)
+       assert.Equal(t, int64Type, typ)
+}
+
+func TestGetGoTypeFloat64(t *testing.T) {
+       schema := map[string]interface{}{
+               "type": "number",
+       }
+       typ, err := getGoType(schema, false)
+       assert.NoError(t, err)
+       assert.Equal(t, float64Type, typ)
+}
+
+func TestGetGoTypeBool(t *testing.T) {
+       schema := map[string]interface{}{
+               "type": "boolean",
+       }
+       typ, err := getGoType(schema, false)
+       assert.NoError(t, err)
+       assert.Equal(t, boolType, typ)
+}
+
+func TestGetGoTypeString(t *testing.T) {
+       schema := map[string]interface{}{
+               "type": "string",
+       }
+       typ, err := getGoType(schema, false)
+       assert.NoError(t, err)
+       assert.Equal(t, stringType, typ)
+}
+
+func TestGetGoTypeTime(t *testing.T) {
+       schema := map[string]interface{}{
+               "type":   "string",
+               "format": "date-time",
+       }
+       typ, err := getGoType(schema, true)
+       assert.NoError(t, err)
+       assert.Equal(t, timeType, typ)
+}
+
+func TestGetGoTypeTimePointer(t *testing.T) {
+       schema := map[string]interface{}{
+               "type":   "string",
+               "format": "date-time",
+       }
+       typ, err := getGoType(schema, false)
+       assert.NoError(t, err)
+       assert.Equal(t, reflect.PtrTo(timeType), typ)
+}
+
+func TestGetGoTypeJsonMap(t *testing.T) {
+       schema := map[string]interface{}{
+               "type": "object",
+       }
+       typ, err := getGoType(schema, false)
+       assert.NoError(t, err)
+       assert.Equal(t, jsonMapType, typ)
+}
+
+func TestGetGormTagPrimaryKey(t *testing.T) {
+       schema := map[string]interface{}{
+               "type":       "integer",
+               "primaryKey": true,
+       }
+       tag := getGormTag(schema, int64Type)
+       assert.Equal(t, "gorm:\"primaryKey\"", tag)
+}
+
+func TestGetGormTagVarChar(t *testing.T) {
+       schema := map[string]interface{}{
+               "type":      "string",
+               "maxLength": float64(100),
+       }
+       tag := getGormTag(schema, stringType)
+       assert.Equal(t, "gorm:\"type:varchar(100)\"", tag)
+}
+
+func TestGetGormTagText(t *testing.T) {
+       schema := map[string]interface{}{
+               "type":      "string",
+               "maxLength": float64(300),
+       }
+       tag := getGormTag(schema, stringType)
+       assert.Equal(t, "gorm:\"type:text\"", tag)
+}
+
+func TestGetGormTagStringPrimaryKey(t *testing.T) {
+       schema := map[string]interface{}{
+               "type":       "string",
+               "primaryKey": true,
+       }
+       tag := getGormTag(schema, stringType)
+       assert.Equal(t, "gorm:\"primaryKey;type:varchar(255)\"", tag)
+}
+
+func TestGetGormTagEncDec(t *testing.T) {
+       schema := map[string]interface{}{
+               "type":   "string",
+               "format": "password",
+       }
+       tag := getGormTag(schema, stringType)
+       assert.Equal(t, "gorm:\"type:text;serializer:encdec\"", tag)
+}
diff --git a/backend/server/services/remote/models/migration.go 
b/backend/server/services/remote/models/migration.go
new file mode 100644
index 000000000..7d980354d
--- /dev/null
+++ b/backend/server/services/remote/models/migration.go
@@ -0,0 +1,140 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package models
+
+import (
+       "encoding/json"
+
+       "github.com/apache/incubator-devlake/core/context"
+       "github.com/apache/incubator-devlake/core/dal"
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/core/plugin"
+)
+
// Operation is one step of a remote (python-defined) migration script.
// Each concrete operation applies itself against the given DAL.
type Operation interface {
	Execute(dal.Dal) errors.Error
}
+
+type ExecuteOperation struct {
+       Sql     string  `json:"sql"`
+       Dialect *string `json:"dialect"`
+}
+
+func (o ExecuteOperation) Execute(dal dal.Dal) errors.Error {
+       if o.Dialect != nil {
+               if dal.Dialect() == *o.Dialect {
+                       return dal.Exec(o.Sql)
+               }
+               return nil
+       } else {
+               return dal.Exec(o.Sql)
+       }
+}
+
+var _ Operation = (*ExecuteOperation)(nil)
+
// DropColumnOperation removes a single column from a table.
type DropColumnOperation struct {
	Table  string `json:"table"`
	Column string `json:"column"`
}

func (o DropColumnOperation) Execute(dal dal.Dal) errors.Error {
	return dal.DropColumns(o.Table, o.Column)
}

var _ Operation = (*DropColumnOperation)(nil)
+
+type DropTableOperation struct {
+       Table  string `json:"table"`
+       Column string `json:"column"`
+}
+
+func (o DropTableOperation) Execute(dal dal.Dal) errors.Error {
+       return dal.DropTables(o.Table)
+}
+
+var _ Operation = (*DropTableOperation)(nil)
+
// RemoteMigrationScript is a migration script declared by a remote (python)
// plugin. Fields are unexported because they are populated via UnmarshalJSON.
type RemoteMigrationScript struct {
	operations []Operation
	version    uint64
	name       string
}

// rawRemoteMigrationScript is the wire form: operations stay raw so each one
// can be decoded into its concrete type after inspecting its "type" tag.
type rawRemoteMigrationScript struct {
	Operations []json.RawMessage `json:"operations"`
	Version    uint64            `json:"version"`
	Name       string            `json:"name"`
}
+
+func (s *RemoteMigrationScript) UnmarshalJSON(data []byte) error {
+       var rawScript rawRemoteMigrationScript
+       err := json.Unmarshal(data, &rawScript)
+       if err != nil {
+               return err
+       }
+       s.version = rawScript.Version
+       s.name = rawScript.Name
+       s.operations = make([]Operation, len(rawScript.Operations))
+       for i, operationRaw := range rawScript.Operations {
+               operationMap := make(map[string]interface{})
+               err := json.Unmarshal(operationRaw, &operationMap)
+               if err != nil {
+                       return err
+               }
+               operationType := operationMap["type"].(string)
+               var operation Operation
+               switch operationType {
+               case "execute":
+                       operation = &ExecuteOperation{}
+               case "drop_column":
+                       operation = &DropColumnOperation{}
+               case "drop_table":
+                       operation = &DropTableOperation{}
+               default:
+                       return errors.BadInput.New("unsupported operation type")
+               }
+               err = json.Unmarshal(operationRaw, operation)
+               if err != nil {
+                       return err
+               }
+               s.operations[i] = operation
+       }
+       return nil
+}
+
+func (s *RemoteMigrationScript) Up(basicRes context.BasicRes) errors.Error {
+       dal := basicRes.GetDal()
+       for _, operation := range s.operations {
+               err := operation.Execute(dal)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
// Version returns the script's timestamp-style version number
// (e.g. 20230420123456), used to order and deduplicate migrations.
func (s *RemoteMigrationScript) Version() uint64 {
	return s.version
}

// Name returns the script's human-readable name.
func (s *RemoteMigrationScript) Name() string {
	return s.name
}

var _ plugin.MigrationScript = (*RemoteMigrationScript)(nil)
diff --git a/backend/server/services/remote/models/migration_test.go 
b/backend/server/services/remote/models/migration_test.go
new file mode 100644
index 000000000..163bfb238
--- /dev/null
+++ b/backend/server/services/remote/models/migration_test.go
@@ -0,0 +1,67 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package models
+
+import (
+       "encoding/json"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestUnmarshallMigrationScript(t *testing.T) {
+       raw := []byte(`{
+               "name": "test",
+               "version": 20230420123456,
+               "operations": [
+                       {
+                               "type": "execute",
+                               "sql": "SOME SQL",
+                               "dialect": "mysql"
+                       },
+                       {
+                               "type": "drop_column",
+                               "table": "t",
+                               "column": "c"
+                       },
+                       {
+                               "type": "drop_table",
+                               "table": "t"
+                       }
+               ]
+       }`)
+
+       var script RemoteMigrationScript
+       err := json.Unmarshal(raw, &script)
+
+       assert.NoError(t, err)
+       assert.Equal(t, "test", script.name)
+       assert.Equal(t, uint64(20230420123456), script.version)
+       assert.Len(t, script.operations, 3)
+
+       op1 := script.operations[0].(*ExecuteOperation)
+       assert.Equal(t, "SOME SQL", op1.Sql)
+       assert.Equal(t, "mysql", *op1.Dialect)
+
+       op2 := script.operations[1].(*DropColumnOperation)
+       assert.Equal(t, "t", op2.Table)
+       assert.Equal(t, "c", op2.Column)
+
+       op3 := script.operations[2].(*DropTableOperation)
+       assert.Equal(t, "t", op3.Table)
+}
diff --git a/backend/server/services/remote/models/models.go 
b/backend/server/services/remote/models/models.go
index b220114e1..c3bc47b51 100644
--- a/backend/server/services/remote/models/models.go
+++ b/backend/server/services/remote/models/models.go
@@ -24,30 +24,25 @@ import (
        "github.com/apache/incubator-devlake/core/plugin"
 )
 
-const (
-       PythonPoetryCmd PluginType      = "python-poetry"
-       PythonCmd       PluginType      = "python"
-       None            PluginExtension = ""
-       Metric          PluginExtension = "metric"
-       Datasource      PluginExtension = "datasource"
-)
// PluginExtension identifies the optional capability category a remote
// plugin declares in its PluginInfo.
type PluginExtension string

const (
	None       PluginExtension = ""
	Metric     PluginExtension = "metric"
	Datasource PluginExtension = "datasource"
)
 
// PluginInfo is the handshake payload a remote (python) plugin sends on
// startup to describe itself: its dynamic models, migration scripts, and
// subtasks. Model infos carry JSON schemas that are turned into Go struct
// types via LoadDynamicTabler.
type PluginInfo struct {
	Name                        string                  `json:"name" validate:"required"`
	Description                 string                  `json:"description"`
	ConnectionModelInfo         *DynamicModelInfo       `json:"connection_model_info" validate:"required"`
	TransformationRuleModelInfo *DynamicModelInfo       `json:"transformation_rule_model_info"`
	ScopeModelInfo              *DynamicModelInfo       `json:"scope_model_info" validate:"required"`
	ToolModelInfos              []*DynamicModelInfo     `json:"tool_model_infos"`
	MigrationScripts            []RemoteMigrationScript `json:"migration_scripts"`
	PluginPath                  string                  `json:"plugin_path" validate:"required"`
	SubtaskMetas                []SubtaskMeta           `json:"subtask_metas"`
	Extension                   PluginExtension         `json:"extension"`
}
 
 // Type aliases used by the API helper for better readability
@@ -62,8 +57,8 @@ type DynamicModelInfo struct {
        TableName  string         `json:"table_name" validate:"required"`
 }
 
// LoadDynamicTabler materializes the dynamic model described by d's JSON
// schema into a DynamicTabler whose generated struct embeds parentModel.
func (d DynamicModelInfo) LoadDynamicTabler(parentModel any) (*models.DynamicTabler, errors.Error) {
	return LoadTableModel(d.TableName, d.JsonSchema, parentModel)
}
 
 type ScopeModel struct {
diff --git a/backend/server/services/remote/models/plugin_remote.go 
b/backend/server/services/remote/models/plugin_remote.go
index e039912a6..1d0c0f57b 100644
--- a/backend/server/services/remote/models/plugin_remote.go
+++ b/backend/server/services/remote/models/plugin_remote.go
@@ -29,5 +29,6 @@ type RemotePlugin interface {
        plugin.PluginMeta
        plugin.PluginOpenApiSpec
        plugin.PluginModel
-       RunMigrations(forceMigrate bool) errors.Error
+       plugin.PluginMigration
+       RunAutoMigrations() errors.Error
 }
diff --git a/backend/server/services/remote/plugin/plugin_impl.go 
b/backend/server/services/remote/plugin/plugin_impl.go
index 259d2300c..02a613748 100644
--- a/backend/server/services/remote/plugin/plugin_impl.go
+++ b/backend/server/services/remote/plugin/plugin_impl.go
@@ -19,6 +19,7 @@ package plugin
 
 import (
        "fmt"
+       "strings"
 
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
@@ -41,9 +42,10 @@ type (
                connectionTabler         *coreModels.DynamicTabler
                scopeTabler              *coreModels.DynamicTabler
                transformationRuleTabler *coreModels.DynamicTabler
+               toolModelTablers         []*coreModels.DynamicTabler
+               migrationScripts         []plugin.MigrationScript
                resources                
map[string]map[string]plugin.ApiResourceHandler
                openApiSpec              string
-               tables                   []dal.Tabler
        }
        RemotePluginTaskData struct {
                DbUrl              string                 `json:"db_url"`
@@ -55,26 +57,39 @@ type (
 )
 
 func newPlugin(info *models.PluginInfo, invoker bridge.Invoker) 
(*remotePluginImpl, errors.Error) {
-       connectionTabler, err := 
info.ConnectionModelInfo.LoadDynamicTabler(true, common.Model{})
+       connectionTabler, err := 
info.ConnectionModelInfo.LoadDynamicTabler(common.Model{})
        if err != nil {
                return nil, errors.Default.Wrap(err, fmt.Sprintf("Couldn't load 
Connection type for plugin %s", info.Name))
        }
 
        var txRuleTabler *coreModels.DynamicTabler
        if info.TransformationRuleModelInfo != nil {
-               txRuleTabler, err = 
info.TransformationRuleModelInfo.LoadDynamicTabler(false, 
models.TransformationModel{})
+               txRuleTabler, err = 
info.TransformationRuleModelInfo.LoadDynamicTabler(models.TransformationModel{})
                if err != nil {
                        return nil, errors.Default.Wrap(err, 
fmt.Sprintf("Couldn't load TransformationRule type for plugin %s", info.Name))
                }
        }
-       scopeTabler, err := info.ScopeModelInfo.LoadDynamicTabler(false, 
models.ScopeModel{})
+       scopeTabler, err := 
info.ScopeModelInfo.LoadDynamicTabler(models.ScopeModel{})
        if err != nil {
                return nil, errors.Default.Wrap(err, fmt.Sprintf("Couldn't load 
Scope type for plugin %s", info.Name))
        }
+       toolModelTablers := make([]*coreModels.DynamicTabler, 
len(info.ToolModelInfos))
+       for i, toolModelInfo := range info.ToolModelInfos {
+               toolModelTabler, err := 
toolModelInfo.LoadDynamicTabler(common.NoPKModel{})
+               if err != nil {
+                       return nil, errors.Default.Wrap(err, 
fmt.Sprintf("Couldn't load ToolModel type for plugin %s", info.Name))
+               }
+               toolModelTablers[i] = toolModelTabler
+       }
        openApiSpec, err := doc.GenerateOpenApiSpec(info)
        if err != nil {
                return nil, errors.Default.Wrap(err, fmt.Sprintf("Couldn't 
generate OpenAPI spec for plugin %s", info.Name))
        }
+       scripts := make([]plugin.MigrationScript, 0)
+       for _, script := range info.MigrationScripts {
+               script := script
+               scripts = append(scripts, &script)
+       }
        p := remotePluginImpl{
                name:                     info.Name,
                invoker:                  invoker,
@@ -83,6 +98,8 @@ func newPlugin(info *models.PluginInfo, invoker 
bridge.Invoker) (*remotePluginIm
                connectionTabler:         connectionTabler,
                scopeTabler:              scopeTabler,
                transformationRuleTabler: txRuleTabler,
+               toolModelTablers:         toolModelTablers,
+               migrationScripts:         scripts,
                resources:                GetDefaultAPI(invoker, 
connectionTabler, txRuleTabler, scopeTabler, connectionHelper),
                openApiSpec:              *openApiSpec,
        }
@@ -97,9 +114,6 @@ func newPlugin(info *models.PluginInfo, invoker 
bridge.Invoker) (*remotePluginIm
                        DomainTypes:      subtask.DomainTypes,
                })
        }
-       for _, tableName := range info.Tables {
-               p.tables = append(p.tables, 
coreModels.NewDynamicTabler(tableName, nil))
-       }
        return &p, nil
 }
 
@@ -108,7 +122,13 @@ func (p *remotePluginImpl) SubTaskMetas() 
[]plugin.SubTaskMeta {
 }
 
 func (p *remotePluginImpl) GetTablesInfo() []dal.Tabler {
-       return p.tables
+       tables := make([]dal.Tabler, 0)
+       for _, toolModelTabler := range p.toolModelTablers {
+               tables = append(tables, toolModelTabler)
+               rawTableName := strings.Replace(toolModelTabler.TableName(), 
"_tool_", "_raw_", 1)
+               tables = append(tables, 
coreModels.NewDynamicTabler(rawTableName, nil))
+       }
+       return tables
 }
 
 func (p *remotePluginImpl) PrepareTaskData(taskCtx plugin.TaskContext, options 
map[string]interface{}) (interface{}, errors.Error) {
@@ -191,28 +211,37 @@ func (p *remotePluginImpl) ApiResources() 
map[string]map[string]plugin.ApiResour
        return p.resources
 }
 
-func (p *remotePluginImpl) RunMigrations(forceMigrate bool) errors.Error {
-       err := api.CallDB(basicRes.GetDal().AutoMigrate, 
p.connectionTabler.New())
+func (p *remotePluginImpl) RunAutoMigrations() errors.Error {
+       db := basicRes.GetDal()
+       err := api.CallDB(db.AutoMigrate, p.connectionTabler.New())
        if err != nil {
                return err
        }
-       err = api.CallDB(basicRes.GetDal().AutoMigrate, p.scopeTabler.New())
+       err = api.CallDB(db.AutoMigrate, p.scopeTabler.New())
        if err != nil {
                return err
        }
        if p.transformationRuleTabler != nil {
-               err = api.CallDB(basicRes.GetDal().AutoMigrate, 
p.transformationRuleTabler.New())
+               err = api.CallDB(db.AutoMigrate, 
p.transformationRuleTabler.New())
                if err != nil {
                        return err
                }
        }
-       dbUrl := basicRes.GetConfig("db_url")
-       err = p.invoker.Call("run-migrations", bridge.DefaultContext, dbUrl, 
forceMigrate).Err
-       return err
+       for _, toolModelTabler := range p.toolModelTablers {
+               err = api.CallDB(db.AutoMigrate, toolModelTabler.New())
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
 }
 
// OpenApiSpec returns the OpenAPI document generated from the plugin's
// PluginInfo at construction time.
func (p *remotePluginImpl) OpenApiSpec() string {
	return p.openApiSpec
}
 
// MigrationScripts exposes the python-defined migration scripts so the core
// migrator can run them alongside Go plugins' scripts.
func (p *remotePluginImpl) MigrationScripts() []plugin.MigrationScript {
	return p.migrationScripts
}

var _ models.RemotePlugin = (*remotePluginImpl)(nil)

Reply via email to