This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b90152f [SPI][Task]add SQL Task (#6237)
b90152f is described below
commit b90152fc2e09ce1567a10ec929f02b24faa72065
Author: Kirs <[email protected]>
AuthorDate: Fri Sep 17 13:10:44 2021 +0800
[SPI][Task]add SQL Task (#6237)
Add Sql Task And Fix Task Parse Err
---
.../dolphinscheduler-alert-email/pom.xml | 6 -
.../api/service/impl/UdfFuncServiceImpl.java | 2 +-
.../api/service/UdfFuncServiceTest.java | 2 +-
.../src/main/provisio/dolphinscheduler.xml | 5 +
.../builder/TaskExecutionContextBuilder.java | 6 +-
.../server/entity/TaskExecutionContext.java | 3 +
.../master/consumer/TaskPriorityQueueConsumer.java | 16 +-
.../server/worker/runner/TaskExecuteThread.java | 12 +-
.../server/entity/SQLTaskExecutionContextTest.java | 189 -------
.../consumer/TaskPriorityQueueConsumerTest.java | 2 +-
.../spi/common/UiChannelFactory.java | 4 +-
.../spi/task/AbstractParameters.java | 6 +-
.../dolphinscheduler/spi/task/AbstractTask.java | 28 +-
.../dolphinscheduler/spi/task/TaskAlertInfo.java | 35 +-
.../dolphinscheduler/spi/task/TaskChannel.java | 4 +-
.../dolphinscheduler/spi/task/TaskConstants.java | 5 +
.../task/request}/DataxTaskExecutionContext.java | 2 +-
.../spi/task/request}/SQLTaskExecutionContext.java | 12 +-
.../task/request}/SqoopTaskExecutionContext.java | 2 +-
.../spi/task/request/TaskRequest.java | 39 ++
.../spi/task/request/UdfFuncRequest.java | 231 +++++++++
.../request/UdfType.java} | 52 +-
.../plugin/task/api/AbstractYarnTask.java | 2 -
.../plugin/task/http/HttpTaskChannel.java | 1 -
.../dolphinscheduler-task-sql/pom.xml | 12 +-
.../plugin/task/sql/SqlBinds.java} | 27 +-
.../plugin/task/sql/SqlParameters.java | 294 +++++++++++
.../dolphinscheduler/plugin/task/sql/SqlTask.java | 547 +++++++++++++++++++++
.../plugin/task/sql/SqlTaskChannel.java} | 7 +-
.../plugin/task/sql/SqlTaskChannelFactory.java} | 21 +-
.../plugin/task/sql/SqlTaskPlugin.java | 15 +-
.../dolphinscheduler/plugin/task/sql/SqlType.java | 17 +-
32 files changed, 1305 insertions(+), 301 deletions(-)
diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml
b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml
index 079185c..9181e0d 100644
--- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml
+++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml
@@ -72,12 +72,6 @@
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java
index 62b69fe..f156ead 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java
@@ -204,7 +204,7 @@ public class UdfFuncServiceImpl extends BaseServiceImpl
implements UdfFuncServic
// verify udfFuncName is exist
if (!funcName.equals(udf.getFuncName())) {
if (checkUdfFuncNameExists(funcName)) {
- logger.error("UdfFunc {} has exist, can't create again.",
funcName);
+ logger.error("UdfFuncRequest {} has exist, can't create
again.", funcName);
result.put(Constants.STATUS, Status.UDF_FUNCTION_EXISTS);
result.put(Constants.MSG, Status.UDF_FUNCTION_EXISTS.getMsg());
return result;
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java
index dddfb6d..23ac7b0 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java
@@ -230,7 +230,7 @@ public class UdfFuncServiceTest {
}
/**
- * get UdfFunc id
+ * get UdfFuncRequest id
*/
private UdfFunc getUdfFunc() {
UdfFunc udfFunc = new UdfFunc();
diff --git a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
index 53e3afb..bc2be45 100644
--- a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
+++ b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
@@ -106,5 +106,10 @@
<unpack/>
</artifact>
</artifactSet>
+ <artifactSet to="lib/plugin/task/sql">
+ <artifact
id="${project.groupId}:dolphinscheduler-task-sql:zip:${project.version}">
+ <unpack/>
+ </artifact>
+ </artifactSet>
</runtime>
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index f37af08..1865d3e 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -25,11 +25,11 @@ import
org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
-import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
-import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
/**
* TaskExecutionContext builder
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index c32c383..52353be 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -21,6 +21,9 @@ import
org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index bab83db..fc10a29 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -42,10 +42,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
-import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
-import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
@@ -55,6 +52,10 @@ import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
+import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.UdfFuncRequest;
import org.apache.commons.lang.StringUtils;
@@ -345,13 +346,14 @@ public class TaskPriorityQueueConsumer extends Thread {
}
List<UdfFunc> udfFuncList =
processService.queryUdfFunListByIds(udfFunIdsArray);
- Map<UdfFunc, String> udfFuncMap = new HashMap<>();
+ UdfFuncRequest udfFuncRequest;
+ Map<UdfFuncRequest, String> udfFuncRequestMap = new HashMap<>();
for (UdfFunc udfFunc : udfFuncList) {
+ udfFuncRequest =
JSONUtils.parseObject(JSONUtils.toJsonString(udfFunc), UdfFuncRequest.class);
String tenantCode =
processService.queryTenantCodeByResName(udfFunc.getResourceName(),
ResourceType.UDF);
- udfFuncMap.put(udfFunc, tenantCode);
+ udfFuncRequestMap.put(udfFuncRequest, tenantCode);
}
-
- sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncMap);
+ sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncRequestMap);
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 5a164e8..b0a5625 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -37,6 +37,7 @@ import
org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
+import org.apache.dolphinscheduler.spi.task.TaskAlertInfo;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
@@ -166,8 +167,6 @@ public class TaskExecuteThread implements Runnable, Delayed
{
if (null == taskChannel) {
throw new PluginNotFoundException(String.format("%s Task
Plugin Not Found,Please Check Config File.",
taskExecutionContext.getTaskType()));
}
-
- //TODO Temporary operation, To be adjusted
TaskRequest taskRequest =
JSONUtils.parseObject(JSONUtils.toJsonString(taskExecutionContext),
TaskRequest.class);
task = taskChannel.createTask(taskRequest);
@@ -179,8 +178,9 @@ public class TaskExecuteThread implements Runnable, Delayed
{
this.task.handle();
// task result process
- this.task.after();
-
+ if (this.task.getNeedAlert()) {
+ sendAlert(this.task.getTaskAlertInfo());
+ }
responseCommand.setStatus(this.task.getExitStatus().getCode());
responseCommand.setEndTime(new Date());
responseCommand.setProcessId(this.task.getProcessId());
@@ -203,6 +203,10 @@ public class TaskExecuteThread implements Runnable,
Delayed {
}
}
+ private void sendAlert(TaskAlertInfo taskAlertInfo) {
+ alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(),
taskAlertInfo.getTitle(), taskAlertInfo.getContent());
+ }
+
/**
* when task finish, clear execute path.
*/
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContextTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContextTest.java
deleted file mode 100644
index a2ca6be..0000000
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContextTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.entity;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.UdfFunc;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.junit.Test;
-
-public class SQLTaskExecutionContextTest {
-
- /**
- * test parse josn String to TaskExecutionContext
- */
- @Test
- public void testTaskExecutionContext() {
- String contextJson = "{\n"
- + " \"taskInstanceId\":32,\n"
- + " \"taskName\":\"test-hive-func\",\n"
- + " \"startTime\":\"2020-07-19 16:45:46\",\n"
- + " \"taskType\":\"SQL\",\n"
- + " \"host\":null,\n"
- + "
\"executePath\":\"/tmp/dolphinscheduler/exec/process/1/5/14/32\",\n"
- + " \"logPath\":null,\n"
- + "
\"taskJson\":\"{\\\"id\\\":\\\"tasks-70999\\\",\\\"name\\\":\\\"test-hive-func\\\""
- +
",\\\"desc\\\":null,\\\"type\\\":\\\"SQL\\\",\\\"runFlag\\\":\\\"NORMAL\\\","
- + "\\\"loc\\\":null,\\\"maxRetryTimes\\\":0,\\\"retryInterval\\\":1,"
- + "\\\"params\\\":{\\\"type\\\":\\\"HIVE\\\",\\\"datasource\\\":2,"
- + "\\\"sql\\\":\\\"select mid_id, user_id,"
- + " version_code, version_name, lang, source, os, area, model, "
- + "brand, sdk_version, gmail, height_width, app_time, network,"
- + " lng, lat, dt,\\\\n Lower(model)\\\\nfrom dws_uv_detail_day
limit 5;"
- +
"\\\",\\\"udfs\\\":\\\"1\\\",\\\"sqlType\\\":\\\"0\\\",\\\"title\\\":\\\""
- + "test-hive-user-func\\\",\\\"receivers\\\":\\\"[email protected]\\\","
- +
"\\\"receiversCc\\\":\\\"\\\",\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[],"
- +
"\\\"connParams\\\":\\\"\\\",\\\"preStatements\\\":[],\\\"postStatements\\\":[]},"
- +
"\\\"preTasks\\\":[],\\\"extras\\\":null,\\\"depList\\\":[],\\\"dependence\\\":{},"
- +
"\\\"conditionResult\\\":{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]},"
- +
"\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"workerGroup\\\":\\\"default\\\","
- +
"\\\"workerGroupId\\\":null,\\\"timeout\\\":{\\\"strategy\\\":\\\"\\\",\\\"interval\\\":null,"
- +
"\\\"enable\\\":false},\\\"conditionsTask\\\":false,\\\"forbidden\\\":false,"
- +
"\\\"taskTimeoutParameter\\\":{\\\"enable\\\":false,\\\"strategy\\\":null,"
- + "\\\"interval\\\":0}}\",\n"
- + " \"processId\":0,\n"
- + " \"appIds\":null,\n"
- + " \"processInstanceId\":14,\n"
- + " \"scheduleTime\":null,\n"
- + " \"globalParams\":null,\n"
- + " \"executorId\":2,\n"
- + " \"cmdTypeIfComplement\":2,\n"
- + " \"tenantCode\":\"sl\",\n"
- + " \"queue\":\"sl\",\n"
- + " \"processDefineId\":5,\n"
- + " \"projectId\":1,\n"
- + " \"taskParams\":null,\n"
- + " \"envFile\":null,\n"
- + " \"definedParams\":null,\n"
- + " \"taskAppId\":null,\n"
- + " \"taskTimeoutStrategy\":0,\n"
- + " \"taskTimeout\":0,\n"
- + " \"workerGroup\":\"default\",\n"
- + " \"resources\":{\n"
- + " },\n"
- + " \"sqlTaskExecutionContext\":{\n"
- + " \"warningGroupId\":0,\n"
- + " \"connectionParams\":\"{\\\"type\\\":null,\\\"address\\\":"
- +
"\\\"jdbc:hive2://localhost:10000\\\",\\\"database\\\":\\\"gmall\\\","
- + "\\\"jdbcUrl\\\":\\\"jdbc:hive2://localhost:10000/gmall\\\","
- +
"\\\"user\\\":\\\"sl-test\\\",\\\"password\\\":\\\"123456sl\\\"}\",\n"
- + " \"udfFuncTenantCodeMap\": null"
- + " },\n"
- + " \"dataxTaskExecutionContext\":{\n"
- + " \"dataSourceId\":0,\n"
- + " \"sourcetype\":0,\n"
- + " \"sourceConnectionParams\":null,\n"
- + " \"dataTargetId\":0,\n"
- + " \"targetType\":0,\n"
- + " \"targetConnectionParams\":null\n"
- + " },\n"
- + " \"dependenceTaskExecutionContext\":null,\n"
- + " \"sqoopTaskExecutionContext\":{\n"
- + " \"dataSourceId\":0,\n"
- + " \"sourcetype\":0,\n"
- + " \"sourceConnectionParams\":null,\n"
- + " \"dataTargetId\":0,\n"
- + " \"targetType\":0,\n"
- + " \"targetConnectionParams\":null\n"
- + " },\n"
- + " \"procedureTaskExecutionContext\":{\n"
- + " \"connectionParams\":null\n"
- + " }\n"
- + "}\n";
-
- TaskExecutionContext taskExecutionContext =
JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
-
- assertNotNull(taskExecutionContext);
- }
-
- @Test
- public void testSqlTaskExecutionContext() {
-
- SQLTaskExecutionContext sqlTaskExecutionContext = new
SQLTaskExecutionContext();
- sqlTaskExecutionContext.setWarningGroupId(0);
-
- Map<UdfFunc, String> udfmap = new HashMap<>();
-
- UdfFunc udfFunc = new UdfFunc();
- udfFunc.setArgTypes("1");
- udfFunc.setId(1);
- udfFunc.setResourceName("name1");
- udfmap.put(udfFunc, "map1");
-
- UdfFunc udfFunc2 = new UdfFunc();
- udfFunc2.setArgTypes("2");
- udfFunc2.setId(2);
- udfFunc2.setResourceName("name2");
- udfmap.put(udfFunc2, "map2");
-
- sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfmap);
-
- String contextJson = JSONUtils.toJsonString(sqlTaskExecutionContext);
- SQLTaskExecutionContext parseSqlTask = JSONUtils.parseObject(contextJson,
SQLTaskExecutionContext.class);
-
- assertNotNull(parseSqlTask);
- assertEquals(sqlTaskExecutionContext.getWarningGroupId(),
parseSqlTask.getWarningGroupId());
- assertEquals(sqlTaskExecutionContext.getUdfFuncTenantCodeMap().size(),
parseSqlTask.getUdfFuncTenantCodeMap().size());
- }
-
- /**
- * test the SQLTaskExecutionContext
- */
- @Test
- public void testSqlTaskExecutionContextParse() {
-
- // SQLTaskExecutionContext.udfFuncTenantCodeMap is null
- String contextJson = "{\n"
- + " \"warningGroupId\":0,\n"
- + " \"connectionParams\":null,\n"
- + " \"udfFuncTenantCodeMap\":null"
- + "}\n}";
- SQLTaskExecutionContext parseSqlTask = JSONUtils.parseObject(contextJson,
SQLTaskExecutionContext.class);
-
- assertNotNull(parseSqlTask);
- assertEquals(0,parseSqlTask.getWarningGroupId());
- assertNull(parseSqlTask.getUdfFuncTenantCodeMap());
-
- // SQLTaskExecutionContext.udfFuncTenantCodeMap is not null
- contextJson = "{\"warningGroupId\":0,"
- + "\"connectionParams\":null,"
- + "\"udfFuncTenantCodeMap\":{\""
- + "{\\\"id\\\":2,\\\"userId\\\":0,"
- +
"\\\"funcName\\\":null,\\\"className\\\":null,\\\"argTypes\\\":\\\"2\\\",\\\"database\\\":null,"
- +
"\\\"description\\\":null,\\\"resourceId\\\":0,\\\"resourceName\\\":\\\"name2\\\",\\\"type\\\":null,"
- + "\\\"createTime\\\":null,\\\"updateTime\\\":null}\":\"map2\","
- + "\"{\\\"id\\\":1,\\\"userId\\\":0,\\\"funcName\\\":null,"
- + "\\\"className\\\":null,\\\"argTypes\\\":\\\"1\\\","
- + "\\\"database\\\":null,\\\"description\\\":null,"
- + "\\\"resourceId\\\":0,\\\"resourceName\\\":\\\"name1\\\","
- +
"\\\"type\\\":null,\\\"createTime\\\":null,\\\"updateTime\\\":null}\":\"map1\"}}\n";
-
- SQLTaskExecutionContext parseSqlTask2 = JSONUtils.parseObject(contextJson,
SQLTaskExecutionContext.class);
-
- assertNotNull(parseSqlTask2);
- assertEquals(0,parseSqlTask2.getWarningGroupId());
- assertEquals(2,parseSqlTask2.getUdfFuncTenantCodeMap().size());
- }
-
-}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index 2d4a1fe..976e058 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -32,12 +32,12 @@ import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
+import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
import java.util.ArrayList;
import java.util.Date;
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java
index d0e822b..8b89215 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.spi.common;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,8 @@ package org.apache.dolphinscheduler.spi.common;/*
* limitations under the License.
*/
+package org.apache.dolphinscheduler.spi.common;
+
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import java.util.List;
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java
index adb9136..55f5203 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.spi.task;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,8 @@ package org.apache.dolphinscheduler.spi.task;/*
* limitations under the License.
*/
+package org.apache.dolphinscheduler.spi.task;
+
import org.apache.dolphinscheduler.spi.utils.CollectionUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
@@ -37,7 +39,7 @@ public abstract class AbstractParameters implements
IParameters {
/**
* local parameters
*/
- private List<Property> localParams;
+ protected List<Property> localParams;
/**
* var pool
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractTask.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractTask.java
index b9c2632..5a6cfd2 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractTask.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractTask.java
@@ -60,6 +60,10 @@ public abstract class AbstractTask {
*/
protected volatile int exitStatusCode = -1;
+ protected boolean needAlert = false;
+
+ protected TaskAlertInfo taskAlertInfo;
+
/**
* constructor
*
@@ -79,7 +83,6 @@ public abstract class AbstractTask {
return null;
}
-
/**
* task handle
*
@@ -142,6 +145,22 @@ public abstract class AbstractTask {
this.resultString = resultString;
}
+ public boolean getNeedAlert() {
+ return needAlert;
+ }
+
+ public void setNeedAlert(boolean needAlert) {
+ this.needAlert = needAlert;
+ }
+
+ public TaskAlertInfo getTaskAlertInfo() {
+ return taskAlertInfo;
+ }
+
+ public void setTaskAlertInfo(TaskAlertInfo taskAlertInfo) {
+ this.taskAlertInfo = taskAlertInfo;
+ }
+
/**
* get task parameters
*
@@ -150,13 +169,6 @@ public abstract class AbstractTask {
public abstract AbstractParameters getParameters();
/**
- * result processing maybe
- */
- public void after() {
-
- }
-
- /**
* get exit status according to exitCode
*
* @return exit status
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskAlertInfo.java
similarity index 57%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
copy to
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskAlertInfo.java
index 01484c4..75faad5 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskAlertInfo.java
@@ -15,22 +15,37 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.http;
+package org.apache.dolphinscheduler.spi.task;
-import org.apache.dolphinscheduler.spi.task.AbstractTask;
-import org.apache.dolphinscheduler.spi.task.TaskChannel;
-import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
+public class TaskAlertInfo {
-public class HttpTaskChannel implements TaskChannel {
+ private String title;
- @Override
- public void cancelApplication(boolean status) {
+ private String content;
+ private Integer alertGroupId;
+
+ public String getTitle() {
+ return title;
+ }
+
+ public void setTitle(String title) {
+ this.title = title;
+ }
+
+ public String getContent() {
+ return content;
+ }
+
+ public void setContent(String content) {
+ this.content = content;
}
- @Override
- public AbstractTask createTask(TaskRequest taskRequest) {
- return new HttpTask(taskRequest);
+ public Integer getAlertGroupId() {
+ return alertGroupId;
}
+ public void setAlertGroupId(Integer alertGroupId) {
+ this.alertGroupId = alertGroupId;
+ }
}
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
index 7ab3b17..681970f 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.spi.task;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +15,8 @@ package org.apache.dolphinscheduler.spi.task;/*
* limitations under the License.
*/
+package org.apache.dolphinscheduler.spi.task;
+
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
public interface TaskChannel {
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java
index b7d5cbb..5b7ae95 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java
@@ -223,6 +223,11 @@ public class TaskConstants {
public static final int LOG_QUERY_LIMIT = 4096;
/**
+ * default display rows
+ */
+ public static final int DEFAULT_DISPLAY_ROWS = 10;
+
+ /**
* jar
*/
public static final String JAR = "jar";
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/DataxTaskExecutionContext.java
similarity index 98%
rename from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java
rename to
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/DataxTaskExecutionContext.java
index dd8d646..276923d 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/DataxTaskExecutionContext.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.entity;
+package org.apache.dolphinscheduler.spi.task.request;
import java.io.Serializable;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskExecutionContext.java
similarity index 84%
rename from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
rename to
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskExecutionContext.java
index fa670e2..b712b50 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskExecutionContext.java
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.entity;
+package org.apache.dolphinscheduler.spi.task.request;
+
+import org.apache.dolphinscheduler.spi.task.UdfFuncBean.UdfFuncDeserializer;
-import org.apache.dolphinscheduler.dao.entity.UdfFunc;
-import org.apache.dolphinscheduler.dao.entity.UdfFunc.UdfFuncDeserializer;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import java.io.Serializable;
import java.util.Map;
@@ -42,7 +42,7 @@ public class SQLTaskExecutionContext implements Serializable {
* udf function tenant code map
*/
@JsonDeserialize(keyUsing = UdfFuncDeserializer.class)
- private Map<UdfFunc,String> udfFuncTenantCodeMap;
+ private Map<UdfFuncRequest,String> udfFuncTenantCodeMap;
public int getWarningGroupId() {
@@ -53,11 +53,11 @@ public class SQLTaskExecutionContext implements
Serializable {
this.warningGroupId = warningGroupId;
}
- public Map<UdfFunc, String> getUdfFuncTenantCodeMap() {
+ public Map<UdfFuncRequest, String> getUdfFuncTenantCodeMap() {
return udfFuncTenantCodeMap;
}
- public void setUdfFuncTenantCodeMap(Map<UdfFunc, String>
udfFuncTenantCodeMap) {
+ public void setUdfFuncTenantCodeMap(Map<UdfFuncRequest, String>
udfFuncTenantCodeMap) {
this.udfFuncTenantCodeMap = udfFuncTenantCodeMap;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SqoopTaskExecutionContext.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SqoopTaskExecutionContext.java
similarity index 98%
rename from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SqoopTaskExecutionContext.java
rename to
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SqoopTaskExecutionContext.java
index c74414b..720892c 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SqoopTaskExecutionContext.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SqoopTaskExecutionContext.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.entity;
+package org.apache.dolphinscheduler.spi.task.request;
import java.io.Serializable;
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java
index 0de96e1..3e131f2 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java
@@ -188,6 +188,22 @@ public class TaskRequest {
private Map<String, Property> paramsMap;
+
+ /**
+ * sql TaskExecutionContext
+ */
+ private SQLTaskExecutionContext sqlTaskExecutionContext;
+
+ /**
+ * datax TaskExecutionContext
+ */
+ private DataxTaskExecutionContext dataxTaskExecutionContext;
+
+ /**
+ * sqoop TaskExecutionContext
+ */
+ private SqoopTaskExecutionContext sqoopTaskExecutionContext;
+
public Map<String, String> getResources() {
return resources;
}
@@ -428,4 +444,27 @@ public class TaskRequest {
this.delayTime = delayTime;
}
+ public SQLTaskExecutionContext getSqlTaskExecutionContext() {
+ return sqlTaskExecutionContext;
+ }
+
+ public void setSqlTaskExecutionContext(SQLTaskExecutionContext
sqlTaskExecutionContext) {
+ this.sqlTaskExecutionContext = sqlTaskExecutionContext;
+ }
+
+ public DataxTaskExecutionContext getDataxTaskExecutionContext() {
+ return dataxTaskExecutionContext;
+ }
+
+ public void setDataxTaskExecutionContext(DataxTaskExecutionContext
dataxTaskExecutionContext) {
+ this.dataxTaskExecutionContext = dataxTaskExecutionContext;
+ }
+
+ public SqoopTaskExecutionContext getSqoopTaskExecutionContext() {
+ return sqoopTaskExecutionContext;
+ }
+
+ public void setSqoopTaskExecutionContext(SqoopTaskExecutionContext
sqoopTaskExecutionContext) {
+ this.sqoopTaskExecutionContext = sqoopTaskExecutionContext;
+ }
}
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfFuncRequest.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfFuncRequest.java
new file mode 100644
index 0000000..cbbd9e2
--- /dev/null
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfFuncRequest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.spi.task.request;
+
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.io.IOException;
+import java.util.Date;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+
+/**
+ * udf function
+ */
+public class UdfFuncRequest {
+ /**
+ * id
+ */
+ private int id;
+ /**
+ * user id
+ */
+ private int userId;
+
+ /**
+ * udf function name
+ */
+ private String funcName;
+
+ /**
+ * udf class name
+ */
+ private String className;
+
+ /**
+ * udf argument types
+ */
+ private String argTypes;
+
+ /**
+ * udf data base
+ */
+ private String database;
+
+ /**
+ * udf description
+ */
+ private String description;
+
+ /**
+ * resource id
+ */
+ private int resourceId;
+
+ /**
+ * resource name
+ */
+ private String resourceName;
+
+ /**
+ * udf function type: hive / spark
+ */
+ private UdfType type;
+
+ /**
+ * create time
+ */
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+ private Date createTime;
+
+ /**
+ * update time
+ */
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+ private Date updateTime;
+
+ public int getId() {
+ return id;
+ }
+
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ public int getUserId() {
+ return userId;
+ }
+
+ public void setUserId(int userId) {
+ this.userId = userId;
+ }
+
+ public String getFuncName() {
+ return funcName;
+ }
+
+ public void setFuncName(String funcName) {
+ this.funcName = funcName;
+ }
+
+ public String getClassName() {
+ return className;
+ }
+
+ public void setClassName(String className) {
+ this.className = className;
+ }
+
+ public String getArgTypes() {
+ return argTypes;
+ }
+
+ public void setArgTypes(String argTypes) {
+ this.argTypes = argTypes;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public int getResourceId() {
+ return resourceId;
+ }
+
+ public void setResourceId(int resourceId) {
+ this.resourceId = resourceId;
+ }
+
+ public String getResourceName() {
+ return resourceName;
+ }
+
+ public void setResourceName(String resourceName) {
+ this.resourceName = resourceName;
+ }
+
+ public UdfType getType() {
+ return type;
+ }
+
+ public void setType(UdfType type) {
+ this.type = type;
+ }
+
+ public Date getCreateTime() {
+ return createTime;
+ }
+
+ public void setCreateTime(Date createTime) {
+ this.createTime = createTime;
+ }
+
+ public Date getUpdateTime() {
+ return updateTime;
+ }
+
+ public void setUpdateTime(Date updateTime) {
+ this.updateTime = updateTime;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ UdfFuncRequest udfFuncRequest = (UdfFuncRequest) o;
+
+ if (id != udfFuncRequest.id) {
+ return false;
+ }
+ return !(funcName != null ? !funcName.equals(udfFuncRequest.funcName)
: udfFuncRequest.funcName != null);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = id;
+ result = 31 * result + (funcName != null ? funcName.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return JSONUtils.toJsonString(this);
+ }
+
+ public static class UdfFuncDeserializer extends KeyDeserializer {
+
+ @Override
+ public Object deserializeKey(String key, DeserializationContext ctxt)
throws IOException {
+ if (StringUtils.isBlank(key)) {
+ return null;
+ }
+ return JSONUtils.parseObject(key, UdfFuncRequest.class);
+ }
+ }
+}
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfType.java
similarity index 54%
copy from
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java
copy to
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfType.java
index d0e822b..2b31087 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfType.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.spi.common;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,27 +15,41 @@ package org.apache.dolphinscheduler.spi.common;/*
* limitations under the License.
*/
-import org.apache.dolphinscheduler.spi.params.base.PluginParams;
-
-import java.util.List;
-
-public interface UiChannelFactory {
+package org.apache.dolphinscheduler.spi.task.request;
+/**
+ * UDF type
+ */
+public enum UdfType {
/**
- * plugin name
- * Must be UNIQUE .
- * This alert plugin name eg: email , message ...
- * Name can often be displayed on the page ui eg : email , message , MR ,
spark , hive ...
- *
- * @return this alert plugin name
+ * 0 hive; 1 spark
*/
- String getName();
+ HIVE(0, "hive"),
+ SPARK(1, "spark");
- /**
- * Returns the configurable parameters that this plugin needs to display
on the web ui
- *
- * @return this alert plugin params
- */
- List<PluginParams> getParams();
+ UdfType(int code, String descp) {
+ this.code = code;
+ this.descp = descp;
+ }
+
+ private final int code;
+ private final String descp;
+
+ public int getCode() {
+ return code;
+ }
+
+ public String getDescp() {
+ return descp;
+ }
+
+ public static UdfType of(int type) {
+ for (UdfType ut : values()) {
+ if (ut.getCode() == type) {
+ return ut;
+ }
+ }
+ throw new IllegalArgumentException("invalid type : " + type);
+ }
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
index d5e3e0d..d2c7c55 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
@@ -65,8 +65,6 @@ public abstract class AbstractYarnTask extends
AbstractTaskExecutor {
public void cancelApplication(boolean status) throws Exception {
cancel = true;
// cancel process
-
- //todo 交给上层处理
shellCommandExecutor.cancelApplication();
// TaskInstance taskInstance =
processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());
// if (status && taskInstance != null){
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
index 01484c4..6f78063 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
@@ -32,5 +32,4 @@ public class HttpTaskChannel implements TaskChannel {
public AbstractTask createTask(TaskRequest taskRequest) {
return new HttpTask(taskRequest);
}
-
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml
index 6f56744..09ddf41 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml
@@ -26,6 +26,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-task-sql</artifactId>
+ <packaging>dolphinscheduler-plugin</packaging>
<dependencies>
<dependency>
@@ -39,8 +40,15 @@
<version>${project.version}</version>
</dependency>
- </dependencies>
-
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <finalName>dolphinscheduler-task-sql-${project.version}</finalName>
+ </build>
</project>
\ No newline at end of file
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlBinds.java
similarity index 58%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlBinds.java
index 01484c4..a90c8db 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlBinds.java
@@ -15,22 +15,29 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.http;
+package org.apache.dolphinscheduler.plugin.task.sql;
-import org.apache.dolphinscheduler.spi.task.AbstractTask;
-import org.apache.dolphinscheduler.spi.task.TaskChannel;
-import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
+import org.apache.dolphinscheduler.spi.task.Property;
-public class HttpTaskChannel implements TaskChannel {
+import java.util.Map;
- @Override
- public void cancelApplication(boolean status) {
+/**
+ * Used to contains both prepared sql string and its to-be-bind parameters
+ */
+public class SqlBinds {
+ private final String sql;
+ private final Map<Integer, Property> paramsMap;
+ public SqlBinds(String sql, Map<Integer, Property> paramsMap) {
+ this.sql = sql;
+ this.paramsMap = paramsMap;
}
- @Override
- public AbstractTask createTask(TaskRequest taskRequest) {
- return new HttpTask(taskRequest);
+ public String getSql() {
+ return sql;
}
+ public Map<Integer, Property> getParamsMap() {
+ return paramsMap;
+ }
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlParameters.java
new file mode 100644
index 0000000..487c5bd
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlParameters.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sql;
+
+import org.apache.dolphinscheduler.spi.enums.DataType;
+import org.apache.dolphinscheduler.spi.task.AbstractParameters;
+import org.apache.dolphinscheduler.spi.task.Property;
+import org.apache.dolphinscheduler.spi.task.ResourceInfo;
+import org.apache.dolphinscheduler.spi.utils.CollectionUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Sql/Hql parameter
+ */
+public class SqlParameters extends AbstractParameters {
+ /**
+ * data source type,eg MYSQL, POSTGRES, HIVE ...
+ */
+ private String type;
+
+ /**
+ * datasource id
+ */
+ private int datasource;
+
+ /**
+ * sql
+ */
+ private String sql;
+
+ /**
+ * sql type
+ * 0 query
+ * 1 NON_QUERY
+ */
+ private int sqlType;
+
+ /**
+ * send email
+ */
+ private Boolean sendEmail;
+
+ /**
+ * display rows
+ */
+ private int displayRows;
+
+ /**
+ * udf list
+ */
+ private String udfs;
+ /**
+ * show type
+ * 0 TABLE
+ * 1 TEXT
+ * 2 attachment
+ * 3 TABLE+attachment
+ */
+ private String showType;
+ /**
+ * SQL connection parameters
+ */
+ private String connParams;
+ /**
+ * Pre Statements
+ */
+ private List<String> preStatements;
+ /**
+ * Post Statements
+ */
+ private List<String> postStatements;
+
+ /**
+ * groupId
+ */
+ private int groupId;
+ /**
+ * title
+ */
+ private String title;
+
+ private int limit;
+
+ public int getLimit() {
+ return limit;
+ }
+
+ public void setLimit(int limit) {
+ this.limit = limit;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public int getDatasource() {
+ return datasource;
+ }
+
+ public void setDatasource(int datasource) {
+ this.datasource = datasource;
+ }
+
+ public String getSql() {
+ return sql;
+ }
+
+ public void setSql(String sql) {
+ this.sql = sql;
+ }
+
+ public String getUdfs() {
+ return udfs;
+ }
+
+ public void setUdfs(String udfs) {
+ this.udfs = udfs;
+ }
+
+ public int getSqlType() {
+ return sqlType;
+ }
+
+ public void setSqlType(int sqlType) {
+ this.sqlType = sqlType;
+ }
+
+ public Boolean getSendEmail() {
+ return sendEmail;
+ }
+
+ public void setSendEmail(Boolean sendEmail) {
+ this.sendEmail = sendEmail;
+ }
+
+ public int getDisplayRows() {
+ return displayRows;
+ }
+
+ public void setDisplayRows(int displayRows) {
+ this.displayRows = displayRows;
+ }
+
+ public String getShowType() {
+ return showType;
+ }
+
+ public void setShowType(String showType) {
+ this.showType = showType;
+ }
+
+ public String getConnParams() {
+ return connParams;
+ }
+
+ public void setConnParams(String connParams) {
+ this.connParams = connParams;
+ }
+
+ public String getTitle() {
+ return title;
+ }
+
+ public void setTitle(String title) {
+ this.title = title;
+ }
+
+ public List<String> getPreStatements() {
+ return preStatements;
+ }
+
+ public void setPreStatements(List<String> preStatements) {
+ this.preStatements = preStatements;
+ }
+
+ public List<String> getPostStatements() {
+ return postStatements;
+ }
+
+ public void setPostStatements(List<String> postStatements) {
+ this.postStatements = postStatements;
+ }
+
+ public int getGroupId() {
+ return groupId;
+ }
+
+ public void setGroupId(int groupId) {
+ this.groupId = groupId;
+ }
+
+ @Override
+ public boolean checkParameters() {
+ return datasource != 0 && StringUtils.isNotEmpty(type) &&
StringUtils.isNotEmpty(sql);
+ }
+
+ @Override
+ public List<ResourceInfo> getResourceFilesList() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public void dealOutParam(String result) {
+ if (CollectionUtils.isEmpty(localParams)) {
+ return;
+ }
+ List<Property> outProperty = getOutProperty(localParams);
+ if (CollectionUtils.isEmpty(outProperty)) {
+ return;
+ }
+ if (StringUtils.isEmpty(result)) {
+ varPool.addAll(outProperty);
+ return;
+ }
+ List<Map<String, String>> sqlResult = getListMapByString(result);
+ if (CollectionUtils.isEmpty(sqlResult)) {
+ return;
+ }
+ //if sql return more than one line
+ if (sqlResult.size() > 1) {
+ Map<String, List<String>> sqlResultFormat = new HashMap<>();
+ //init sqlResultFormat
+ Set<String> keySet = sqlResult.get(0).keySet();
+ for (String key : keySet) {
+ sqlResultFormat.put(key, new ArrayList<>());
+ }
+ for (Map<String, String> info : sqlResult) {
+ for (String key : info.keySet()) {
+
sqlResultFormat.get(key).add(String.valueOf(info.get(key)));
+ }
+ }
+ for (Property info : outProperty) {
+ if (info.getType() == DataType.LIST) {
+
info.setValue(JSONUtils.toJsonString(sqlResultFormat.get(info.getProp())));
+ varPool.add(info);
+ }
+ }
+ } else {
+ //result only one line
+ Map<String, String> firstRow = sqlResult.get(0);
+ for (Property info : outProperty) {
+ info.setValue(String.valueOf(firstRow.get(info.getProp())));
+ varPool.add(info);
+ }
+ }
+
+ }
+
+ @Override
+ public String toString() {
+ return "SqlParameters{"
+ + "type='" + type + '\''
+ + ", datasource=" + datasource
+ + ", sql='" + sql + '\''
+ + ", sqlType=" + sqlType
+ + ", sendEmail=" + sendEmail
+ + ", displayRows=" + displayRows
+ + ", limit=" + limit
+ + ", udfs='" + udfs + '\''
+ + ", showType='" + showType + '\''
+ + ", connParams='" + connParams + '\''
+ + ", groupId='" + groupId + '\''
+ + ", title='" + title + '\''
+ + ", preStatements=" + preStatements
+ + ", postStatements=" + postStatements
+ + '}';
+ }
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
new file mode 100644
index 0000000..6fd4397
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sql;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.datasource.BaseConnectionParam;
+import org.apache.dolphinscheduler.plugin.task.datasource.DatasourceUtil;
+import org.apache.dolphinscheduler.plugin.task.util.MapUtils;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+import org.apache.dolphinscheduler.spi.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.spi.task.AbstractParameters;
+import org.apache.dolphinscheduler.spi.task.Direct;
+import org.apache.dolphinscheduler.spi.task.Property;
+import org.apache.dolphinscheduler.spi.task.TaskAlertInfo;
+import org.apache.dolphinscheduler.spi.task.TaskConstants;
+import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils;
+import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
+import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
+import org.apache.dolphinscheduler.spi.task.request.UdfFuncRequest;
+import org.apache.dolphinscheduler.spi.utils.CollectionUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class SqlTask extends AbstractYarnTask {
+
+ /**
+ * taskExecutionContext
+ */
+ private TaskRequest taskExecutionContext;
+
+ /**
+ * sql parameters
+ */
+ private SqlParameters sqlParameters;
+
+ /**
+ * base datasource
+ */
+ private BaseConnectionParam baseConnectionParam;
+
+ /**
+ * create function format
+ */
+ private static final String CREATE_FUNCTION_FORMAT = "create temporary
function {0} as ''{1}''";
+
+ /**
+ * Abstract Yarn Task
+ *
+ * @param taskRequest taskRequest
+ */
+ public SqlTask(TaskRequest taskRequest) {
+ super(taskRequest);
+ this.taskExecutionContext = taskRequest;
+ this.sqlParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
SqlParameters.class);
+
+ assert sqlParameters != null;
+ if (!sqlParameters.checkParameters()) {
+ throw new RuntimeException("sql task params is not valid");
+ }
+ }
+
+ @Override
+ public AbstractParameters getParameters() {
+ return null;
+ }
+
+ @Override
+ protected String buildCommand() {
+ return null;
+ }
+
+ @Override
+ protected void setMainJarName() {
+
+ }
+
+ @Override
+ public void handle() throws Exception {
+ // set the name of the current thread
+ String threadLoggerInfoName =
String.format(TaskConstants.TASK_LOG_INFO_FORMAT,
taskExecutionContext.getTaskAppId());
+ Thread.currentThread().setName(threadLoggerInfoName);
+
+ logger.info("Full sql parameters: {}", sqlParameters);
+ logger.info("sql type : {}, datasource : {}, sql : {} , localParams :
{},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit
{}",
+ sqlParameters.getType(),
+ sqlParameters.getDatasource(),
+ sqlParameters.getSql(),
+ sqlParameters.getLocalParams(),
+ sqlParameters.getUdfs(),
+ sqlParameters.getShowType(),
+ sqlParameters.getConnParams(),
+ sqlParameters.getVarPool(),
+ sqlParameters.getLimit());
+ try {
+ SQLTaskExecutionContext sqlTaskExecutionContext =
taskExecutionContext.getSqlTaskExecutionContext();
+
+ // get datasource
+ baseConnectionParam = (BaseConnectionParam)
DatasourceUtil.buildConnectionParams(
+ DbType.valueOf(sqlParameters.getType()),
+ sqlTaskExecutionContext.getConnectionParams());
+
+ // ready to execute SQL and parameter entity Map
+ SqlBinds mainSqlBinds =
getSqlAndSqlParamsMap(sqlParameters.getSql());
+ List<SqlBinds> preStatementSqlBinds =
Optional.ofNullable(sqlParameters.getPreStatements())
+ .orElse(new ArrayList<>())
+ .stream()
+ .map(this::getSqlAndSqlParamsMap)
+ .collect(Collectors.toList());
+ List<SqlBinds> postStatementSqlBinds =
Optional.ofNullable(sqlParameters.getPostStatements())
+ .orElse(new ArrayList<>())
+ .stream()
+ .map(this::getSqlAndSqlParamsMap)
+ .collect(Collectors.toList());
+
+ List<String> createFuncs =
createFuncs(sqlTaskExecutionContext.getUdfFuncTenantCodeMap(),
+ logger);
+
+ // execute sql task
+ executeFuncAndSql(mainSqlBinds, preStatementSqlBinds,
postStatementSqlBinds, createFuncs);
+
+ setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS);
+
+ } catch (Exception e) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ logger.error("sql task error: {}", e.toString());
+ throw e;
+ }
+ }
+
+ /**
+ * execute function and sql
+ *
+ * @param mainSqlBinds main sql binds
+ * @param preStatementsBinds pre statements binds
+ * @param postStatementsBinds post statements binds
+ * @param createFuncs create functions
+ */
+ public void executeFuncAndSql(SqlBinds mainSqlBinds,
+ List<SqlBinds> preStatementsBinds,
+ List<SqlBinds> postStatementsBinds,
+ List<String> createFuncs) throws Exception {
+ Connection connection = null;
+ PreparedStatement stmt = null;
+ ResultSet resultSet = null;
+ try {
+
+ // create connection
+ connection =
DatasourceUtil.getConnection(DbType.valueOf(sqlParameters.getType()),
baseConnectionParam);
+ // create temp function
+ if (CollectionUtils.isNotEmpty(createFuncs)) {
+ createTempFunction(connection, createFuncs);
+ }
+
+ // pre sql
+ preSql(connection, preStatementsBinds);
+ stmt = prepareStatementAndBind(connection, mainSqlBinds);
+
+ String result = null;
+ // decide whether to executeQuery or executeUpdate based on sqlType
+ if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
+ // query statements need to be convert to JsonArray and
inserted into Alert to send
+ resultSet = stmt.executeQuery();
+ result = resultProcess(resultSet);
+
+ } else if (sqlParameters.getSqlType() ==
SqlType.NON_QUERY.ordinal()) {
+ // non query statement
+ String updateResult = String.valueOf(stmt.executeUpdate());
+ result = setNonQuerySqlReturn(updateResult,
sqlParameters.getLocalParams());
+ }
+ //deal out params
+ sqlParameters.dealOutParam(result);
+ postSql(connection, postStatementsBinds);
+ } catch (Exception e) {
+ logger.error("execute sql error: {}", e.getMessage());
+ throw e;
+ } finally {
+ close(resultSet, stmt, connection);
+ }
+ }
+
+ private String setNonQuerySqlReturn(String updateResult, List<Property>
properties) {
+ String result = null;
+ for (Property info : properties) {
+ if (Direct.OUT == info.getDirect()) {
+ List<Map<String, String>> updateRL = new ArrayList<>();
+ Map<String, String> updateRM = new HashMap<>();
+ updateRM.put(info.getProp(), updateResult);
+ updateRL.add(updateRM);
+ result = JSONUtils.toJsonString(updateRL);
+ break;
+ }
+ }
+ return result;
+ }
+
+ /**
+ * result process
+ *
+ * @param resultSet resultSet
+ * @throws Exception Exception
+ */
+ private String resultProcess(ResultSet resultSet) throws Exception {
+ ArrayNode resultJSONArray = JSONUtils.createArrayNode();
+ if (resultSet != null) {
+ ResultSetMetaData md = resultSet.getMetaData();
+ int num = md.getColumnCount();
+
+ int rowCount = 0;
+
+ while (rowCount < sqlParameters.getLimit() && resultSet.next()) {
+ ObjectNode mapOfColValues = JSONUtils.createObjectNode();
+ for (int i = 1; i <= num; i++) {
+ mapOfColValues.set(md.getColumnLabel(i),
JSONUtils.toJsonNode(resultSet.getObject(i)));
+ }
+ resultJSONArray.add(mapOfColValues);
+ rowCount++;
+ }
+
+ int displayRows = sqlParameters.getDisplayRows() > 0 ?
sqlParameters.getDisplayRows() : TaskConstants.DEFAULT_DISPLAY_ROWS;
+ displayRows = Math.min(displayRows, resultJSONArray.size());
+ logger.info("display sql result {} rows as follows:", displayRows);
+ for (int i = 0; i < displayRows; i++) {
+ String row = JSONUtils.toJsonString(resultJSONArray.get(i));
+ logger.info("row {} : {}", i + 1, row);
+ }
+ }
+ String result = JSONUtils.toJsonString(resultJSONArray);
+ if (sqlParameters.getSendEmail() == null ||
sqlParameters.getSendEmail()) {
+ sendAttachment(sqlParameters.getGroupId(),
StringUtils.isNotEmpty(sqlParameters.getTitle())
+ ? sqlParameters.getTitle()
+ : taskExecutionContext.getTaskName() + " query result
sets", result);
+ }
+ logger.debug("execute sql result : {}", result);
+ return result;
+ }
+
+ /**
+ * send alert as an attachment
+ *
+ * @param title title
+ * @param content content
+ */
+ private void sendAttachment(int groupId, String title, String content) {
+ setNeedAlert(Boolean.TRUE);
+ TaskAlertInfo taskAlertInfo = new TaskAlertInfo();
+ taskAlertInfo.setAlertGroupId(groupId);
+ taskAlertInfo.setContent(content);
+ taskAlertInfo.setTitle(title);
+ }
+
+ /**
+ * pre sql
+ *
+ * @param connection connection
+ * @param preStatementsBinds preStatementsBinds
+ */
+ private void preSql(Connection connection,
+ List<SqlBinds> preStatementsBinds) throws Exception {
+ for (SqlBinds sqlBind : preStatementsBinds) {
+ try (PreparedStatement pstmt = prepareStatementAndBind(connection,
sqlBind)) {
+ int result = pstmt.executeUpdate();
+ logger.info("pre statement execute result: {}, for sql: {}",
result, sqlBind.getSql());
+
+ }
+ }
+ }
+
+ /**
+ * post sql
+ *
+ * @param connection connection
+ * @param postStatementsBinds postStatementsBinds
+ */
+ private void postSql(Connection connection,
+ List<SqlBinds> postStatementsBinds) throws Exception {
+ for (SqlBinds sqlBind : postStatementsBinds) {
+ try (PreparedStatement pstmt = prepareStatementAndBind(connection,
sqlBind)) {
+ int result = pstmt.executeUpdate();
+ logger.info("post statement execute result: {},for sql: {}",
result, sqlBind.getSql());
+ }
+ }
+ }
+
+ /**
+ * create temp function
+ *
+ * @param connection connection
+ * @param createFuncs createFuncs
+ */
+ private void createTempFunction(Connection connection,
+ List<String> createFuncs) throws Exception
{
+ try (Statement funcStmt = connection.createStatement()) {
+ for (String createFunc : createFuncs) {
+ logger.info("hive create function sql: {}", createFunc);
+ funcStmt.execute(createFunc);
+ }
+ }
+ }
+
+ /**
+ * close jdbc resource
+ *
+ * @param resultSet resultSet
+ * @param pstmt pstmt
+ * @param connection connection
+ */
+ private void close(ResultSet resultSet,
+ PreparedStatement pstmt,
+ Connection connection) {
+ if (resultSet != null) {
+ try {
+ resultSet.close();
+ } catch (SQLException e) {
+ logger.error("close result set error : {}", e.getMessage(), e);
+ }
+ }
+
+ if (pstmt != null) {
+ try {
+ pstmt.close();
+ } catch (SQLException e) {
+ logger.error("close prepared statement error : {}",
e.getMessage(), e);
+ }
+ }
+
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ logger.error("close connection error : {}", e.getMessage(), e);
+ }
+ }
+ }
+
+ /**
+ * preparedStatement bind
+ *
+ * @param connection connection
+ * @param sqlBinds sqlBinds
+ * @return PreparedStatement
+ * @throws Exception Exception
+ */
+ private PreparedStatement prepareStatementAndBind(Connection connection,
SqlBinds sqlBinds) {
+ // is the timeout set
+ boolean timeoutFlag = taskExecutionContext.getTaskTimeoutStrategy() ==
TaskTimeoutStrategy.FAILED
+ || taskExecutionContext.getTaskTimeoutStrategy() ==
TaskTimeoutStrategy.WARNFAILED;
+ try (PreparedStatement stmt =
connection.prepareStatement(sqlBinds.getSql())) {
+ if (timeoutFlag) {
+ stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
+ }
+ Map<Integer, Property> params = sqlBinds.getParamsMap();
+ if (params != null) {
+ for (Map.Entry<Integer, Property> entry : params.entrySet()) {
+ Property prop = entry.getValue();
+ ParameterUtils.setInParameter(entry.getKey(), stmt,
prop.getType(), prop.getValue());
+ }
+ }
+ logger.info("prepare statement replace sql : {} ", stmt);
+ return stmt;
+ } catch (Exception exception) {
+ throw new TaskException("SQL task prepareStatementAndBind error",
exception);
+ }
+
+ }
+
+ /**
+ * regular expressions match the contents between two specified strings
+ *
+ * @param content content
+ * @param rgex rgex
+ * @param sqlParamsMap sql params map
+ * @param paramsPropsMap params props map
+ */
+ public void setSqlParamsMap(String content, String rgex, Map<Integer,
Property> sqlParamsMap, Map<String, Property> paramsPropsMap) {
+ Pattern pattern = Pattern.compile(rgex);
+ Matcher m = pattern.matcher(content);
+ int index = 1;
+ while (m.find()) {
+
+ String paramName = m.group(1);
+ Property prop = paramsPropsMap.get(paramName);
+
+ if (prop == null) {
+ logger.error("setSqlParamsMap: No Property with paramName: {}
is found in paramsPropsMap of task instance"
+ + " with id: {}. So couldn't put Property in
sqlParamsMap.", paramName, taskExecutionContext.getTaskInstanceId());
+ } else {
+ sqlParamsMap.put(index, prop);
+ index++;
+ logger.info("setSqlParamsMap: Property with paramName: {} put
in sqlParamsMap of content {} successfully.", paramName, content);
+ }
+
+ }
+ }
+
+ /**
+ * print replace sql
+ *
+ * @param content content
+ * @param formatSql format sql
+ * @param rgex rgex
+ * @param sqlParamsMap sql params map
+ */
+ private void printReplacedSql(String content, String formatSql, String
rgex, Map<Integer, Property> sqlParamsMap) {
+ //parameter print style
+ logger.info("after replace sql , preparing : {}", formatSql);
+ StringBuilder logPrint = new StringBuilder("replaced sql ,
parameters:");
+ if (sqlParamsMap == null) {
+ logger.info("printReplacedSql: sqlParamsMap is null.");
+ } else {
+ for (int i = 1; i <= sqlParamsMap.size(); i++) {
+
logPrint.append(sqlParamsMap.get(i).getValue()).append("(").append(sqlParamsMap.get(i).getType()).append(")");
+ }
+ }
+ logger.info("Sql Params are {}", logPrint);
+ }
+
+ /**
+ * ready to execute SQL and parameter entity Map
+ *
+ * @return SqlBinds
+ */
+ private SqlBinds getSqlAndSqlParamsMap(String sql) {
+ Map<Integer, Property> sqlParamsMap = new HashMap<>();
+ StringBuilder sqlBuilder = new StringBuilder();
+
+ // combining local and global parameters
+ Map<String, Property> paramsMap =
ParamUtils.convert(taskExecutionContext, getParameters());
+
+ // spell SQL according to the final user-defined variable
+ if (paramsMap == null) {
+ sqlBuilder.append(sql);
+ return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
+ }
+
+ if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
+ String title =
ParameterUtils.convertParameterPlaceholders(sqlParameters.getTitle(),
+ ParamUtils.convert(paramsMap));
+ logger.info("SQL title : {}", title);
+ sqlParameters.setTitle(title);
+ }
+
+ //new
+ //replace variable TIME with $[YYYYmmddd...] in sql when history run
job and batch complement job
+ sql = ParameterUtils.replaceScheduleTime(sql,
taskExecutionContext.getScheduleTime());
+ // special characters need to be escaped, ${} needs to be escaped
+ String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
+ setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap);
+ //Replace the original value in sql !{...} ,Does not participate in
precompilation
+ String rgexo = "['\"]*\\!\\{(.*?)\\}['\"]*";
+ sql = replaceOriginalValue(sql, rgexo, paramsMap);
+ // replace the ${} of the SQL statement with the Placeholder
+ String formatSql = sql.replaceAll(rgex, "?");
+ sqlBuilder.append(formatSql);
+
+ // print repalce sql
+ printReplacedSql(sql, formatSql, rgex, sqlParamsMap);
+ return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
+ }
+
+ private String replaceOriginalValue(String content, String rgex,
Map<String, Property> sqlParamsMap) {
+ Pattern pattern = Pattern.compile(rgex);
+ while (true) {
+ Matcher m = pattern.matcher(content);
+ if (!m.find()) {
+ break;
+ }
+ String paramName = m.group(1);
+ String paramValue = sqlParamsMap.get(paramName).getValue();
+ content = m.replaceFirst(paramValue);
+ }
+ return content;
+ }
+
+ /**
+ * create function list
+ *
+ * @param udfFuncTenantCodeMap key is udf function,value is tenant code
+ * @param logger logger
+ * @return create function list
+ */
+ public static List<String> createFuncs(Map<UdfFuncRequest, String>
udfFuncTenantCodeMap, Logger logger) {
+
+ if (MapUtils.isEmpty(udfFuncTenantCodeMap)) {
+ logger.info("can't find udf function resource");
+ return null;
+ }
+ List<String> funcList = new ArrayList<>();
+ // build temp function sql
+ buildTempFuncSql(funcList, new
ArrayList<>(udfFuncTenantCodeMap.keySet()));
+
+ return funcList;
+ }
+
+ /**
+ * build temp function sql
+ *
+ * @param sqls sql list
+ * @param udfFuncRequests udf function list
+ */
+ private static void buildTempFuncSql(List<String> sqls,
List<UdfFuncRequest> udfFuncRequests) {
+ if (CollectionUtils.isNotEmpty(udfFuncRequests)) {
+ for (UdfFuncRequest udfFuncRequest : udfFuncRequests) {
+ sqls.add(MessageFormat
+ .format(CREATE_FUNCTION_FORMAT,
udfFuncRequest.getFuncName(), udfFuncRequest.getClassName()));
+ }
+ }
+ }
+
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannel.java
similarity index 88%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannel.java
index 01484c4..8da50f2 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannel.java
@@ -15,14 +15,13 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.http;
+package org.apache.dolphinscheduler.plugin.task.sql;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
-public class HttpTaskChannel implements TaskChannel {
-
+public class SqlTaskChannel implements TaskChannel {
@Override
public void cancelApplication(boolean status) {
@@ -30,7 +29,7 @@ public class HttpTaskChannel implements TaskChannel {
@Override
public AbstractTask createTask(TaskRequest taskRequest) {
- return new HttpTask(taskRequest);
+ return new SqlTask(taskRequest);
}
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannelFactory.java
similarity index 65%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannelFactory.java
index 01484c4..47091eb 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannelFactory.java
@@ -15,22 +15,27 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.http;
+package org.apache.dolphinscheduler.plugin.task.sql;
-import org.apache.dolphinscheduler.spi.task.AbstractTask;
+import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
-import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
+import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
-public class HttpTaskChannel implements TaskChannel {
+import java.util.List;
+public class SqlTaskChannelFactory implements TaskChannelFactory {
@Override
- public void cancelApplication(boolean status) {
-
+ public String getName() {
+ return "SQL";
}
@Override
- public AbstractTask createTask(TaskRequest taskRequest) {
- return new HttpTask(taskRequest);
+ public List<PluginParams> getParams() {
+ return null;
}
+ @Override
+ public TaskChannel create() {
+ return new SqlTaskChannel();
+ }
}
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskPlugin.java
similarity index 64%
copy from
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskPlugin.java
index 7ab3b17..65a3977 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskPlugin.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.spi.task;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,12 +15,17 @@ package org.apache.dolphinscheduler.spi.task;/*
* limitations under the License.
*/
-import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
+package org.apache.dolphinscheduler.plugin.task.sql;
-public interface TaskChannel {
+import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin;
+import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
- void cancelApplication(boolean status);
+import com.google.common.collect.ImmutableList;
- AbstractTask createTask(TaskRequest taskRequest);
+public class SqlTaskPlugin implements DolphinSchedulerPlugin {
+ @Override
+ public Iterable<TaskChannelFactory> getTaskChannelFactorys() {
+ return ImmutableList.of(new SqlTaskChannelFactory());
+ }
}
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlType.java
similarity index 76%
copy from
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlType.java
index 7ab3b17..9db9268 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlType.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.spi.task;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,12 +15,13 @@ package org.apache.dolphinscheduler.spi.task;/*
* limitations under the License.
*/
-import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
-
-public interface TaskChannel {
-
- void cancelApplication(boolean status);
-
- AbstractTask createTask(TaskRequest taskRequest);
+package org.apache.dolphinscheduler.plugin.task.sql;
+public enum SqlType {
+ /**
+ * sql type
+ * 0 query
+ * 1 NON_QUERY
+ */
+ QUERY, NON_QUERY
}