This is an automated email from the ASF dual-hosted git repository. lynwee pushed a commit to branch dev-1 in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit 81b5f3f6554a20f231b1258ef5148679f03f3fc7 Author: d4x1 <[email protected]> AuthorDate: Thu Sep 12 11:28:37 2024 +0800 fix(pydevlake): fix testing utils --- .../python/pydevlake/pydevlake/testing/testing.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/backend/python/pydevlake/pydevlake/testing/testing.py b/backend/python/pydevlake/pydevlake/testing/testing.py index 3b1ec9617..977511047 100644 --- a/backend/python/pydevlake/pydevlake/testing/testing.py +++ b/backend/python/pydevlake/pydevlake/testing/testing.py @@ -13,17 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -import pytest - from typing import Union, Type, Iterable, Generator, Optional +import pytest from pydantic import ValidationError from sqlmodel import create_engine from pydevlake.context import Context -from pydevlake.plugin import Plugin from pydevlake.message import RemoteScopeGroup, PipelineTask from pydevlake.model import DomainModel, Connection, DomainScope, ToolModel, ToolScope, ScopeConfig, DomainType +from pydevlake.plugin import Plugin from pydevlake.stream import Stream @@ -64,8 +63,8 @@ def make_context(connection, scope, scope_config): def assert_stream_convert(plugin: Union[Plugin, Type[Plugin]], stream_name: str, - raw: dict, expected: Union[DomainModel, Iterable[DomainModel]], - ctx=None): + raw: dict, expected: Union[DomainModel, Iterable[DomainModel]], + ctx=None): if isinstance(plugin, type): plugin = plugin() stream = plugin.get_stream(stream_name) @@ -178,10 +177,12 @@ def assert_valid_remote_scopes(plugin: Plugin, connection: Connection, group_id: return tool_scopes -def assert_valid_pipeline_plan(plugin: Plugin, connection: Connection, tool_scope: ToolScope, scope_config: ScopeConfig) -> list[list[PipelineTask]]: +def assert_valid_pipeline_plan(plugin: Plugin, connection: Connection, tool_scope: ToolScope, scope_config: ScopeConfig, + skip_collectors: bool) -> list[list[PipelineTask]]: plan = plugin.make_pipeline_plan( [(tool_scope, scope_config)], - connection + connection, + skip_collectors, ) assert len(plan) > 0, 'Pipeline plan has no stage' for stage in plan: @@ -198,14 +199,15 @@ def assert_valid_plugin(plugin: Plugin): assert_valid_streams(plugin) -def assert_plugin_run(plugin: Plugin, connection: Connection, scope_config: Optional[ScopeConfig] = None): +def assert_plugin_run(plugin: Plugin, connection: Connection, scope_config: Optional[ScopeConfig] = None, + skip_collectors: bool = False): assert_valid_plugin(plugin) assert_valid_connection(plugin, connection) groups = assert_valid_remote_scope_groups(plugin, connection) scope = assert_valid_remote_scopes(plugin, connection, groups[0].id)[0] assert_valid_domain_scopes(plugin, scope) scope_config = scope_config or plugin.scope_config_type() - assert_valid_pipeline_plan(plugin, connection, scope, scope_config) + assert_valid_pipeline_plan(plugin, connection, scope, scope_config, skip_collectors) for stream in plugin.streams: if isinstance(stream, type): stream = stream(plugin.name)
