This is an automated email from the ASF dual-hosted git repository.

lynwee pushed a commit to branch dev-1
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit 31f663afaf1b8b51c35abe8a53bede117ddba283
Author: d4x1 <[email protected]>
AuthorDate: Thu Sep 12 10:09:58 2024 +0800

    feat(azuredevops): support param skipCollectors
---
 backend/python/pydevlake/pydevlake/ipc.py          |  4 +-
 backend/python/pydevlake/pydevlake/plugin.py       | 50 ++++++++++++----------
 backend/python/pydevlake/pydevlake/stream.py       |  4 ++
 .../services/remote/plugin/plugin_extensions.go    |  2 +-
 4 files changed, 34 insertions(+), 26 deletions(-)

diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py
index 135d41cc1..669e4a751 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -93,7 +93,7 @@ class PluginCommands:
         return self._plugin.test_connection(connection)
 
     @plugin_method
-    def make_pipeline(self, _, scope_config_pairs: list[tuple[dict, dict]], connection: dict):
+    def make_pipeline(self, _, scope_config_pairs: list[tuple[dict, dict]], connection: dict,skip_collectors: bool):
         connection = self._plugin.connection_type(**connection)
         scope_config_pairs = [
             (
@@ -102,7 +102,7 @@ class PluginCommands:
             )
             for raw_scope, raw_config in scope_config_pairs
         ]
-        return self._plugin.make_pipeline(scope_config_pairs, connection)
+        return self._plugin.make_pipeline(scope_config_pairs, connection, skip_collectors)
 
     @plugin_method
     def plugin_info(self, _):
diff --git a/backend/python/pydevlake/pydevlake/plugin.py b/backend/python/pydevlake/pydevlake/plugin.py
index b365bad4a..84f941939 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -13,24 +13,23 @@
 # limitations under the License.
 
 
-from typing import Type, Union, Iterable, Optional
-from abc import ABC, abstractmethod
-from pathlib import Path
 import os
 import sys
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Type, Union, Iterable, Optional
 
 import fire
 
 import pydevlake.message as msg
 import pydevlake.model_info
-from pydevlake.subtasks import Subtask
-from pydevlake.logger import logger
-from pydevlake.ipc import PluginCommands
 from pydevlake.context import Context
-from pydevlake.stream import Stream
-from pydevlake.model import ToolScope, DomainScope, Connection, ScopeConfig, raw_data_params
+from pydevlake.ipc import PluginCommands
+from pydevlake.logger import logger
 from pydevlake.migration import MIGRATION_SCRIPTS
-
+from pydevlake.model import ToolScope, DomainScope, Connection, ScopeConfig, raw_data_params
+from pydevlake.stream import Stream
+from pydevlake.subtasks import Subtask
 
 ScopeConfigPair = tuple[ToolScope, ScopeConfig]
@@ -119,20 +118,20 @@ class Plugin(ABC):
                 tool_scope.raw_data_params = raw_data_params(connection.id, tool_scope.id)
                 tool_scope.raw_data_table = self._raw_scope_table_name()
                 yield msg.RemoteScope(
-                    id=tool_scope.id,
-                    parent_id=group_id,
-                    name=tool_scope.name,
-                    data=tool_scope
-                )
+                        id=tool_scope.id,
+                        parent_id=group_id,
+                        name=tool_scope.name,
+                        data=tool_scope
+                    )
         else:
             yield from self.remote_scope_groups(connection)
 
     def make_pipeline(self, scope_config_pairs: list[ScopeConfigPair],
-                      connection: Connection) -> msg.PipelineData:
+                      connection: Connection, skip_collectors: bool) -> msg.PipelineData:
         """
         Make a simple pipeline using the scopes declared by the plugin.
         """
-        plan = self.make_pipeline_plan(scope_config_pairs, connection)
+        plan = self.make_pipeline_plan(scope_config_pairs, connection, skip_collectors)
         domain_scopes = []
         for tool_scope, _ in scope_config_pairs:
             for scope in self.domain_scopes(tool_scope):
