This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new c7df6c831 feat(azuredevops): restranformate without api client (#8035) (#8039)
c7df6c831 is described below
commit c7df6c831015375cbe87cfb9de8e3b6494911f81
Author: Lynwee <[email protected]>
AuthorDate: Thu Sep 12 12:27:11 2024 +0800
feat(azuredevops): restranformate without api client (#8035) (#8039)
* feat(azuredevops): restranformate without api client (#8035)
* feat(plugins): restranformate without api client
* fix(zentao): fix test errors
* fix(bamboo): fix e2e test
* fix(jira): fix e2e test
* fix(zentao): fix e2e test
* fix(bamboo): fix e2e test
* feat(azuredevops): support param skipCollectors
* fix(pydevlake): fix testing utils (#8040)
* feat(plugins): restranformate without api client
* fix(zentao): fix test errors
* fix(bamboo): fix e2e test
* fix(jira): fix e2e test
* fix(zentao): fix e2e test
* fix(bamboo): fix e2e test
* feat(azuredevops): support param skipCollectors
* fix(pydevlake): fix testing utils
---
backend/python/pydevlake/pydevlake/ipc.py | 4 +-
backend/python/pydevlake/pydevlake/plugin.py | 50 ++++++++++++----------
backend/python/pydevlake/pydevlake/stream.py | 4 ++
.../python/pydevlake/pydevlake/testing/testing.py | 20 +++++----
.../services/remote/plugin/plugin_extensions.go | 2 +-
5 files changed, 45 insertions(+), 35 deletions(-)
diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py
index 135d41cc1..669e4a751 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -93,7 +93,7 @@ class PluginCommands:
return self._plugin.test_connection(connection)
@plugin_method
- def make_pipeline(self, _, scope_config_pairs: list[tuple[dict, dict]],
connection: dict):
+ def make_pipeline(self, _, scope_config_pairs: list[tuple[dict, dict]],
connection: dict,skip_collectors: bool):
connection = self._plugin.connection_type(**connection)
scope_config_pairs = [
(
@@ -102,7 +102,7 @@ class PluginCommands:
)
for raw_scope, raw_config in scope_config_pairs
]
- return self._plugin.make_pipeline(scope_config_pairs, connection)
+ return self._plugin.make_pipeline(scope_config_pairs, connection,
skip_collectors)
@plugin_method
def plugin_info(self, _):
diff --git a/backend/python/pydevlake/pydevlake/plugin.py b/backend/python/pydevlake/pydevlake/plugin.py
index b365bad4a..84f941939 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -13,24 +13,23 @@
# limitations under the License.
-from typing import Type, Union, Iterable, Optional
-from abc import ABC, abstractmethod
-from pathlib import Path
import os
import sys
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Type, Union, Iterable, Optional
import fire
import pydevlake.message as msg
import pydevlake.model_info
-from pydevlake.subtasks import Subtask
-from pydevlake.logger import logger
-from pydevlake.ipc import PluginCommands
from pydevlake.context import Context
-from pydevlake.stream import Stream
-from pydevlake.model import ToolScope, DomainScope, Connection, ScopeConfig,
raw_data_params
+from pydevlake.ipc import PluginCommands
+from pydevlake.logger import logger
from pydevlake.migration import MIGRATION_SCRIPTS
-
+from pydevlake.model import ToolScope, DomainScope, Connection, ScopeConfig,
raw_data_params
+from pydevlake.stream import Stream
+from pydevlake.subtasks import Subtask
ScopeConfigPair = tuple[ToolScope, ScopeConfig]
@@ -119,20 +118,20 @@ class Plugin(ABC):
tool_scope.raw_data_params = raw_data_params(connection.id,
tool_scope.id)
tool_scope.raw_data_table = self._raw_scope_table_name()
yield msg.RemoteScope(
- id=tool_scope.id,
- parent_id=group_id,
- name=tool_scope.name,
- data=tool_scope
- )
+ id=tool_scope.id,
+ parent_id=group_id,
+ name=tool_scope.name,
+ data=tool_scope
+ )
else:
yield from self.remote_scope_groups(connection)
def make_pipeline(self, scope_config_pairs: list[ScopeConfigPair],
- connection: Connection) -> msg.PipelineData:
+ connection: Connection, skip_collectors: bool) ->
msg.PipelineData:
"""
Make a simple pipeline using the scopes declared by the plugin.
"""
- plan = self.make_pipeline_plan(scope_config_pairs, connection)
+ plan = self.make_pipeline_plan(scope_config_pairs, connection,
skip_collectors)
domain_scopes = []
for tool_scope, _ in scope_config_pairs:
for scope in self.domain_scopes(tool_scope):
@@ -151,13 +150,14 @@ class Plugin(ABC):
)
def make_pipeline_plan(self, scope_config_pairs: list[ScopeConfigPair],
- connection: Connection) ->
list[list[msg.PipelineTask]]:
+ connection: Connection, skip_collectors: bool) ->
list[list[msg.PipelineTask]]:
"""
Generate a pipeline plan with one stage per scope, plus optional
additional stages.
Redefine `extra_stages` to add stages at the end of this pipeline.
"""
return [
- *(self.make_pipeline_stage(scope, config, connection) for scope,
config in scope_config_pairs),
+ *(self.make_pipeline_stage(scope, config, connection,
skip_collectors) for scope, config in
+ scope_config_pairs),
*self.extra_stages(scope_config_pairs, connection)
]
@@ -170,7 +170,7 @@ class Plugin(ABC):
return []
def make_pipeline_stage(self, scope: ToolScope, config: ScopeConfig,
- connection: Connection) -> list[msg.PipelineTask]:
+ connection: Connection, skip_collectors: bool) ->
list[msg.PipelineTask]:
"""
Generate a pipeline stage for the given scope, plus optional
additional tasks.
Subtasks are selected from `entity_types` via `select_subtasks`.
@@ -180,7 +180,7 @@ class Plugin(ABC):
msg.PipelineTask(
plugin=self.name,
skip_on_fail=False,
- subtasks=self.select_subtasks(scope, config),
+ subtasks=self.select_subtasks(scope, config, skip_collectors),
options={
"scopeId": scope.id,
"scopeName": scope.name,
@@ -196,14 +196,17 @@ class Plugin(ABC):
"""Override this method to add tasks to the given scope stage"""
return []
- def select_subtasks(self, scope: ToolScope, config: ScopeConfig) ->
list[str]:
+ def select_subtasks(self, scope: ToolScope, config: ScopeConfig,
skip_collectors: bool) -> list[str]:
"""
Returns the list of subtasks names that should be run for given scope
and entity types.
"""
subtasks = []
for stream in self._streams.values():
if set(stream.domain_types).intersection(config.domain_types) and
stream.should_run_on(scope):
- for subtask in stream.subtasks:
+ target_subtasks = stream.subtasks
+ if skip_collectors:
+ target_subtasks = stream.subtasks_without_collector
+ for subtask in target_subtasks:
subtasks.append(subtask.name)
return subtasks
@@ -234,7 +237,8 @@ class Plugin(ABC):
connection_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.connection_type),
scope_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.tool_scope_type),
scope_config_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.scope_config_type),
-
tool_model_infos=[pydevlake.model_info.DynamicModelInfo.from_model(stream.tool_model)
for stream in self._streams.values()],
+
tool_model_infos=[pydevlake.model_info.DynamicModelInfo.from_model(stream.tool_model)
for stream in
+ self._streams.values()],
subtask_metas=subtask_metas,
migration_scripts=MIGRATION_SCRIPTS
)
diff --git a/backend/python/pydevlake/pydevlake/stream.py b/backend/python/pydevlake/pydevlake/stream.py
index 6534aadb8..2dc689409 100644
--- a/backend/python/pydevlake/pydevlake/stream.py
+++ b/backend/python/pydevlake/pydevlake/stream.py
@@ -35,6 +35,10 @@ class Stream:
def subtasks(self):
return [self.collector, self.extractor, self.convertor]
+ @property
+ def subtasks_without_collector(self):
+ return [self.extractor, self.convertor]
+
@property
def name(self):
return type(self).__name__.lower()
diff --git a/backend/python/pydevlake/pydevlake/testing/testing.py b/backend/python/pydevlake/pydevlake/testing/testing.py
index 3b1ec9617..977511047 100644
--- a/backend/python/pydevlake/pydevlake/testing/testing.py
+++ b/backend/python/pydevlake/pydevlake/testing/testing.py
@@ -13,17 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import pytest
-
from typing import Union, Type, Iterable, Generator, Optional
+import pytest
from pydantic import ValidationError
from sqlmodel import create_engine
from pydevlake.context import Context
-from pydevlake.plugin import Plugin
from pydevlake.message import RemoteScopeGroup, PipelineTask
from pydevlake.model import DomainModel, Connection, DomainScope, ToolModel,
ToolScope, ScopeConfig, DomainType
+from pydevlake.plugin import Plugin
from pydevlake.stream import Stream
@@ -64,8 +63,8 @@ def make_context(connection, scope, scope_config):
def assert_stream_convert(plugin: Union[Plugin, Type[Plugin]], stream_name:
str,
- raw: dict, expected: Union[DomainModel,
Iterable[DomainModel]],
- ctx=None):
+ raw: dict, expected: Union[DomainModel,
Iterable[DomainModel]],
+ ctx=None):
if isinstance(plugin, type):
plugin = plugin()
stream = plugin.get_stream(stream_name)
@@ -178,10 +177,12 @@ def assert_valid_remote_scopes(plugin: Plugin, connection: Connection, group_id:
return tool_scopes
-def assert_valid_pipeline_plan(plugin: Plugin, connection: Connection,
tool_scope: ToolScope, scope_config: ScopeConfig) -> list[list[PipelineTask]]:
+def assert_valid_pipeline_plan(plugin: Plugin, connection: Connection,
tool_scope: ToolScope, scope_config: ScopeConfig,
+ skip_collectors: bool) ->
list[list[PipelineTask]]:
plan = plugin.make_pipeline_plan(
[(tool_scope, scope_config)],
- connection
+ connection,
+ skip_collectors,
)
assert len(plan) > 0, 'Pipeline plan has no stage'
for stage in plan:
@@ -198,14 +199,15 @@ def assert_valid_plugin(plugin: Plugin):
assert_valid_streams(plugin)
-def assert_plugin_run(plugin: Plugin, connection: Connection, scope_config:
Optional[ScopeConfig] = None):
+def assert_plugin_run(plugin: Plugin, connection: Connection, scope_config:
Optional[ScopeConfig] = None,
+ skip_collectors: bool = False):
assert_valid_plugin(plugin)
assert_valid_connection(plugin, connection)
groups = assert_valid_remote_scope_groups(plugin, connection)
scope = assert_valid_remote_scopes(plugin, connection, groups[0].id)[0]
assert_valid_domain_scopes(plugin, scope)
scope_config = scope_config or plugin.scope_config_type()
- assert_valid_pipeline_plan(plugin, connection, scope, scope_config)
+ assert_valid_pipeline_plan(plugin, connection, scope, scope_config,
skip_collectors)
for stream in plugin.streams:
if isinstance(stream, type):
stream = stream(plugin.name)
diff --git a/backend/server/services/remote/plugin/plugin_extensions.go b/backend/server/services/remote/plugin/plugin_extensions.go
index 13abba195..c7d9aab8c 100644
--- a/backend/server/services/remote/plugin/plugin_extensions.go
+++ b/backend/server/services/remote/plugin/plugin_extensions.go
@@ -62,7 +62,7 @@ func (p remoteDatasourcePlugin) MakeDataSourcePipelinePlanV200(
}
planData := models.PipelineData{}
- err = p.invoker.Call("make-pipeline", bridge.DefaultContext,
toolScopeConfigPairs, connection.Unwrap()).Get(&planData)
+ err = p.invoker.Call("make-pipeline", bridge.DefaultContext,
toolScopeConfigPairs, connection.Unwrap(), skipCollectors).Get(&planData)
if err != nil {
return nil, nil, err
}