This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 7029062 [Improvement][Task]Check the task plugin configuration when
the worker starts (#6184)
7029062 is described below
commit 7029062f4c2f247e9eac333e28e36e66b03fb435
Author: Kirs <[email protected]>
AuthorDate: Tue Sep 14 01:41:54 2021 +0800
[Improvement][Task]Check the task plugin configuration when the worker starts
(#6184)
* [Improvement][Task]Check the task plugin configuration when the worker
starts
---
.../server/worker/WorkerServer.java | 10 +++++--
.../server/worker/runner/TaskExecuteThread.java | 19 ++++++++-----
.../spi/exception/PluginNotFoundException.java | 31 ++++++++++++++++++++++
.../dolphinscheduler/server/StandaloneServer.java | 13 +++++++++
4 files changed, 65 insertions(+), 8 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 05a0140..9705b44 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -35,10 +35,13 @@ import
org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThr
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException;
import org.apache.dolphinscheduler.spi.plugin.DolphinPluginLoader;
import org.apache.dolphinscheduler.spi.plugin.DolphinPluginManagerConfig;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.apache.commons.collections4.MapUtils;
+
import java.util.Set;
import javax.annotation.PostConstruct;
@@ -180,12 +183,15 @@ public class WorkerServer implements IStoppable {
taskPluginManagerConfig.setMavenLocalRepository(workerConfig.getMavenLocalRepository().trim());
}
- DolphinPluginLoader alertPluginLoader = new
DolphinPluginLoader(taskPluginManagerConfig,
ImmutableList.of(taskPluginManager));
+ DolphinPluginLoader taskPluginLoader = new
DolphinPluginLoader(taskPluginManagerConfig,
ImmutableList.of(taskPluginManager));
try {
- alertPluginLoader.loadPlugins();
+ taskPluginLoader.loadPlugins();
} catch (Exception e) {
throw new RuntimeException("Load Task Plugin Failed !", e);
}
+ if (MapUtils.isEmpty(taskPluginManager.getTaskChannelMap())) {
+ throw new PluginNotFoundException("Task Plugin Not Found,Please
Check Config File");
+ }
}
public void close(String cause) {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index c06b265..5a164e8 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -35,6 +35,7 @@ import
org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
@@ -61,7 +62,7 @@ import org.slf4j.LoggerFactory;
import com.github.rholder.retry.RetryException;
/**
- * task scheduler thread
+ * task scheduler thread
*/
public class TaskExecuteThread implements Runnable, Delayed {
@@ -103,7 +104,8 @@ public class TaskExecuteThread implements Runnable, Delayed
{
private TaskPluginManager taskPluginManager;
/**
- * constructor
+ * constructor
+ *
* @param taskExecutionContext taskExecutionContext
* @param taskCallbackService taskCallbackService
*/
@@ -128,7 +130,7 @@ public class TaskExecuteThread implements Runnable, Delayed
{
@Override
public void run() {
- TaskExecuteResponseCommand responseCommand = new
TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(),taskExecutionContext.getProcessInstanceId());
+ TaskExecuteResponseCommand responseCommand = new
TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(),
taskExecutionContext.getProcessInstanceId());
try {
logger.info("script path : {}",
taskExecutionContext.getExecutePath());
// check if the OS user exists
@@ -161,6 +163,9 @@ public class TaskExecuteThread implements Runnable, Delayed
{
taskExecutionContext.getTaskInstanceId()));
TaskChannel taskChannel =
taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
+ if (null == taskChannel) {
+ throw new PluginNotFoundException(String.format("%s Task
Plugin Not Found,Please Check Config File.",
taskExecutionContext.getTaskType()));
+ }
//TODO Temporary operation, To be adjusted
TaskRequest taskRequest =
JSONUtils.parseObject(JSONUtils.toJsonString(taskExecutionContext),
TaskRequest.class);
@@ -229,6 +234,7 @@ public class TaskExecuteThread implements Runnable, Delayed
{
/**
* get global paras map
+ *
* @return map
*/
private Map<String, String> getGlobalParamsMap() {
@@ -251,7 +257,7 @@ public class TaskExecuteThread implements Runnable, Delayed
{
try {
task.cancelApplication(true);
} catch (Exception e) {
- logger.error(e.getMessage(),e);
+ logger.error(e.getMessage(), e);
}
}
}
@@ -270,7 +276,7 @@ public class TaskExecuteThread implements Runnable, Delayed
{
Set<Map.Entry<String, String>> resEntries = projectRes.entrySet();
- for (Map.Entry<String,String> resource : resEntries) {
+ for (Map.Entry<String, String> resource : resEntries) {
String fullName = resource.getKey();
String tenantCode = resource.getValue();
File resFile = new File(execLocalPath, fullName);
@@ -282,7 +288,7 @@ public class TaskExecuteThread implements Runnable, Delayed
{
logger.info("get resource file from hdfs :{}",
resHdfsPath);
HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath,
execLocalPath + File.separator + fullName, false, true);
} catch (Exception e) {
- logger.error(e.getMessage(),e);
+ logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage());
}
} else {
@@ -329,6 +335,7 @@ public class TaskExecuteThread implements Runnable, Delayed
{
/**
* get current TaskExecutionContext
+ *
* @return TaskExecutionContext
*/
public TaskExecutionContext getTaskExecutionContext() {
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/exception/PluginNotFoundException.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/exception/PluginNotFoundException.java
new file mode 100644
index 0000000..2153299
--- /dev/null
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/exception/PluginNotFoundException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.spi.exception;
+
+public class PluginNotFoundException extends RuntimeException {
+
+ private static final long serialVersionUID = -5487812425126112159L;
+
+ public PluginNotFoundException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public PluginNotFoundException(String message) {
+ super(message);
+ }
+}
diff --git
a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
index 5360dda..94b6ca7 100644
---
a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
+++
b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
@@ -61,6 +61,8 @@ public class StandaloneServer {
startAlertServer();
+ setTaskPlugin();
+
new SpringApplicationBuilder(
ApiApplicationServer.class,
MasterServer.class,
@@ -114,4 +116,15 @@ public class StandaloneServer {
final ScriptRunner runner = new ScriptRunner(ds.getConnection(), true,
true);
runner.runScript(new FileReader("sql/dolphinscheduler_h2.sql"));
}
+
+ private static void setTaskPlugin() {
+ final Path taskPluginPath = Paths.get(
+
StandaloneServer.class.getProtectionDomain().getCodeSource().getLocation().getPath(),
+
"../../../dolphinscheduler-task-plugin/dolphinscheduler-task-shell/pom.xml"
+ ).toAbsolutePath();
+ if (Files.exists(taskPluginPath)) {
+ System.setProperty("task.plugin.binding",
taskPluginPath.toString());
+ System.setProperty("task.plugin.dir", "");
+ }
+ }
}