This is an automated email from the ASF dual-hosted git repository.

lynwee pushed a commit to branch feat-azuredevops-restranform
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/feat-azuredevops-restranform 
by this push:
     new b5ac0a3dc feat(azuredevops): restranformate without api client (#8035)
b5ac0a3dc is described below

commit b5ac0a3dceb11745e322ea28638f104b5022ed80
Author: Lynwee <[email protected]>
AuthorDate: Thu Sep 12 10:45:55 2024 +0800

    feat(azuredevops): restranformate without api client (#8035)
    
    * feat(plugins): restranformate without api client
    
    * fix(zentao): fix test errors
    
    * fix(bamboo): fix e2e test
    
    * fix(jira): fix e2e test
    
    * fix(zentao): fix e2e test
    
    * fix(bamboo): fix e2e test
    
    * feat(azuredevops): support param skipCollectors
---
 backend/python/pydevlake/pydevlake/ipc.py          |  4 +-
 backend/python/pydevlake/pydevlake/plugin.py       | 50 ++++++++++++----------
 backend/python/pydevlake/pydevlake/stream.py       |  4 ++
 .../services/remote/plugin/plugin_extensions.go    |  2 +-
 4 files changed, 34 insertions(+), 26 deletions(-)

diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py
index 135d41cc1..669e4a751 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -93,7 +93,7 @@ class PluginCommands:
         return self._plugin.test_connection(connection)
 
     @plugin_method
-    def make_pipeline(self, _, scope_config_pairs: list[tuple[dict, dict]], connection: dict):
+    def make_pipeline(self, _, scope_config_pairs: list[tuple[dict, dict]], connection: dict,skip_collectors: bool):
         connection = self._plugin.connection_type(**connection)
         scope_config_pairs = [
             (
@@ -102,7 +102,7 @@ class PluginCommands:
             )
             for raw_scope, raw_config in scope_config_pairs
         ]
-        return self._plugin.make_pipeline(scope_config_pairs, connection)
+        return self._plugin.make_pipeline(scope_config_pairs, connection, skip_collectors)
 
     @plugin_method
     def plugin_info(self, _):
diff --git a/backend/python/pydevlake/pydevlake/plugin.py b/backend/python/pydevlake/pydevlake/plugin.py
index b365bad4a..84f941939 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -13,24 +13,23 @@
 # limitations under the License.
 
 
-from typing import Type, Union, Iterable, Optional
-from abc import ABC, abstractmethod
-from pathlib import Path
 import os
 import sys
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Type, Union, Iterable, Optional
 
 import fire
 
 import pydevlake.message as msg
 import pydevlake.model_info
-from pydevlake.subtasks import Subtask
-from pydevlake.logger import logger
-from pydevlake.ipc import PluginCommands
 from pydevlake.context import Context
-from pydevlake.stream import Stream
-from pydevlake.model import ToolScope, DomainScope, Connection, ScopeConfig, raw_data_params
+from pydevlake.ipc import PluginCommands
+from pydevlake.logger import logger
 from pydevlake.migration import MIGRATION_SCRIPTS
-
+from pydevlake.model import ToolScope, DomainScope, Connection, ScopeConfig, raw_data_params
+from pydevlake.stream import Stream
+from pydevlake.subtasks import Subtask
 
 ScopeConfigPair = tuple[ToolScope, ScopeConfig]
 
@@ -119,20 +118,20 @@ class Plugin(ABC):
                 tool_scope.raw_data_params = raw_data_params(connection.id, tool_scope.id)
                 tool_scope.raw_data_table = self._raw_scope_table_name()
                 yield msg.RemoteScope(
-                        id=tool_scope.id,
-                        parent_id=group_id,
-                        name=tool_scope.name,
-                        data=tool_scope
-                    )
+                    id=tool_scope.id,
+                    parent_id=group_id,
+                    name=tool_scope.name,
+                    data=tool_scope
+                )
         else:
             yield from self.remote_scope_groups(connection)
 
     def make_pipeline(self, scope_config_pairs: list[ScopeConfigPair],
-                      connection: Connection) -> msg.PipelineData:
+                      connection: Connection, skip_collectors: bool) -> msg.PipelineData:
         """
         Make a simple pipeline using the scopes declared by the plugin.
         """
-        plan = self.make_pipeline_plan(scope_config_pairs, connection)
+        plan = self.make_pipeline_plan(scope_config_pairs, connection, skip_collectors)
         domain_scopes = []
         for tool_scope, _ in scope_config_pairs:
             for scope in self.domain_scopes(tool_scope):
