This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 083ab2b5c9 Remove dao in worker (#10994)
083ab2b5c9 is described below

commit 083ab2b5c9682e6afadefcbc4d5285622a4155d0
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Jul 15 20:07:18 2022 +0800

    Remove dao in worker (#10994)
---
 .../dolphinscheduler/api/ApiApplicationServer.java | 32 +++++++++-
 .../server/master/MasterServer.java                |  2 +-
 .../service/task/TaskPluginManager.java            | 72 +++++++++-------------
 dolphinscheduler-worker/pom.xml                    |  6 ++
 .../server/worker/WorkerServer.java                |  4 +-
 5 files changed, 68 insertions(+), 48 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
index 9e6aa94530..078506898a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
@@ -17,8 +17,20 @@
 
 package org.apache.dolphinscheduler.api;
 
+import org.apache.dolphinscheduler.common.enums.PluginType;
+import org.apache.dolphinscheduler.dao.PluginDao;
+import org.apache.dolphinscheduler.dao.entity.PluginDefine;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
+import org.apache.dolphinscheduler.spi.params.base.PluginParams;
 
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -32,16 +44,34 @@ import org.springframework.context.event.EventListener;
 @ComponentScan("org.apache.dolphinscheduler")
 public class ApiApplicationServer {
 
+    private final Logger logger = LoggerFactory.getLogger(ApiApplicationServer.class);
+
     @Autowired
     private TaskPluginManager taskPluginManager;
 
+    @Autowired
+    private PluginDao pluginDao;
+
     public static void main(String[] args) {
         SpringApplication.run(ApiApplicationServer.class);
     }
 
     @EventListener
     public void run(ApplicationReadyEvent readyEvent) {
+        logger.info("Received spring application context ready event will load taskPlugin and write to DB");
         // install task plugin
-        taskPluginManager.installPlugin();
+        taskPluginManager.loadPlugin();
+        for (Map.Entry<String, TaskChannelFactory> entry : taskPluginManager.getTaskChannelFactoryMap().entrySet()) {
+            String taskPluginName = entry.getKey();
+            TaskChannelFactory taskChannelFactory = entry.getValue();
+            List<PluginParams> params = taskChannelFactory.getParams();
+            String paramsJson = PluginParamsTransfer.transferParamsToJson(params);
+
+            PluginDefine pluginDefine = new PluginDefine(taskPluginName, PluginType.TASK.getDesc(), paramsJson);
+            int count = pluginDao.addOrUpdatePluginDefine(pluginDefine);
+            if (count <= 0) {
+                throw new TaskPluginException("Failed to update task plugin: " + taskPluginName);
+            }
+        }
     }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 0bf3c945e7..1415aaa840 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -87,7 +87,7 @@ public class MasterServer implements IStoppable {
         this.masterRPCServer.start();
 
         // install task plugin
-        this.taskPluginManager.installPlugin();
+        this.taskPluginManager.loadPlugin();
 
         // self tolerant
         this.masterRegistryClient.init();
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
index 289e8ddbfb..1289b5718a 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
@@ -19,25 +19,18 @@ package org.apache.dolphinscheduler.service.task;
 
 import static java.lang.String.format;
 
-import org.apache.dolphinscheduler.common.enums.PluginType;
-import org.apache.dolphinscheduler.dao.PluginDao;
-import org.apache.dolphinscheduler.dao.entity.PluginDefine;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
 import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
-import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
-import org.apache.dolphinscheduler.spi.params.base.PluginParams;
 
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.ServiceLoader;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,23 +40,42 @@ import org.springframework.stereotype.Component;
 public class TaskPluginManager {
     private static final Logger logger = LoggerFactory.getLogger(TaskPluginManager.class);
 
-    private final Map<String, TaskChannel> taskChannelMap = new ConcurrentHashMap<>();
+    private final Map<String, TaskChannelFactory> taskChannelFactoryMap = new HashMap<>();
+    private final Map<String, TaskChannel> taskChannelMap = new HashMap<>();
 
-    private final PluginDao pluginDao;
+    private final AtomicBoolean loadedFlag = new AtomicBoolean(false);
 
-    public TaskPluginManager(PluginDao pluginDao) {
-        this.pluginDao = pluginDao;
-    }
+    /**
+     * Load task plugins from classpath.
+     */
+    public void loadPlugin() {
+        if (!loadedFlag.compareAndSet(false, true)) {
+            logger.warn("The task plugin has already been loaded");
+            return;
+        }
+        ServiceLoader.load(TaskChannelFactory.class).forEach(factory -> {
+            final String name = factory.getName();
 
-    private void loadTaskChannel(TaskChannelFactory taskChannelFactory) {
-        TaskChannel taskChannel = taskChannelFactory.create();
-        taskChannelMap.put(taskChannelFactory.getName(), taskChannel);
+            logger.info("Registering task plugin: {}", name);
+
+            if (taskChannelFactoryMap.containsKey(name)) {
+                throw new TaskPluginException(format("Duplicate task plugins named '%s'", name));
+            }
+            taskChannelFactoryMap.put(name, factory);
+            taskChannelMap.put(name, factory.create());
+
+            logger.info("Registered task plugin: {}", name);
+        });
     }
 
     public Map<String, TaskChannel> getTaskChannelMap() {
         return Collections.unmodifiableMap(taskChannelMap);
     }
 
+    public Map<String, TaskChannelFactory> getTaskChannelFactoryMap() {
+        return Collections.unmodifiableMap(taskChannelFactoryMap);
+    }
+
     public TaskChannel getTaskChannel(String type) {
         return this.getTaskChannelMap().get(type);
     }
@@ -85,30 +97,4 @@ public class TaskPluginManager {
         return taskChannel.parseParameters(parametersNode);
     }
 
-    public void installPlugin() {
-        final Set<String> names = new HashSet<>();
-
-        ServiceLoader.load(TaskChannelFactory.class).forEach(factory -> {
-            final String name = factory.getName();
-
-            logger.info("Registering task plugin: {}", name);
-
-            if (!names.add(name)) {
-                throw new TaskPluginException(format("Duplicate task plugins named '%s'", name));
-            }
-
-            loadTaskChannel(factory);
-
-            logger.info("Registered task plugin: {}", name);
-
-            List<PluginParams> params = factory.getParams();
-            String paramsJson = PluginParamsTransfer.transferParamsToJson(params);
-
-            PluginDefine pluginDefine = new PluginDefine(name, PluginType.TASK.getDesc(), paramsJson);
-            int count = pluginDao.addOrUpdatePluginDefine(pluginDefine);
-            if (count <= 0) {
-                throw new TaskPluginException("Failed to update task plugin: " + name);
-            }
-        });
-    }
 }
diff --git a/dolphinscheduler-worker/pom.xml b/dolphinscheduler-worker/pom.xml
index 45a9890738..adb5bfcde5 100644
--- a/dolphinscheduler-worker/pom.xml
+++ b/dolphinscheduler-worker/pom.xml
@@ -34,6 +34,12 @@
         <dependency>
             <groupId>org.apache.dolphinscheduler</groupId>
             <artifactId>dolphinscheduler-service</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.dolphinscheduler</groupId>
+                    <artifactId>dolphinscheduler-dao</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.dolphinscheduler</groupId>
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index d03a2a951b..6b4b27a46e 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -88,8 +88,6 @@ public class WorkerServer implements IStoppable {
     @Autowired
     private WorkerRegistryClient workerRegistryClient;
 
-    // todo: Can we just load the task spi, and don't install into mysql?
-    //  we don't need to rely the dao module in worker.
     @Autowired
     private TaskPluginManager taskPluginManager;
 
@@ -116,7 +114,7 @@ public class WorkerServer implements IStoppable {
     public void run() {
         this.workerRpcServer.start();
         this.workerRpcClient.start();
-        this.taskPluginManager.installPlugin();
+        this.taskPluginManager.loadPlugin();
 
         this.workerRegistryClient.registry();
         this.workerRegistryClient.setRegistryStoppable(this);

Reply via email to