This is an automated email from the ASF dual-hosted git repository.
lynwee pushed a commit to branch feat-azuredevops-restranform
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/feat-azuredevops-restranform
by this push:
new b5ac0a3dc feat(azuredevops): restranformate without api client (#8035)
b5ac0a3dc is described below
commit b5ac0a3dceb11745e322ea28638f104b5022ed80
Author: Lynwee <[email protected]>
AuthorDate: Thu Sep 12 10:45:55 2024 +0800
feat(azuredevops): restranformate without api client (#8035)
* feat(plugins): restranformate without api client
* fix(zentao): fix test errors
* fix(bamboo): fix e2e test
* fix(jira): fix e2e test
* fix(zentao): fix e2e test
* fix(bamboo): fix e2e test
* feat(azuredevops): support param skipCollectors
---
backend/python/pydevlake/pydevlake/ipc.py | 4 +-
backend/python/pydevlake/pydevlake/plugin.py | 50 ++++++++++++----------
backend/python/pydevlake/pydevlake/stream.py | 4 ++
.../services/remote/plugin/plugin_extensions.go | 2 +-
4 files changed, 34 insertions(+), 26 deletions(-)
diff --git a/backend/python/pydevlake/pydevlake/ipc.py
b/backend/python/pydevlake/pydevlake/ipc.py
index 135d41cc1..669e4a751 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -93,7 +93,7 @@ class PluginCommands:
return self._plugin.test_connection(connection)
@plugin_method
- def make_pipeline(self, _, scope_config_pairs: list[tuple[dict, dict]],
connection: dict):
+ def make_pipeline(self, _, scope_config_pairs: list[tuple[dict, dict]],
connection: dict,skip_collectors: bool):
connection = self._plugin.connection_type(**connection)
scope_config_pairs = [
(
@@ -102,7 +102,7 @@ class PluginCommands:
)
for raw_scope, raw_config in scope_config_pairs
]
- return self._plugin.make_pipeline(scope_config_pairs, connection)
+ return self._plugin.make_pipeline(scope_config_pairs, connection,
skip_collectors)
@plugin_method
def plugin_info(self, _):
diff --git a/backend/python/pydevlake/pydevlake/plugin.py
b/backend/python/pydevlake/pydevlake/plugin.py
index b365bad4a..84f941939 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -13,24 +13,23 @@
# limitations under the License.
-from typing import Type, Union, Iterable, Optional
-from abc import ABC, abstractmethod
-from pathlib import Path
import os
import sys
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Type, Union, Iterable, Optional
import fire
import pydevlake.message as msg
import pydevlake.model_info
-from pydevlake.subtasks import Subtask
-from pydevlake.logger import logger
-from pydevlake.ipc import PluginCommands
from pydevlake.context import Context
-from pydevlake.stream import Stream
-from pydevlake.model import ToolScope, DomainScope, Connection, ScopeConfig,
raw_data_params
+from pydevlake.ipc import PluginCommands
+from pydevlake.logger import logger
from pydevlake.migration import MIGRATION_SCRIPTS
-
+from pydevlake.model import ToolScope, DomainScope, Connection, ScopeConfig,
raw_data_params
+from pydevlake.stream import Stream
+from pydevlake.subtasks import Subtask
ScopeConfigPair = tuple[ToolScope, ScopeConfig]
@@ -119,20 +118,20 @@ class Plugin(ABC):
tool_scope.raw_data_params = raw_data_params(connection.id,
tool_scope.id)
tool_scope.raw_data_table = self._raw_scope_table_name()
yield msg.RemoteScope(
- id=tool_scope.id,
- parent_id=group_id,
- name=tool_scope.name,
- data=tool_scope
- )
+ id=tool_scope.id,
+ parent_id=group_id,
+ name=tool_scope.name,
+ data=tool_scope
+ )
else:
yield from self.remote_scope_groups(connection)
def make_pipeline(self, scope_config_pairs: list[ScopeConfigPair],
- connection: Connection) -> msg.PipelineData:
+ connection: Connection, skip_collectors: bool) ->
msg.PipelineData:
"""
Make a simple pipeline using the scopes declared by the plugin.
"""
- plan = self.make_pipeline_plan(scope_config_pairs, connection)
+ plan = self.make_pipeline_plan(scope_config_pairs, connection,
skip_collectors)
domain_scopes = []
for tool_scope, _ in scope_config_pairs:
for scope in self.domain_scopes(tool_scope):
@@ -151,13 +150,14 @@ class Plugin(ABC):
)
def make_pipeline_plan(self, scope_config_pairs: list[ScopeConfigPair],
- connection: Connection) ->
list[list[msg.PipelineTask]]:
+ connection: Connection, skip_collectors: bool) ->
list[list[msg.PipelineTask]]:
"""
Generate a pipeline plan with one stage per scope, plus optional
additional stages.
Redefine `extra_stages` to add stages at the end of this pipeline.
"""
return [
- *(self.make_pipeline_stage(scope, config, connection) for scope,
config in scope_config_pairs),
+ *(self.make_pipeline_stage(scope, config, connection,
skip_collectors) for scope, config in
+ scope_config_pairs),
*self.extra_stages(scope_config_pairs, connection)
]
@@ -170,7 +170,7 @@ class Plugin(ABC):
return []
def make_pipeline_stage(self, scope: ToolScope, config: ScopeConfig,
- connection: Connection) -> list[msg.PipelineTask]:
+ connection: Connection, skip_collectors: bool) ->
list[msg.PipelineTask]:
"""
Generate a pipeline stage for the given scope, plus optional
additional tasks.
Subtasks are selected from `entity_types` via `select_subtasks`.
@@ -180,7 +180,7 @@ class Plugin(ABC):
msg.PipelineTask(
plugin=self.name,
skip_on_fail=False,
- subtasks=self.select_subtasks(scope, config),
+ subtasks=self.select_subtasks(scope, config, skip_collectors),
options={
"scopeId": scope.id,
"scopeName": scope.name,
@@ -196,14 +196,17 @@ class Plugin(ABC):
"""Override this method to add tasks to the given scope stage"""
return []
- def select_subtasks(self, scope: ToolScope, config: ScopeConfig) ->
list[str]:
+ def select_subtasks(self, scope: ToolScope, config: ScopeConfig,
skip_collectors: bool) -> list[str]:
"""
Returns the list of subtasks names that should be run for given scope
and entity types.
"""
subtasks = []
for stream in self._streams.values():
if set(stream.domain_types).intersection(config.domain_types) and
stream.should_run_on(scope):
- for subtask in stream.subtasks:
+ target_subtasks = stream.subtasks
+ if skip_collectors:
+ target_subtasks = stream.subtasks_without_collector
+ for subtask in target_subtasks:
subtasks.append(subtask.name)
return subtasks
@@ -234,7 +237,8 @@ class Plugin(ABC):
connection_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.connection_type),
scope_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.tool_scope_type),
scope_config_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.scope_config_type),
-
tool_model_infos=[pydevlake.model_info.DynamicModelInfo.from_model(stream.tool_model)
for stream in self._streams.values()],
+
tool_model_infos=[pydevlake.model_info.DynamicModelInfo.from_model(stream.tool_model)
for stream in
+ self._streams.values()],
subtask_metas=subtask_metas,
migration_scripts=MIGRATION_SCRIPTS
)
diff --git a/backend/python/pydevlake/pydevlake/stream.py
b/backend/python/pydevlake/pydevlake/stream.py
index 6534aadb8..2dc689409 100644
--- a/backend/python/pydevlake/pydevlake/stream.py
+++ b/backend/python/pydevlake/pydevlake/stream.py
@@ -35,6 +35,10 @@ class Stream:
def subtasks(self):
return [self.collector, self.extractor, self.convertor]
+ @property
+ def subtasks_without_collector(self):
+ return [self.extractor, self.convertor]
+
@property
def name(self):
return type(self).__name__.lower()
diff --git a/backend/server/services/remote/plugin/plugin_extensions.go
b/backend/server/services/remote/plugin/plugin_extensions.go
index 13abba195..c7d9aab8c 100644
--- a/backend/server/services/remote/plugin/plugin_extensions.go
+++ b/backend/server/services/remote/plugin/plugin_extensions.go
@@ -62,7 +62,7 @@ func (p remoteDatasourcePlugin)
MakeDataSourcePipelinePlanV200(
}
planData := models.PipelineData{}
- err = p.invoker.Call("make-pipeline", bridge.DefaultContext,
toolScopeConfigPairs, connection.Unwrap()).Get(&planData)
+ err = p.invoker.Call("make-pipeline", bridge.DefaultContext,
toolScopeConfigPairs, connection.Unwrap(), skipCollectors).Get(&planData)
if err != nil {
return nil, nil, err
}