This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 083ab2b5c9 Remove dao in worker (#10994)
083ab2b5c9 is described below
commit 083ab2b5c9682e6afadefcbc4d5285622a4155d0
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Jul 15 20:07:18 2022 +0800
Remove dao in worker (#10994)
---
.../dolphinscheduler/api/ApiApplicationServer.java | 32 +++++++++-
.../server/master/MasterServer.java | 2 +-
.../service/task/TaskPluginManager.java | 72 +++++++++-------------
dolphinscheduler-worker/pom.xml | 6 ++
.../server/worker/WorkerServer.java | 4 +-
5 files changed, 68 insertions(+), 48 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
index 9e6aa94530..078506898a 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
@@ -17,8 +17,20 @@
package org.apache.dolphinscheduler.api;
+import org.apache.dolphinscheduler.common.enums.PluginType;
+import org.apache.dolphinscheduler.dao.PluginDao;
+import org.apache.dolphinscheduler.dao.entity.PluginDefine;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
+import org.apache.dolphinscheduler.spi.params.base.PluginParams;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -32,16 +44,34 @@ import org.springframework.context.event.EventListener;
@ComponentScan("org.apache.dolphinscheduler")
public class ApiApplicationServer {
+ private final Logger logger =
LoggerFactory.getLogger(ApiApplicationServer.class);
+
@Autowired
private TaskPluginManager taskPluginManager;
+ @Autowired
+ private PluginDao pluginDao;
+
public static void main(String[] args) {
SpringApplication.run(ApiApplicationServer.class);
}
@EventListener
public void run(ApplicationReadyEvent readyEvent) {
+ logger.info("Received spring application context ready event will load
taskPlugin and write to DB");
// install task plugin
- taskPluginManager.installPlugin();
+ taskPluginManager.loadPlugin();
+ for (Map.Entry<String, TaskChannelFactory> entry :
taskPluginManager.getTaskChannelFactoryMap().entrySet()) {
+ String taskPluginName = entry.getKey();
+ TaskChannelFactory taskChannelFactory = entry.getValue();
+ List<PluginParams> params = taskChannelFactory.getParams();
+ String paramsJson =
PluginParamsTransfer.transferParamsToJson(params);
+
+ PluginDefine pluginDefine = new PluginDefine(taskPluginName,
PluginType.TASK.getDesc(), paramsJson);
+ int count = pluginDao.addOrUpdatePluginDefine(pluginDefine);
+ if (count <= 0) {
+ throw new TaskPluginException("Failed to update task plugin: "
+ taskPluginName);
+ }
+ }
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 0bf3c945e7..1415aaa840 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -87,7 +87,7 @@ public class MasterServer implements IStoppable {
this.masterRPCServer.start();
// install task plugin
- this.taskPluginManager.installPlugin();
+ this.taskPluginManager.loadPlugin();
// self tolerant
this.masterRegistryClient.init();
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
index 289e8ddbfb..1289b5718a 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
@@ -19,25 +19,18 @@ package org.apache.dolphinscheduler.service.task;
import static java.lang.String.format;
-import org.apache.dolphinscheduler.common.enums.PluginType;
-import org.apache.dolphinscheduler.dao.PluginDao;
-import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
-import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
-import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,23 +40,42 @@ import org.springframework.stereotype.Component;
public class TaskPluginManager {
private static final Logger logger =
LoggerFactory.getLogger(TaskPluginManager.class);
- private final Map<String, TaskChannel> taskChannelMap = new
ConcurrentHashMap<>();
+ private final Map<String, TaskChannelFactory> taskChannelFactoryMap = new
HashMap<>();
+ private final Map<String, TaskChannel> taskChannelMap = new HashMap<>();
- private final PluginDao pluginDao;
+ private final AtomicBoolean loadedFlag = new AtomicBoolean(false);
- public TaskPluginManager(PluginDao pluginDao) {
- this.pluginDao = pluginDao;
- }
+ /**
+ * Load task plugins from classpath.
+ */
+ public void loadPlugin() {
+ if (!loadedFlag.compareAndSet(false, true)) {
+ logger.warn("The task plugin has already been loaded");
+ return;
+ }
+ ServiceLoader.load(TaskChannelFactory.class).forEach(factory -> {
+ final String name = factory.getName();
- private void loadTaskChannel(TaskChannelFactory taskChannelFactory) {
- TaskChannel taskChannel = taskChannelFactory.create();
- taskChannelMap.put(taskChannelFactory.getName(), taskChannel);
+ logger.info("Registering task plugin: {}", name);
+
+ if (taskChannelFactoryMap.containsKey(name)) {
+ throw new TaskPluginException(format("Duplicate task plugins
named '%s'", name));
+ }
+ taskChannelFactoryMap.put(name, factory);
+ taskChannelMap.put(name, factory.create());
+
+ logger.info("Registered task plugin: {}", name);
+ });
}
public Map<String, TaskChannel> getTaskChannelMap() {
return Collections.unmodifiableMap(taskChannelMap);
}
+ public Map<String, TaskChannelFactory> getTaskChannelFactoryMap() {
+ return Collections.unmodifiableMap(taskChannelFactoryMap);
+ }
+
public TaskChannel getTaskChannel(String type) {
return this.getTaskChannelMap().get(type);
}
@@ -85,30 +97,4 @@ public class TaskPluginManager {
return taskChannel.parseParameters(parametersNode);
}
- public void installPlugin() {
- final Set<String> names = new HashSet<>();
-
- ServiceLoader.load(TaskChannelFactory.class).forEach(factory -> {
- final String name = factory.getName();
-
- logger.info("Registering task plugin: {}", name);
-
- if (!names.add(name)) {
- throw new TaskPluginException(format("Duplicate task plugins
named '%s'", name));
- }
-
- loadTaskChannel(factory);
-
- logger.info("Registered task plugin: {}", name);
-
- List<PluginParams> params = factory.getParams();
- String paramsJson =
PluginParamsTransfer.transferParamsToJson(params);
-
- PluginDefine pluginDefine = new PluginDefine(name,
PluginType.TASK.getDesc(), paramsJson);
- int count = pluginDao.addOrUpdatePluginDefine(pluginDefine);
- if (count <= 0) {
- throw new TaskPluginException("Failed to update task plugin: "
+ name);
- }
- });
- }
}
diff --git a/dolphinscheduler-worker/pom.xml b/dolphinscheduler-worker/pom.xml
index 45a9890738..adb5bfcde5 100644
--- a/dolphinscheduler-worker/pom.xml
+++ b/dolphinscheduler-worker/pom.xml
@@ -34,6 +34,12 @@
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-service</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-dao</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index d03a2a951b..6b4b27a46e 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -88,8 +88,6 @@ public class WorkerServer implements IStoppable {
@Autowired
private WorkerRegistryClient workerRegistryClient;
- // todo: Can we just load the task spi, and don't install into mysql?
- // we don't need to rely the dao module in worker.
@Autowired
private TaskPluginManager taskPluginManager;
@@ -116,7 +114,7 @@ public class WorkerServer implements IStoppable {
public void run() {
this.workerRpcServer.start();
this.workerRpcClient.start();
- this.taskPluginManager.installPlugin();
+ this.taskPluginManager.loadPlugin();
this.workerRegistryClient.registry();
this.workerRegistryClient.setRegistryStoppable(this);