This is an automated email from the ASF dual-hosted git repository.

hez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 9fba2d006 5081 fix missing records azuredevops (#5138)
9fba2d006 is described below

commit 9fba2d006bb5f3a7033feb6ca8e6be6000b23718
Author: Camille Teruel <[email protected]>
AuthorDate: Thu May 11 20:33:09 2023 +0200

    5081 fix missing records azuredevops (#5138)
    
    * refactor: Move db_url parsing and engine creation out of Context
    
    * feat: Implement tool layer table migration
    
    * fix: Make build_id a primary key to prevent duplicate
    
    A job id returned by the "timeline" endpoint is not unique, as it is the id of the job definition, not of the job execution.
    Add build_id as a primary key to prevent overwriting.
    
    * fix: build fake python plugin before e2e tests
    
    * fix: Fix enum name conflict
    
    ---------
    
    Co-authored-by: Camille Teruel <[email protected]>
---
 backend/Makefile                                   |  3 ++-
 backend/python/README.md                           | 19 +++++++++++++
 .../plugins/azuredevops/azuredevops/models.py      | 29 ++++++++++++--------
 .../azuredevops/azuredevops/streams/builds.py      | 18 ++++++-------
 .../azuredevops/azuredevops/streams/jobs.py        | 18 ++++++-------
 .../azuredevops/streams/pull_requests.py           |  6 ++---
 .../python/plugins/azuredevops/tests/__init__.py   | 14 ++++++++++
 backend/python/pydevlake/pydevlake/context.py      | 31 ++--------------------
 backend/python/pydevlake/pydevlake/ipc.py          | 25 ++++++++++++++---
 backend/python/pydevlake/pydevlake/model.py        | 21 ++++++++++++++-
 backend/python/pydevlake/pydevlake/plugin.py       | 25 ++++++++++++++---
 backend/python/pydevlake/pydevlake/stream.py       |  3 ++-
 backend/python/pydevlake/pydevlake/subtasks.py     | 17 +++---------
 .../python/pydevlake/pydevlake/testing/testing.py  |  6 +++--
 backend/python/pydevlake/tests/stream_test.py      | 12 ++++++---
 .../server/services/remote/plugin/plugin_impl.go   |  3 ++-
 16 files changed, 159 insertions(+), 91 deletions(-)

diff --git a/backend/Makefile b/backend/Makefile
index d07fb228d..2e3d9e50c 100644
--- a/backend/Makefile
+++ b/backend/Makefile
@@ -53,6 +53,7 @@ build-server: swag
 build-python: #don't mix this with the other build commands
        find ./python/ -name "*.sh" | xargs chmod +x &&\
        sh python/build.sh
+       sh python/build.sh python/test
 
 build: build-plugin build-server
 
@@ -92,7 +93,7 @@ build-pydevlake:
        poetry install -C python/pydevlake
 
 python-unit-test: build-pydevlake
-       sh python/build.sh test &&\
+       sh python/build.sh python/test &&\
        sh ./python/run_tests.sh
 
 e2e-plugins-test:
diff --git a/backend/python/README.md b/backend/python/README.md
index db99ee5f7..a68cca8ea 100644
--- a/backend/python/README.md
+++ b/backend/python/README.md
@@ -232,6 +232,25 @@ Do not forget `table=True`, otherwise no table will be created in the database.
 
 To facilitate or even eliminate extraction, your tool models should be close to the raw data you collect. Note that if you collect data from a JSON REST API that uses camelCased properties, you can still define snake_cased attributes in your model. The camelCased attribute aliases will be generated, so no special care is needed during extraction.
 
+#### Migration of tool models
+
+Tool models, connection, scope and transformation rule types are stored in the DevLake database.
+When you change the definition of one of those types, you need to migrate the database.
+You should implement the migration logic in the model class by defining a `migrate` class method. This method takes a SQLAlchemy session as an argument that you can use to execute SQL `ALTER TABLE` statements.
+
+```python
+class User(ToolModel, table=True):
+    id: str = Field(primary_key=True)
+    name: str
+    email: str
+    age: int
+
+    @classmethod
+    def migrate(cls, session):
+        session.execute(f"ALTER TABLE {cls.__tablename__} ADD COLUMN age INT")
+```
+
 
 ### Create the stream class
 
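Worth noting in passing: the `migrate` hook in the README example runs its `ALTER TABLE` unconditionally, so a second run could fail once the column already exists (the commit's own TODO about a version table points at the same gap). A minimal sketch of an idempotent variant, assuming SQLAlchemy's `inspect` API and a hypothetical standalone `User` model rather than the README's exact code:

```python
from sqlalchemy import inspect, text
from sqlmodel import Field, Session, SQLModel, create_engine


class User(SQLModel, table=True):
    # Hypothetical standalone model mirroring the README example.
    id: str = Field(primary_key=True)
    name: str
    email: str

    @classmethod
    def migrate(cls, session: Session):
        # Only add the column if it is not already there, so the
        # migration can safely run more than once.
        inspector = inspect(session.get_bind())
        columns = {c["name"] for c in inspector.get_columns(cls.__tablename__)}
        if "age" not in columns:
            session.execute(text(f"ALTER TABLE {cls.__tablename__} ADD COLUMN age INT"))


engine = create_engine("sqlite:///:memory:")
SQLModel.metadata.create_all(engine)
with Session(engine) as session:
    User.migrate(session)  # adds the column
    User.migrate(session)  # second run is a no-op
    session.commit()
```
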
diff --git a/backend/python/plugins/azuredevops/azuredevops/models.py b/backend/python/plugins/azuredevops/azuredevops/models.py
index 35fc0e614..6d7f231f6 100644
--- a/backend/python/plugins/azuredevops/azuredevops/models.py
+++ b/backend/python/plugins/azuredevops/azuredevops/models.py
@@ -18,6 +18,8 @@ from enum import Enum
 from typing import Optional
 import re
 
+from sqlmodel import Session
+
 from pydevlake import Field, Connection, TransformationRule
 from pydevlake.model import ToolModel, ToolScope
 from pydevlake.pipeline_tasks import RefDiffOptions
@@ -44,14 +46,14 @@ class GitRepository(ToolScope, table=True):
 
 
 class GitPullRequest(ToolModel, table=True):
-    class Status(Enum):
+    class PRStatus(Enum):
         Abandoned = "abandoned"
         Active = "active"
         Completed = "completed"
 
     pull_request_id: int = Field(primary_key=True)
     description: Optional[str]
-    status: Status
+    status: PRStatus
     created_by_id: str = Field(source='/createdBy/id')
     created_by_name: str = Field(source='/createdBy/displayName')
     creation_date: datetime.datetime
@@ -76,14 +78,14 @@ class GitPullRequestCommit(ToolModel, table=True):
 
 
 class Build(ToolModel, table=True):
-    class Status(Enum):
+    class BuildStatus(Enum):
         Cancelling = "cancelling"
         Completed = "completed"
         InProgress = "inProgress"
         NotStarted = "notStarted"
         Postponed = "postponed"
 
-    class Result(Enum):
+    class BuildResult(Enum):
         Canceled = "canceled"
         Failed = "failed"
         Non = "none"
@@ -94,19 +96,19 @@ class Build(ToolModel, table=True):
     name: str = Field(source='/definition/name')
     start_time: Optional[datetime.datetime]
     finish_time: Optional[datetime.datetime]
-    status: Status
-    result: Result
+    status: BuildStatus
+    result: BuildResult
     source_branch: str
     source_version: str
 
 
 class Job(ToolModel, table=True):
-    class State(Enum):
+    class JobState(Enum):
         Completed = "completed"
         InProgress = "inProgress"
         Pending = "pending"
 
-    class Result(Enum):
+    class JobResult(Enum):
         Abandoned = "abandoned"
         Canceled = "canceled"
         Failed = "failed"
@@ -115,9 +117,14 @@ class Job(ToolModel, table=True):
         SucceededWithIssues = "succeededWithIssues"
 
     id: str = Field(primary_key=True)
-    build_id: str
+    build_id: str = Field(primary_key=True)
     name: str
     startTime: datetime.datetime
     finishTime: datetime.datetime
-    state: State
-    result: Result
+    state: JobState
+    result: JobResult
+
+    @classmethod
+    def migrate(self, session: Session):
+        session.execute(f'ALTER TABLE {self.__tablename__} DROP PRIMARY KEY')
+        session.execute(f'ALTER TABLE {self.__tablename__} ADD PRIMARY KEY (id, build_id)')
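
The core of the fix is visible here: the job `id` comes from the job definition, so two executions of the same job in different builds share it, and a single-column key makes the second record overwrite the first. A minimal sketch of how a composite `(id, build_id)` key prevents that, using a hypothetical `JobRecord` model against in-memory SQLite rather than the plugin's actual class:

```python
from sqlmodel import Field, Session, SQLModel, create_engine, select


class JobRecord(SQLModel, table=True):
    # Hypothetical stand-in for the plugin's Job model.
    id: str = Field(primary_key=True)        # id of the job *definition*
    build_id: str = Field(primary_key=True)  # id of the build execution
    name: str


engine = create_engine("sqlite:///:memory:")
SQLModel.metadata.create_all(engine)

with Session(engine) as session:
    # Same job definition id, two different builds: both rows survive.
    session.add(JobRecord(id="job-1", build_id="build-1", name="compile"))
    session.add(JobRecord(id="job-1", build_id="build-2", name="compile"))
    session.commit()
    assert len(session.exec(select(JobRecord)).all()) == 2
```
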
diff --git a/backend/python/plugins/azuredevops/azuredevops/streams/builds.py b/backend/python/plugins/azuredevops/azuredevops/streams/builds.py
index 006d3315a..97687f80a 100644
--- a/backend/python/plugins/azuredevops/azuredevops/streams/builds.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/builds.py
@@ -37,25 +37,25 @@ class Builds(Stream):
 
     def convert(self, b: Build, ctx: Context):
         result = None
-        if b.result == Build.Result.Canceled:
+        if b.result == Build.BuildResult.Canceled:
             result = devops.CICDResult.ABORT
-        elif b.result == Build.Result.Failed:
+        elif b.result == Build.BuildResult.Failed:
             result = devops.CICDResult.FAILURE
-        elif b.result == Build.Result.PartiallySucceeded:
+        elif b.result == Build.BuildResult.PartiallySucceeded:
             result = devops.CICDResult.SUCCESS
-        elif b.result ==  Build.Result.Succeeded:
+        elif b.result ==  Build.BuildResult.Succeeded:
             result = devops.CICDResult.SUCCESS
 
         status = None
-        if b.status == Build.Status.Cancelling:
+        if b.status == Build.BuildStatus.Cancelling:
             status = devops.CICDStatus.DONE
-        elif b.status == Build.Status.Completed:
+        elif b.status == Build.BuildStatus.Completed:
             status = devops.CICDStatus.DONE
-        elif b.status ==  Build.Status.InProgress:
+        elif b.status ==  Build.BuildStatus.InProgress:
             status = devops.CICDStatus.IN_PROGRESS
-        elif b.status == Build.Status.NotStarted:
+        elif b.status == Build.BuildStatus.NotStarted:
             status = devops.CICDStatus.IN_PROGRESS
-        elif b.status ==  Build.Status.Postponed:
+        elif b.status ==  Build.BuildStatus.Postponed:
             status = devops.CICDStatus.IN_PROGRESS
 
         type = devops.CICDType.BUILD
diff --git a/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py b/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py
index 49676d97a..3ef53cfdf 100644
--- a/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py
@@ -38,25 +38,25 @@ class Jobs(Substream):
 
     def convert(self, j: Job, ctx: Context) -> Iterable[devops.CICDPipeline]:
         result = None
-        if j.result == Job.Result.Abandoned:
+        if j.result == Job.JobResult.Abandoned:
             result = devops.CICDResult.ABORT
-        elif j.result == Job.Result.Canceled:
+        elif j.result == Job.JobResult.Canceled:
             result = devops.CICDResult.ABORT
-        elif j.result == Job.Result.Failed:
+        elif j.result == Job.JobResult.Failed:
             result = devops.CICDResult.FAILURE
-        elif j.result == Job.Result.Skipped:
+        elif j.result == Job.JobResult.Skipped:
             result = devops.CICDResult.ABORT
-        elif j.result == Job.Result.Succeeded:
+        elif j.result == Job.JobResult.Succeeded:
             result = devops.CICDResult.SUCCESS
-        elif j.result == Job.Result.SucceededWithIssues:
+        elif j.result == Job.JobResult.SucceededWithIssues:
             result = devops.CICDResult.FAILURE
 
         status = None
-        if j.state == Job.State.Completed:
+        if j.state == Job.JobState.Completed:
             status = devops.CICDStatus.DONE
-        elif j.state == Job.State.InProgress:
+        elif j.state == Job.JobState.InProgress:
             status = devops.CICDStatus.IN_PROGRESS
-        if j.state == Job.State.Pending:
+        if j.state == Job.JobState.Pending:
             status = devops.CICDStatus.IN_PROGRESS
 
         type = devops.CICDType.BUILD
diff --git a/backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py b/backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py
index 2d9485e14..2d32ff091 100644
--- a/backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py
@@ -39,11 +39,11 @@ class GitPullRequests(Stream):
 
         # Use the same status values as GitHub plugin
         status = None
-        if pr.status == GitPullRequest.Status.Abandoned:
+        if pr.status == GitPullRequest.PRStatus.Abandoned:
             status = 'CLOSED'
-        elif pr.status == GitPullRequest.Status.Active:
+        elif pr.status == GitPullRequest.PRStatus.Active:
             status = 'OPEN'
-        elif pr.status == GitPullRequest.Status.Completed:
+        elif pr.status == GitPullRequest.PRStatus.Completed:
             status = 'MERGED'
 
         yield code.PullRequest(
diff --git a/backend/python/plugins/azuredevops/tests/__init__.py b/backend/python/plugins/azuredevops/tests/__init__.py
new file mode 100644
index 000000000..65d64ce95
--- /dev/null
+++ b/backend/python/plugins/azuredevops/tests/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/backend/python/pydevlake/pydevlake/context.py b/backend/python/pydevlake/pydevlake/context.py
index 660ff8847..a9d5f4d11 100644
--- a/backend/python/pydevlake/pydevlake/context.py
+++ b/backend/python/pydevlake/pydevlake/context.py
@@ -14,52 +14,25 @@
 # limitations under the License.
 
 
-from urllib.parse import urlparse, parse_qsl
-
 from sqlalchemy.engine import Engine
-from sqlmodel import SQLModel, create_engine
 
 from pydevlake.model import Connection, TransformationRule, ToolScope
 
 
 class Context:
     def __init__(self,
-                 db_url: str,
+                 engine: Engine,
                  scope: ToolScope,
                  connection: Connection,
                  transformation_rule: TransformationRule = None,
                  options: dict = None):
-        self.db_url = db_url
+        self.engine = engine
         self.scope = scope
         self.connection = connection
         self.transformation_rule = transformation_rule
         self.options = options or {}
         self._engine = None
 
-    @property
-    def engine(self) -> Engine:
-        if not self._engine:
-            db_url, args = self.get_engine_db_url()
-            try:
-                self._engine = create_engine(db_url, connect_args=args)
-                SQLModel.metadata.create_all(self._engine)
-            except Exception as e:
-                raise Exception(f"Unable to make a database connection") from e
-        return self._engine
-
     @property
     def incremental(self) -> bool:
         return self.options.get('incremental') is True
-
-    def get_engine_db_url(self) -> tuple[str, dict[str, any]]:
-        db_url = self.db_url
-        if not db_url:
-            raise Exception("Missing db_url setting")
-        db_url = db_url.replace("postgres://", "postgresql://")
-        db_url = db_url.split('?')[0]
-        # `parseTime` parameter is not understood by MySQL driver,
-        # so we have to parse query args to remove it
-        connect_args = dict(parse_qsl(urlparse(self.db_url).query))
-        if 'parseTime' in connect_args:
-            del connect_args['parseTime']
-        return db_url, connect_args
diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py
index 7acd95391..e59065af0 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -19,7 +19,10 @@ import json
 from functools import wraps
 from typing import Generator, TextIO, Optional
 
+from urllib.parse import urlparse, parse_qsl
 from fire.decorators import SetParseFn
+from sqlmodel import create_engine
+from sqlalchemy.engine import Engine
 
 from pydevlake.context import Context
 from pydevlake.message import Message
@@ -98,8 +101,8 @@ class PluginCommands:
         return self._plugin.make_pipeline(scope_tx_rule_pairs, entities, connection)
 
     @plugin_method
-    def run_migrations(self, force: bool):
-        self._plugin.run_migrations(force)
+    def run_migrations(self, db_url, force: bool):
+        self._plugin.run_migrations(create_db_engine(db_url), force)
 
     @plugin_method
     def plugin_info(self):
@@ -122,4 +125,20 @@ class PluginCommands:
         else:
             transformation_rule = None
         options = data.get('options', {})
-        return Context(db_url, scope, connection, transformation_rule, options)
+        return Context(create_db_engine(db_url), scope, connection, transformation_rule, options)
+
+def create_db_engine(db_url) -> Engine:
+    # SQLAlchemy doesn't understand postgres:// scheme
+    db_url = db_url.replace("postgres://", "postgresql://")
+    # Remove query args
+    base_url = db_url.split('?')[0]
+    # `parseTime` parameter is not understood by MySQL driver,
+    # so we have to parse query args to remove it
+    connect_args = dict(parse_qsl(urlparse(db_url).query))
+    if 'parseTime' in connect_args:
+        del connect_args['parseTime']
+    try:
+        engine = create_engine(base_url, connect_args=connect_args)
+        return engine
+    except Exception as e:
+        raise Exception(f"Unable to make a database connection") from e
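
For a sense of what `create_db_engine` does to a connection string before handing it to SQLAlchemy, its normalization steps can be replayed with the standard library alone (the sample URL below is illustrative, not taken from this commit):

```python
from urllib.parse import parse_qsl, urlparse

# Illustrative MySQL URL of the shape the Go side might pass along.
db_url = "mysql://user:pass@mysql:3306/lake?charset=utf8mb4&parseTime=True"

base_url = db_url.replace("postgres://", "postgresql://").split("?")[0]
connect_args = dict(parse_qsl(urlparse(db_url).query))
connect_args.pop("parseTime", None)  # Go-only flag the Python MySQL driver rejects

print(base_url)      # mysql://user:pass@mysql:3306/lake
print(connect_args)  # {'charset': 'utf8mb4'}
```
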
diff --git a/backend/python/pydevlake/pydevlake/model.py b/backend/python/pydevlake/pydevlake/model.py
index 135e10c0a..995c13b3a 100644
--- a/backend/python/pydevlake/pydevlake/model.py
+++ b/backend/python/pydevlake/pydevlake/model.py
@@ -22,7 +22,7 @@ from datetime import datetime
 import inflect
 from pydantic import AnyUrl, validator
 from sqlalchemy import Column, DateTime
-from sqlalchemy.orm import declared_attr
+from sqlalchemy.orm import declared_attr, Session
 from sqlalchemy.inspection import inspect
 from sqlmodel import SQLModel, Field
 
@@ -56,6 +56,13 @@ class ToolTable(SQLModel):
             parts = attr_name.split('_')
             return parts[0] + ''.join(word.capitalize() for word in parts[1:])
 
+    @classmethod
+    def migrate(cls, session: Session):
+        """
+        Redefine this method to perform migration on this tool model.
+        """
+        pass
+
 
 class Connection(ToolTable, Model):
     name: str
@@ -163,3 +170,15 @@ def _get_plugin_name(cls):
     # that is not a python module
     depth = len(module.__name__.split('.')) + 1
     return path_segments[-depth]
+
+
+class SubtaskRun(SQLModel, table=True):
+    """
+    Table storing information about the execution of subtasks.
+    """
+    id: Optional[int] = Field(primary_key=True)
+    subtask_name: str
+    connection_id: int
+    started: datetime
+    completed: Optional[datetime]
+    state: str # JSON encoded dict of atomic values
diff --git a/backend/python/pydevlake/pydevlake/plugin.py b/backend/python/pydevlake/pydevlake/plugin.py
index d14b9ad5d..4958e729e 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -20,13 +20,16 @@ import os
 import sys
 
 import fire
+from sqlmodel import SQLModel, Session
+from sqlalchemy.inspection import inspect
+from sqlalchemy.engine import Engine
 
 import pydevlake.message as msg
 from pydevlake.subtasks import Subtask
 from pydevlake.ipc import PluginCommands
 from pydevlake.context import Context
 from pydevlake.stream import Stream, DomainType
-from pydevlake.model import ToolScope, DomainScope, Connection, TransformationRule
+from pydevlake.model import ToolScope, DomainScope, Connection, TransformationRule, SubtaskRun
 
 
 ScopeTxRulePair = tuple[ToolScope, Optional[TransformationRule]]
@@ -102,9 +105,23 @@ class Plugin(ABC):
     def convert(self, ctx: Context, stream: str):
         yield from self.get_stream(stream).convertor.run(ctx)
 
-    def run_migrations(self, force: bool):
-        # TODO: Create tables
-        pass
+    def run_migrations(self, engine: Engine, force: bool = False):
+        # NOTE: Not sure what "force" is for
+        # TODO: Support migration for transformation rule and connection tables
+        # They are currently created on go-side.
+        tool_models = [stream.tool_model for stream in self._streams.values()]
+        tool_models.append(self.tool_scope_type)
+        inspector = inspect(engine)
+        tables = SQLModel.metadata.tables
+        with Session(engine) as session:
+            for model in tool_models:
+                if inspector.has_table(model.__tablename__):
+                    # TODO: Add version table and migrate if needed
+                    model.migrate(session)
+                else:
+                    tables[model.__tablename__].create(engine)
+            session.commit()
+        tables[SubtaskRun.__tablename__].create(engine, checkfirst=True)
 
     def make_remote_scopes(self, connection: Connection, group_id: Optional[str] = None) -> msg.RemoteScopes:
         if group_id:
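
The create-or-migrate loop in `run_migrations` follows a general pattern: create a model's table if it is missing, otherwise hand the session to the model's `migrate` hook. A minimal sketch of that pattern in isolation, assuming a hypothetical `Widget` tool model and in-memory SQLite:

```python
from sqlalchemy import inspect
from sqlmodel import Field, Session, SQLModel, create_engine


class Widget(SQLModel, table=True):
    # Hypothetical stand-in for a plugin's tool models.
    id: int = Field(primary_key=True)

    @classmethod
    def migrate(cls, session: Session):
        pass  # in-place ALTER TABLE statements would go here


engine = create_engine("sqlite:///:memory:")

with Session(engine) as session:
    for model in [Widget]:
        if inspect(engine).has_table(model.__tablename__):
            model.migrate(session)  # table exists: apply migrations
        else:
            # table missing: create just this one table, not all metadata
            SQLModel.metadata.tables[model.__tablename__].create(engine)
    session.commit()
```
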
diff --git a/backend/python/pydevlake/pydevlake/stream.py b/backend/python/pydevlake/pydevlake/stream.py
index d6df9f7f4..ae7e421bd 100644
--- a/backend/python/pydevlake/pydevlake/stream.py
+++ b/backend/python/pydevlake/pydevlake/stream.py
@@ -80,7 +80,8 @@ class Stream:
             __tablename__ = table_name
 
         self._raw_model = StreamRawModel
-        RawModel.metadata.create_all(session.get_bind())
+        table = RawModel.metadata.tables[table_name]
+        table.create(session.get_bind(), checkfirst=True)
         return self._raw_model
 
     def collect(self, state, context) -> Iterable[tuple[object, dict]]:
diff --git a/backend/python/pydevlake/pydevlake/subtasks.py b/backend/python/pydevlake/pydevlake/subtasks.py
index fb5b6c4fa..c343c359f 100644
--- a/backend/python/pydevlake/pydevlake/subtasks.py
+++ b/backend/python/pydevlake/pydevlake/subtasks.py
@@ -17,13 +17,13 @@
 from abc import abstractmethod
 import json
 from datetime import datetime
-from typing import Tuple, Dict, Iterable, Optional, Generator
+from typing import Tuple, Dict, Iterable, Generator
 
 
 import sqlalchemy.sql as sql
-from sqlmodel import Session, SQLModel, Field, select
+from sqlmodel import Session, select
 
-from pydevlake.model import RawModel, ToolModel, DomainModel
+from pydevlake.model import RawModel, ToolModel, DomainModel, SubtaskRun
 from pydevlake.context import Context
 from pydevlake.message import RemoteProgress
 from pydevlake import logger
@@ -132,17 +132,6 @@ class Subtask:
             "scope_id": ctx.scope.id
         })
 
-class SubtaskRun(SQLModel, table=True):
-    """
-    Table storing information about the execution of subtasks.
-    """
-    id: Optional[int] = Field(primary_key=True)
-    subtask_name: str
-    connection_id: int
-    started: datetime
-    completed: Optional[datetime]
-    state: str # JSON encoded dict of atomic values
-
 
 class Collector(Subtask):
     @property
diff --git a/backend/python/pydevlake/pydevlake/testing/testing.py b/backend/python/pydevlake/pydevlake/testing/testing.py
index c690e95ed..41ac87ac2 100644
--- a/backend/python/pydevlake/pydevlake/testing/testing.py
+++ b/backend/python/pydevlake/pydevlake/testing/testing.py
@@ -17,6 +17,8 @@ import pytest
 
 from typing import Union, Type, Iterable, Generator, Optional
 
+from sqlmodel import create_engine
+
 from pydevlake.context import Context
 from pydevlake.plugin import Plugin
 from pydevlake.message import RemoteScopeGroup, PipelineTask
@@ -49,7 +51,7 @@ class ContextBuilder:
 
     def build(self):
         return Context(
-            db_url='sqlite:///:memory:',
+            engine=create_engine('sqlite:///:memory:'),
             scope=self.scope,
             connection=self.connection,
             transformation_rule=self.transformation_rule
@@ -78,7 +80,7 @@ def assert_stream_run(stream: Stream, connection: Connection, scope: ToolScope,
     """
     Test that a stream can run all 3 steps without error.
     """
-    ctx = Context(db_url='sqlite:///:memory:', connection=connection, scope=scope, transformation_rule=transformation_rule)
+    ctx = ContextBuilder().with_connection(connection).with_scope(scope).with_transformation_rule(transformation_rule).build()
     stream.collector.run(ctx)
     stream.extractor.run(ctx)
     stream.convertor.run(ctx)
diff --git a/backend/python/pydevlake/tests/stream_test.py b/backend/python/pydevlake/tests/stream_test.py
index 6fe8f8669..d1caee668 100644
--- a/backend/python/pydevlake/tests/stream_test.py
+++ b/backend/python/pydevlake/tests/stream_test.py
@@ -17,7 +17,7 @@
 import json
 
 import pytest
-from sqlmodel import Session, Field
+from sqlmodel import SQLModel, Session, Field, create_engine
 
 from pydevlake import Stream, Connection, Context, DomainType
 from pydevlake.model import ToolModel, DomainModel, ToolScope
@@ -58,6 +58,12 @@ class DummyConnection(Connection):
     raw_data: list[dict]
 
 
[email protected]
+def engine():
+    engine = create_engine("sqlite+pysqlite:///:memory:")
+    SQLModel.metadata.create_all(engine)
+    return engine
+
 @pytest.fixture
 def raw_data():
     return [
@@ -77,9 +83,9 @@ def scope():
 
 
 @pytest.fixture
-def ctx(connection, scope):
+def ctx(connection, scope, engine):
     return Context(
-        db_url="sqlite+pysqlite:///:memory:",
+        engine=engine,
         scope=scope,
         connection=connection,
         options={}
diff --git a/backend/server/services/remote/plugin/plugin_impl.go b/backend/server/services/remote/plugin/plugin_impl.go
index e8f563690..e46bd3157 100644
--- a/backend/server/services/remote/plugin/plugin_impl.go
+++ b/backend/server/services/remote/plugin/plugin_impl.go
@@ -198,7 +198,8 @@ func (p *remotePluginImpl) RunMigrations(forceMigrate bool) errors.Error {
                        return err
                }
        }
-       err = p.invoker.Call("run-migrations", bridge.DefaultContext, forceMigrate).Err
+       dbUrl := basicRes.GetConfig("db_url")
+       err = p.invoker.Call("run-migrations", bridge.DefaultContext, dbUrl, forceMigrate).Err
        return err
 }
 