@@ -151,13 +150,14 @@ class Plugin(ABC):
         )
 
     def make_pipeline_plan(self, scope_config_pairs: list[ScopeConfigPair],
-                           connection: Connection) -> list[list[msg.PipelineTask]]:
+                           connection: Connection, skip_collectors: bool) -> list[list[msg.PipelineTask]]:
         """
         Generate a pipeline plan with one stage per scope, plus optional additional stages.
         Redefine `extra_stages` to add stages at the end of this pipeline.
         """
         return [
-            *(self.make_pipeline_stage(scope, config, connection) for scope, config in scope_config_pairs),
+            *(self.make_pipeline_stage(scope, config, connection, skip_collectors) for scope, config in
+              scope_config_pairs),
             *self.extra_stages(scope_config_pairs, connection)
         ]
 
@@ -170,7 +170,7 @@ class Plugin(ABC):
         return []
 
     def make_pipeline_stage(self, scope: ToolScope, config: ScopeConfig,
-                            connection: Connection) -> list[msg.PipelineTask]:
+                            connection: Connection, skip_collectors: bool) -> list[msg.PipelineTask]:
         """
         Generate a pipeline stage for the given scope, plus optional additional tasks.
         Subtasks are selected from `entity_types` via `select_subtasks`.
@@ -180,7 +180,7 @@ class Plugin(ABC):
             msg.PipelineTask(
                 plugin=self.name,
                 skip_on_fail=False,
-                subtasks=self.select_subtasks(scope, config),
+                subtasks=self.select_subtasks(scope, config, skip_collectors),
                 options={
                     "scopeId": scope.id,
                     "scopeName": scope.name,
@@ -196,14 +196,17 @@ class Plugin(ABC):
         """Override this method to add tasks to the given scope stage"""
         return []
 
-    def select_subtasks(self, scope: ToolScope, config: ScopeConfig) -> list[str]:
+    def select_subtasks(self, scope: ToolScope, config: ScopeConfig, skip_collectors: bool) -> list[str]:
         """
         Returns the list of subtasks names that should be run for given scope and entity types.
         """
         subtasks = []
         for stream in self._streams.values():
             if set(stream.domain_types).intersection(config.domain_types) and stream.should_run_on(scope):
-                for subtask in stream.subtasks:
+                target_subtasks = stream.subtasks
+                if skip_collectors:
+                    target_subtasks = stream.subtasks_without_collector
+                for subtask in target_subtasks:
                     subtasks.append(subtask.name)
         return subtasks
 
@@ -234,7 +237,8 @@ class Plugin(ABC):
             connection_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.connection_type),
             scope_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.tool_scope_type),
             scope_config_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.scope_config_type),
-            tool_model_infos=[pydevlake.model_info.DynamicModelInfo.from_model(stream.tool_model) for stream in self._streams.values()],
+            tool_model_infos=[pydevlake.model_info.DynamicModelInfo.from_model(stream.tool_model) for stream in
+                              self._streams.values()],
             subtask_metas=subtask_metas,
             migration_scripts=MIGRATION_SCRIPTS
         )
diff --git a/backend/python/pydevlake/pydevlake/stream.py b/backend/python/pydevlake/pydevlake/stream.py
index 6534aadb8..2dc689409 100644
--- a/backend/python/pydevlake/pydevlake/stream.py
+++ b/backend/python/pydevlake/pydevlake/stream.py
@@ -35,6 +35,10 @@ class Stream:
     def subtasks(self):
         return [self.collector, self.extractor, self.convertor]
 
+    @property
+    def subtasks_without_collector(self):
+        return [self.extractor, self.convertor]
+
     @property
     def name(self):
         return type(self).__name__.lower()
diff --git a/backend/server/services/remote/plugin/plugin_extensions.go b/backend/server/services/remote/plugin/plugin_extensions.go
index 13abba195..c7d9aab8c 100644
--- a/backend/server/services/remote/plugin/plugin_extensions.go
+++ b/backend/server/services/remote/plugin/plugin_extensions.go
@@ -62,7 +62,7 @@ func (p remoteDatasourcePlugin) MakeDataSourcePipelinePlanV200(
        }
 
        planData := models.PipelineData{}
-       err = p.invoker.Call("make-pipeline", bridge.DefaultContext, toolScopeConfigPairs, connection.Unwrap()).Get(&planData)
+       err = p.invoker.Call("make-pipeline", bridge.DefaultContext, toolScopeConfigPairs, connection.Unwrap(), skipCollectors).Get(&planData)
        if err != nil {
                return nil, nil, err
        }

Reply via email to