@@ -151,13 +150,14 @@ class Plugin(ABC):
         )
 
     def make_pipeline_plan(self, scope_config_pairs: list[ScopeConfigPair],
-                           connection: Connection) -> list[list[msg.PipelineTask]]:
+                           connection: Connection, skip_collectors: bool) -> list[list[msg.PipelineTask]]:
         """
         Generate a pipeline plan with one stage per scope, plus optional additional stages.
         Redefine `extra_stages` to add stages at the end of this pipeline.
         """
         return [
-            *(self.make_pipeline_stage(scope, config, connection) for scope, config in scope_config_pairs),
+            *(self.make_pipeline_stage(scope, config, connection, skip_collectors) for scope, config in
+              scope_config_pairs),
             *self.extra_stages(scope_config_pairs, connection)
         ]
 
@@ -170,7 +170,7 @@ class Plugin(ABC):
         return []
 
     def make_pipeline_stage(self, scope: ToolScope, config: ScopeConfig,
-                            connection: Connection) -> list[msg.PipelineTask]:
+                            connection: Connection, skip_collectors: bool) -> list[msg.PipelineTask]:
         """
         Generate a pipeline stage for the given scope, plus optional additional tasks.
         Subtasks are selected from `entity_types` via `select_subtasks`.
@@ -180,7 +180,7 @@
             msg.PipelineTask(
                 plugin=self.name,
                 skip_on_fail=False,
-                subtasks=self.select_subtasks(scope, config),
+                subtasks=self.select_subtasks(scope, config, skip_collectors),
                 options={
                     "scopeId": scope.id,
                     "scopeName": scope.name,
@@ -196,14 +196,17 @@ class Plugin(ABC):
         """Override this method to add tasks to the given scope stage"""
         return []
 
-    def select_subtasks(self, scope: ToolScope, config: ScopeConfig) -> list[str]:
+    def select_subtasks(self, scope: ToolScope, config: ScopeConfig, skip_collectors: bool) -> list[str]:
         """
        Returns the list of subtasks names that should be run for given scope and entity types.
         """
         subtasks = []
         for stream in self._streams.values():
             if set(stream.domain_types).intersection(config.domain_types) and stream.should_run_on(scope):
-                for subtask in stream.subtasks:
+                target_subtasks = stream.subtasks
+                if skip_collectors:
+                    target_subtasks = stream.subtasks_without_collector
+                for subtask in target_subtasks:
                     subtasks.append(subtask.name)
         return subtasks
@@ -234,7 +237,8 @@ class Plugin(ABC):
             connection_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.connection_type),
             scope_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.tool_scope_type),
             scope_config_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.scope_config_type),
-            tool_model_infos=[pydevlake.model_info.DynamicModelInfo.from_model(stream.tool_model) for stream in self._streams.values()],
+            tool_model_infos=[pydevlake.model_info.DynamicModelInfo.from_model(stream.tool_model) for stream in
+                              self._streams.values()],
             subtask_metas=subtask_metas,
             migration_scripts=MIGRATION_SCRIPTS
         )
diff --git a/backend/python/pydevlake/pydevlake/stream.py b/backend/python/pydevlake/pydevlake/stream.py
index 6534aadb8..2dc689409 100644
--- a/backend/python/pydevlake/pydevlake/stream.py
+++ b/backend/python/pydevlake/pydevlake/stream.py
@@ -35,6 +35,10 @@ class Stream:
     def subtasks(self):
         return [self.collector, self.extractor, self.convertor]
 
+    @property
+    def subtasks_without_collector(self):
+        return [self.extractor, self.convertor]
+
     @property
     def name(self):
         return type(self).__name__.lower()
diff --git a/backend/server/services/remote/plugin/plugin_extensions.go b/backend/server/services/remote/plugin/plugin_extensions.go
index 13abba195..c7d9aab8c 100644
--- a/backend/server/services/remote/plugin/plugin_extensions.go
+++ b/backend/server/services/remote/plugin/plugin_extensions.go
@@ -62,7 +62,7 @@ func (p remoteDatasourcePlugin) MakeDataSourcePipelinePlanV200(
 	}
 
 	planData := models.PipelineData{}
-	err = p.invoker.Call("make-pipeline", bridge.DefaultContext, toolScopeConfigPairs, connection.Unwrap()).Get(&planData)
+	err = p.invoker.Call("make-pipeline", bridge.DefaultContext, toolScopeConfigPairs, connection.Unwrap(), skipCollectors).Get(&planData)
 	if err != nil {
 		return nil, nil, err
 	}
