This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new b90152f  [SPI][Task]add SQL Task (#6237)
b90152f is described below

commit b90152fc2e09ce1567a10ec929f02b24faa72065
Author: Kirs <[email protected]>
AuthorDate: Fri Sep 17 13:10:44 2021 +0800

    [SPI][Task]add SQL Task (#6237)
    
    Add SQL task and fix task parse error
---
 .../dolphinscheduler-alert-email/pom.xml           |   6 -
 .../api/service/impl/UdfFuncServiceImpl.java       |   2 +-
 .../api/service/UdfFuncServiceTest.java            |   2 +-
 .../src/main/provisio/dolphinscheduler.xml         |   5 +
 .../builder/TaskExecutionContextBuilder.java       |   6 +-
 .../server/entity/TaskExecutionContext.java        |   3 +
 .../master/consumer/TaskPriorityQueueConsumer.java |  16 +-
 .../server/worker/runner/TaskExecuteThread.java    |  12 +-
 .../server/entity/SQLTaskExecutionContextTest.java | 189 -------
 .../consumer/TaskPriorityQueueConsumerTest.java    |   2 +-
 .../spi/common/UiChannelFactory.java               |   4 +-
 .../spi/task/AbstractParameters.java               |   6 +-
 .../dolphinscheduler/spi/task/AbstractTask.java    |  28 +-
 .../dolphinscheduler/spi/task/TaskAlertInfo.java   |  35 +-
 .../dolphinscheduler/spi/task/TaskChannel.java     |   4 +-
 .../dolphinscheduler/spi/task/TaskConstants.java   |   5 +
 .../task/request}/DataxTaskExecutionContext.java   |   2 +-
 .../spi/task/request}/SQLTaskExecutionContext.java |  12 +-
 .../task/request}/SqoopTaskExecutionContext.java   |   2 +-
 .../spi/task/request/TaskRequest.java              |  39 ++
 .../spi/task/request/UdfFuncRequest.java           | 231 +++++++++
 .../request/UdfType.java}                          |  52 +-
 .../plugin/task/api/AbstractYarnTask.java          |   2 -
 .../plugin/task/http/HttpTaskChannel.java          |   1 -
 .../dolphinscheduler-task-sql/pom.xml              |  12 +-
 .../plugin/task/sql/SqlBinds.java}                 |  27 +-
 .../plugin/task/sql/SqlParameters.java             | 294 +++++++++++
 .../dolphinscheduler/plugin/task/sql/SqlTask.java  | 547 +++++++++++++++++++++
 .../plugin/task/sql/SqlTaskChannel.java}           |   7 +-
 .../plugin/task/sql/SqlTaskChannelFactory.java}    |  21 +-
 .../plugin/task/sql/SqlTaskPlugin.java             |  15 +-
 .../dolphinscheduler/plugin/task/sql/SqlType.java  |  17 +-
 32 files changed, 1305 insertions(+), 301 deletions(-)

diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml 
b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml
index 079185c..9181e0d 100644
--- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml
+++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml
@@ -72,12 +72,6 @@
         </dependency>
 
         <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java
index 62b69fe..f156ead 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java
@@ -204,7 +204,7 @@ public class UdfFuncServiceImpl extends BaseServiceImpl 
implements UdfFuncServic
         // verify udfFuncName is exist
         if (!funcName.equals(udf.getFuncName())) {
             if (checkUdfFuncNameExists(funcName)) {
-                logger.error("UdfFunc {} has exist, can't create again.", 
funcName);
+                logger.error("UdfFuncRequest {} has exist, can't create 
again.", funcName);
                 result.put(Constants.STATUS, Status.UDF_FUNCTION_EXISTS);
                 result.put(Constants.MSG, Status.UDF_FUNCTION_EXISTS.getMsg());
                 return result;
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java
index dddfb6d..23ac7b0 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java
@@ -230,7 +230,7 @@ public class UdfFuncServiceTest {
     }
 
     /**
-     *  get UdfFunc id
+     *  get UdfFuncRequest id
      */
     private UdfFunc getUdfFunc() {
         UdfFunc udfFunc = new UdfFunc();
diff --git a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml 
b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
index 53e3afb..bc2be45 100644
--- a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
+++ b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
@@ -106,5 +106,10 @@
             <unpack/>
         </artifact>
     </artifactSet>
+    <artifactSet to="lib/plugin/task/sql">
+        <artifact 
id="${project.groupId}:dolphinscheduler-task-sql:zip:${project.version}">
+            <unpack/>
+        </artifact>
+    </artifactSet>
 
 </runtime>
\ No newline at end of file
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index f37af08..1865d3e 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -25,11 +25,11 @@ import 
org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
-import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
-import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
 
 /**
  *  TaskExecutionContext builder
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index c32c383..52353be 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -21,6 +21,9 @@ import 
org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index bab83db..fc10a29 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -42,10 +42,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
-import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
-import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
@@ -55,6 +52,10 @@ import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.TaskPriority;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
+import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.UdfFuncRequest;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -345,13 +346,14 @@ public class TaskPriorityQueueConsumer extends Thread {
             }
 
             List<UdfFunc> udfFuncList = 
processService.queryUdfFunListByIds(udfFunIdsArray);
-            Map<UdfFunc, String> udfFuncMap = new HashMap<>();
+            UdfFuncRequest udfFuncRequest;
+            Map<UdfFuncRequest, String> udfFuncRequestMap = new HashMap<>();
             for (UdfFunc udfFunc : udfFuncList) {
+                udfFuncRequest = 
JSONUtils.parseObject(JSONUtils.toJsonString(udfFunc), UdfFuncRequest.class);
                 String tenantCode = 
processService.queryTenantCodeByResName(udfFunc.getResourceName(), 
ResourceType.UDF);
-                udfFuncMap.put(udfFunc, tenantCode);
+                udfFuncRequestMap.put(udfFuncRequest, tenantCode);
             }
-
-            sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncMap);
+            sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncRequestMap);
         }
     }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 5a164e8..b0a5625 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -37,6 +37,7 @@ import 
org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException;
 import org.apache.dolphinscheduler.spi.task.AbstractTask;
+import org.apache.dolphinscheduler.spi.task.TaskAlertInfo;
 import org.apache.dolphinscheduler.spi.task.TaskChannel;
 import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
@@ -166,8 +167,6 @@ public class TaskExecuteThread implements Runnable, Delayed 
{
             if (null == taskChannel) {
                 throw new PluginNotFoundException(String.format("%s Task 
Plugin Not Found,Please Check Config File.", 
taskExecutionContext.getTaskType()));
             }
-
-            //TODO Temporary operation, To be adjusted
             TaskRequest taskRequest = 
JSONUtils.parseObject(JSONUtils.toJsonString(taskExecutionContext), 
TaskRequest.class);
 
             task = taskChannel.createTask(taskRequest);
@@ -179,8 +178,9 @@ public class TaskExecuteThread implements Runnable, Delayed 
{
             this.task.handle();
 
             // task result process
-            this.task.after();
-
+            if (this.task.getNeedAlert()) {
+                sendAlert(this.task.getTaskAlertInfo());
+            }
             responseCommand.setStatus(this.task.getExitStatus().getCode());
             responseCommand.setEndTime(new Date());
             responseCommand.setProcessId(this.task.getProcessId());
@@ -203,6 +203,10 @@ public class TaskExecuteThread implements Runnable, 
Delayed {
         }
     }
 
+    private void sendAlert(TaskAlertInfo taskAlertInfo) {
+        alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), 
taskAlertInfo.getTitle(), taskAlertInfo.getContent());
+    }
+
     /**
      * when task finish, clear execute path.
      */
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContextTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContextTest.java
deleted file mode 100644
index a2ca6be..0000000
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContextTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.entity;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.UdfFunc;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.junit.Test;
-
-public class SQLTaskExecutionContextTest {
-
-  /**
-   * test parse  josn String to TaskExecutionContext
-   */
-  @Test
-  public void testTaskExecutionContext() {
-    String contextJson = "{\n"
-        + "    \"taskInstanceId\":32,\n"
-        + "    \"taskName\":\"test-hive-func\",\n"
-        + "    \"startTime\":\"2020-07-19 16:45:46\",\n"
-        + "    \"taskType\":\"SQL\",\n"
-        + "    \"host\":null,\n"
-        + "    
\"executePath\":\"/tmp/dolphinscheduler/exec/process/1/5/14/32\",\n"
-        + "    \"logPath\":null,\n"
-        + "    
\"taskJson\":\"{\\\"id\\\":\\\"tasks-70999\\\",\\\"name\\\":\\\"test-hive-func\\\""
-        + 
",\\\"desc\\\":null,\\\"type\\\":\\\"SQL\\\",\\\"runFlag\\\":\\\"NORMAL\\\","
-        + "\\\"loc\\\":null,\\\"maxRetryTimes\\\":0,\\\"retryInterval\\\":1,"
-        + "\\\"params\\\":{\\\"type\\\":\\\"HIVE\\\",\\\"datasource\\\":2,"
-        + "\\\"sql\\\":\\\"select mid_id, user_id,"
-        + " version_code, version_name, lang, source, os, area, model, "
-        + "brand, sdk_version, gmail, height_width, app_time, network,"
-        + " lng, lat, dt,\\\\n       Lower(model)\\\\nfrom dws_uv_detail_day 
limit 5;"
-        + 
"\\\",\\\"udfs\\\":\\\"1\\\",\\\"sqlType\\\":\\\"0\\\",\\\"title\\\":\\\""
-        + "test-hive-user-func\\\",\\\"receivers\\\":\\\"[email protected]\\\","
-        + 
"\\\"receiversCc\\\":\\\"\\\",\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[],"
-        + 
"\\\"connParams\\\":\\\"\\\",\\\"preStatements\\\":[],\\\"postStatements\\\":[]},"
-        + 
"\\\"preTasks\\\":[],\\\"extras\\\":null,\\\"depList\\\":[],\\\"dependence\\\":{},"
-        + 
"\\\"conditionResult\\\":{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]},"
-        + 
"\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"workerGroup\\\":\\\"default\\\","
-        + 
"\\\"workerGroupId\\\":null,\\\"timeout\\\":{\\\"strategy\\\":\\\"\\\",\\\"interval\\\":null,"
-        + 
"\\\"enable\\\":false},\\\"conditionsTask\\\":false,\\\"forbidden\\\":false,"
-        + 
"\\\"taskTimeoutParameter\\\":{\\\"enable\\\":false,\\\"strategy\\\":null,"
-        + "\\\"interval\\\":0}}\",\n"
-        + "    \"processId\":0,\n"
-        + "    \"appIds\":null,\n"
-        + "    \"processInstanceId\":14,\n"
-        + "    \"scheduleTime\":null,\n"
-        + "    \"globalParams\":null,\n"
-        + "    \"executorId\":2,\n"
-        + "    \"cmdTypeIfComplement\":2,\n"
-        + "    \"tenantCode\":\"sl\",\n"
-        + "    \"queue\":\"sl\",\n"
-        + "    \"processDefineId\":5,\n"
-        + "    \"projectId\":1,\n"
-        + "    \"taskParams\":null,\n"
-        + "    \"envFile\":null,\n"
-        + "    \"definedParams\":null,\n"
-        + "    \"taskAppId\":null,\n"
-        + "    \"taskTimeoutStrategy\":0,\n"
-        + "    \"taskTimeout\":0,\n"
-        + "    \"workerGroup\":\"default\",\n"
-        + "    \"resources\":{\n"
-        + "    },\n"
-        + "    \"sqlTaskExecutionContext\":{\n"
-        + "        \"warningGroupId\":0,\n"
-        + "        \"connectionParams\":\"{\\\"type\\\":null,\\\"address\\\":"
-        + 
"\\\"jdbc:hive2://localhost:10000\\\",\\\"database\\\":\\\"gmall\\\","
-        + "\\\"jdbcUrl\\\":\\\"jdbc:hive2://localhost:10000/gmall\\\","
-        + 
"\\\"user\\\":\\\"sl-test\\\",\\\"password\\\":\\\"123456sl\\\"}\",\n"
-        + "        \"udfFuncTenantCodeMap\": null"
-        + "    },\n"
-        + "    \"dataxTaskExecutionContext\":{\n"
-        + "        \"dataSourceId\":0,\n"
-        + "        \"sourcetype\":0,\n"
-        + "        \"sourceConnectionParams\":null,\n"
-        + "        \"dataTargetId\":0,\n"
-        + "        \"targetType\":0,\n"
-        + "        \"targetConnectionParams\":null\n"
-        + "    },\n"
-        + "    \"dependenceTaskExecutionContext\":null,\n"
-        + "    \"sqoopTaskExecutionContext\":{\n"
-        + "        \"dataSourceId\":0,\n"
-        + "        \"sourcetype\":0,\n"
-        + "        \"sourceConnectionParams\":null,\n"
-        + "        \"dataTargetId\":0,\n"
-        + "        \"targetType\":0,\n"
-        + "        \"targetConnectionParams\":null\n"
-        + "    },\n"
-        + "    \"procedureTaskExecutionContext\":{\n"
-        + "        \"connectionParams\":null\n"
-        + "    }\n"
-        + "}\n";
-
-    TaskExecutionContext taskExecutionContext = 
JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
-
-    assertNotNull(taskExecutionContext);
-  }
-
-  @Test
-  public void testSqlTaskExecutionContext() {
-
-    SQLTaskExecutionContext sqlTaskExecutionContext = new 
SQLTaskExecutionContext();
-    sqlTaskExecutionContext.setWarningGroupId(0);
-
-    Map<UdfFunc, String> udfmap = new HashMap<>();
-
-    UdfFunc udfFunc = new UdfFunc();
-    udfFunc.setArgTypes("1");
-    udfFunc.setId(1);
-    udfFunc.setResourceName("name1");
-    udfmap.put(udfFunc, "map1");
-
-    UdfFunc udfFunc2 = new UdfFunc();
-    udfFunc2.setArgTypes("2");
-    udfFunc2.setId(2);
-    udfFunc2.setResourceName("name2");
-    udfmap.put(udfFunc2, "map2");
-
-    sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfmap);
-
-    String contextJson = JSONUtils.toJsonString(sqlTaskExecutionContext);
-    SQLTaskExecutionContext parseSqlTask = JSONUtils.parseObject(contextJson, 
SQLTaskExecutionContext.class);
-
-    assertNotNull(parseSqlTask);
-    assertEquals(sqlTaskExecutionContext.getWarningGroupId(), 
parseSqlTask.getWarningGroupId());
-    assertEquals(sqlTaskExecutionContext.getUdfFuncTenantCodeMap().size(), 
parseSqlTask.getUdfFuncTenantCodeMap().size());
-  }
-
-  /**
-   * test the SQLTaskExecutionContext
-   */
-  @Test
-  public void testSqlTaskExecutionContextParse() {
-
-    // SQLTaskExecutionContext.udfFuncTenantCodeMap is null
-    String contextJson = "{\n"
-        + "    \"warningGroupId\":0,\n"
-        + "    \"connectionParams\":null,\n"
-        + "    \"udfFuncTenantCodeMap\":null"
-        + "}\n}";
-    SQLTaskExecutionContext parseSqlTask = JSONUtils.parseObject(contextJson, 
SQLTaskExecutionContext.class);
-
-    assertNotNull(parseSqlTask);
-    assertEquals(0,parseSqlTask.getWarningGroupId());
-    assertNull(parseSqlTask.getUdfFuncTenantCodeMap());
-
-    // SQLTaskExecutionContext.udfFuncTenantCodeMap is not null
-    contextJson = "{\"warningGroupId\":0,"
-        + "\"connectionParams\":null,"
-        + "\"udfFuncTenantCodeMap\":{\""
-        + "{\\\"id\\\":2,\\\"userId\\\":0,"
-        + 
"\\\"funcName\\\":null,\\\"className\\\":null,\\\"argTypes\\\":\\\"2\\\",\\\"database\\\":null,"
-        + 
"\\\"description\\\":null,\\\"resourceId\\\":0,\\\"resourceName\\\":\\\"name2\\\",\\\"type\\\":null,"
-        + "\\\"createTime\\\":null,\\\"updateTime\\\":null}\":\"map2\","
-        + "\"{\\\"id\\\":1,\\\"userId\\\":0,\\\"funcName\\\":null,"
-        + "\\\"className\\\":null,\\\"argTypes\\\":\\\"1\\\","
-        + "\\\"database\\\":null,\\\"description\\\":null,"
-        + "\\\"resourceId\\\":0,\\\"resourceName\\\":\\\"name1\\\","
-        + 
"\\\"type\\\":null,\\\"createTime\\\":null,\\\"updateTime\\\":null}\":\"map1\"}}\n";
-
-    SQLTaskExecutionContext parseSqlTask2 = JSONUtils.parseObject(contextJson, 
SQLTaskExecutionContext.class);
-
-    assertNotNull(parseSqlTask2);
-    assertEquals(0,parseSqlTask2.getWarningGroupId());
-    assertEquals(2,parseSqlTask2.getUdfFuncTenantCodeMap().size());
-  }
-
-}
\ No newline at end of file
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index 2d4a1fe..976e058 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -32,12 +32,12 @@ import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.TaskPriority;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
+import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
 
 import java.util.ArrayList;
 import java.util.Date;
diff --git 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java
 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java
index d0e822b..8b89215 100644
--- 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java
+++ 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.spi.common;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,8 @@ package org.apache.dolphinscheduler.spi.common;/*
  * limitations under the License.
  */
 
+package org.apache.dolphinscheduler.spi.common;
+
 import org.apache.dolphinscheduler.spi.params.base.PluginParams;
 
 import java.util.List;
diff --git 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java
 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java
index adb9136..55f5203 100644
--- 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java
+++ 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.spi.task;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,8 @@ package org.apache.dolphinscheduler.spi.task;/*
  * limitations under the License.
  */
 
+package org.apache.dolphinscheduler.spi.task;
+
 import org.apache.dolphinscheduler.spi.utils.CollectionUtils;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
@@ -37,7 +39,7 @@ public abstract class AbstractParameters implements 
IParameters {
     /**
      * local parameters
      */
-    private  List<Property> localParams;
+    protected   List<Property> localParams;
 
     /**
      * var pool
diff --git 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractTask.java
 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractTask.java
index b9c2632..5a6cfd2 100644
--- 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractTask.java
+++ 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractTask.java
@@ -60,6 +60,10 @@ public abstract class AbstractTask {
      */
     protected volatile int exitStatusCode = -1;
 
+    protected boolean needAlert = false;
+
+    protected TaskAlertInfo taskAlertInfo;
+
     /**
      * constructor
      *
@@ -79,7 +83,6 @@ public abstract class AbstractTask {
         return null;
     }
 
-
     /**
      * task handle
      *
@@ -142,6 +145,22 @@ public abstract class AbstractTask {
         this.resultString = resultString;
     }
 
+    public boolean getNeedAlert() {
+        return needAlert;
+    }
+
+    public void setNeedAlert(boolean needAlert) {
+        this.needAlert = needAlert;
+    }
+
+    public TaskAlertInfo getTaskAlertInfo() {
+        return taskAlertInfo;
+    }
+
+    public void setTaskAlertInfo(TaskAlertInfo taskAlertInfo) {
+        this.taskAlertInfo = taskAlertInfo;
+    }
+
     /**
      * get task parameters
      *
@@ -150,13 +169,6 @@ public abstract class AbstractTask {
     public abstract AbstractParameters getParameters();
 
     /**
-     * result processing maybe
-     */
-    public void after() {
-
-    }
-
-    /**
      * get exit status according to exitCode
      *
      * @return exit status
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskAlertInfo.java
similarity index 57%
copy from 
dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
copy to 
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskAlertInfo.java
index 01484c4..75faad5 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
+++ 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskAlertInfo.java
@@ -15,22 +15,37 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.plugin.task.http;
+package org.apache.dolphinscheduler.spi.task;
 
-import org.apache.dolphinscheduler.spi.task.AbstractTask;
-import org.apache.dolphinscheduler.spi.task.TaskChannel;
-import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
+public class TaskAlertInfo {
 
-public class HttpTaskChannel implements TaskChannel {
+    private String title;
 
-    @Override
-    public void cancelApplication(boolean status) {
+    private String content;
 
+    private Integer alertGroupId;
+
+    public String getTitle() {
+        return title;
+    }
+
+    public void setTitle(String title) {
+        this.title = title;
+    }
+
+    public String getContent() {
+        return content;
+    }
+
+    public void setContent(String content) {
+        this.content = content;
     }
 
-    @Override
-    public AbstractTask createTask(TaskRequest taskRequest) {
-        return new HttpTask(taskRequest);
+    public Integer getAlertGroupId() {
+        return alertGroupId;
     }
 
+    public void setAlertGroupId(Integer alertGroupId) {
+        this.alertGroupId = alertGroupId;
+    }
 }
diff --git 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
index 7ab3b17..681970f 100644
--- 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
+++ 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.spi.task;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,6 +15,8 @@ package org.apache.dolphinscheduler.spi.task;/*
  * limitations under the License.
  */
 
+package org.apache.dolphinscheduler.spi.task;
+
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 
 public interface TaskChannel {
diff --git 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java
 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java
index b7d5cbb..5b7ae95 100644
--- 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java
+++ 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java
@@ -223,6 +223,11 @@ public class TaskConstants {
     public static final int LOG_QUERY_LIMIT = 4096;
 
     /**
+     * default display rows
+     */
+    public static final int DEFAULT_DISPLAY_ROWS = 10;
+
+    /**
      * jar
      */
     public static final String JAR = "jar";
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java
 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/DataxTaskExecutionContext.java
similarity index 98%
rename from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java
rename to 
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/DataxTaskExecutionContext.java
index dd8d646..276923d 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java
+++ 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/DataxTaskExecutionContext.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.entity;
+package org.apache.dolphinscheduler.spi.task.request;
 
 import java.io.Serializable;
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskExecutionContext.java
similarity index 84%
rename from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
rename to 
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskExecutionContext.java
index fa670e2..b712b50 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
+++ 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskExecutionContext.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.entity;
+package org.apache.dolphinscheduler.spi.task.request;
+
+import org.apache.dolphinscheduler.spi.task.UdfFuncBean.UdfFuncDeserializer;
 
-import org.apache.dolphinscheduler.dao.entity.UdfFunc;
-import org.apache.dolphinscheduler.dao.entity.UdfFunc.UdfFuncDeserializer;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import java.io.Serializable;
 import java.util.Map;
@@ -42,7 +42,7 @@ public class SQLTaskExecutionContext implements Serializable {
      * udf function tenant code map
      */
     @JsonDeserialize(keyUsing = UdfFuncDeserializer.class)
-    private Map<UdfFunc,String> udfFuncTenantCodeMap;
+    private Map<UdfFuncRequest,String> udfFuncTenantCodeMap;
 
 
     public int getWarningGroupId() {
@@ -53,11 +53,11 @@ public class SQLTaskExecutionContext implements 
Serializable {
         this.warningGroupId = warningGroupId;
     }
 
-    public Map<UdfFunc, String> getUdfFuncTenantCodeMap() {
+    public Map<UdfFuncRequest, String> getUdfFuncTenantCodeMap() {
         return udfFuncTenantCodeMap;
     }
 
-    public void setUdfFuncTenantCodeMap(Map<UdfFunc, String> 
udfFuncTenantCodeMap) {
+    public void setUdfFuncTenantCodeMap(Map<UdfFuncRequest, String> 
udfFuncTenantCodeMap) {
         this.udfFuncTenantCodeMap = udfFuncTenantCodeMap;
     }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SqoopTaskExecutionContext.java
 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SqoopTaskExecutionContext.java
similarity index 98%
rename from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SqoopTaskExecutionContext.java
rename to 
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SqoopTaskExecutionContext.java
index c74414b..720892c 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SqoopTaskExecutionContext.java
+++ 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SqoopTaskExecutionContext.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.entity;
+package org.apache.dolphinscheduler.spi.task.request;
 
 import java.io.Serializable;
 
diff --git 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java
 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java
index 0de96e1..3e131f2 100644
--- 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java
+++ 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java
@@ -188,6 +188,22 @@ public class TaskRequest {
 
     private Map<String, Property> paramsMap;
 
+
+    /**
+     * sql TaskExecutionContext
+     */
+    private SQLTaskExecutionContext sqlTaskExecutionContext;
+
+    /**
+     * datax TaskExecutionContext
+     */
+    private DataxTaskExecutionContext dataxTaskExecutionContext;
+
+    /**
+     * sqoop TaskExecutionContext
+     */
+    private SqoopTaskExecutionContext sqoopTaskExecutionContext;
+
     public Map<String, String> getResources() {
         return resources;
     }
@@ -428,4 +444,27 @@ public class TaskRequest {
         this.delayTime = delayTime;
     }
 
+    public SQLTaskExecutionContext getSqlTaskExecutionContext() {
+        return sqlTaskExecutionContext;
+    }
+
+    public void setSqlTaskExecutionContext(SQLTaskExecutionContext 
sqlTaskExecutionContext) {
+        this.sqlTaskExecutionContext = sqlTaskExecutionContext;
+    }
+
+    public DataxTaskExecutionContext getDataxTaskExecutionContext() {
+        return dataxTaskExecutionContext;
+    }
+
+    public void setDataxTaskExecutionContext(DataxTaskExecutionContext 
dataxTaskExecutionContext) {
+        this.dataxTaskExecutionContext = dataxTaskExecutionContext;
+    }
+
+    public SqoopTaskExecutionContext getSqoopTaskExecutionContext() {
+        return sqoopTaskExecutionContext;
+    }
+
+    public void setSqoopTaskExecutionContext(SqoopTaskExecutionContext 
sqoopTaskExecutionContext) {
+        this.sqoopTaskExecutionContext = sqoopTaskExecutionContext;
+    }
 }
diff --git 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfFuncRequest.java
 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfFuncRequest.java
new file mode 100644
index 0000000..cbbd9e2
--- /dev/null
+++ 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfFuncRequest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.spi.task.request;
+
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.io.IOException;
+import java.util.Date;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+
+/**
+ * udf function
+ */
+public class UdfFuncRequest {
+    /**
+     * id
+     */
+    private int id;
+    /**
+     * user id
+     */
+    private int userId;
+
+    /**
+     * udf function name
+     */
+    private String funcName;
+
+    /**
+     * udf class name
+     */
+    private String className;
+
+    /**
+     * udf argument types
+     */
+    private String argTypes;
+
+    /**
+     * udf database
+     */
+    private String database;
+
+    /**
+     * udf description
+     */
+    private String description;
+
+    /**
+     * resource id
+     */
+    private int resourceId;
+
+    /**
+     * resource name
+     */
+    private String resourceName;
+
+    /**
+     * udf function type: hive / spark
+     */
+    private UdfType type;
+
+    /**
+     * create time
+     */
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+    private Date createTime;
+
+    /**
+     * update time
+     */
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+    private Date updateTime;
+
+    public int getId() {
+        return id;
+    }
+
+    public void setId(int id) {
+        this.id = id;
+    }
+
+    public int getUserId() {
+        return userId;
+    }
+
+    public void setUserId(int userId) {
+        this.userId = userId;
+    }
+
+    public String getFuncName() {
+        return funcName;
+    }
+
+    public void setFuncName(String funcName) {
+        this.funcName = funcName;
+    }
+
+    public String getClassName() {
+        return className;
+    }
+
+    public void setClassName(String className) {
+        this.className = className;
+    }
+
+    public String getArgTypes() {
+        return argTypes;
+    }
+
+    public void setArgTypes(String argTypes) {
+        this.argTypes = argTypes;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    public int getResourceId() {
+        return resourceId;
+    }
+
+    public void setResourceId(int resourceId) {
+        this.resourceId = resourceId;
+    }
+
+    public String getResourceName() {
+        return resourceName;
+    }
+
+    public void setResourceName(String resourceName) {
+        this.resourceName = resourceName;
+    }
+
+    public UdfType getType() {
+        return type;
+    }
+
+    public void setType(UdfType type) {
+        this.type = type;
+    }
+
+    public Date getCreateTime() {
+        return createTime;
+    }
+
+    public void setCreateTime(Date createTime) {
+        this.createTime = createTime;
+    }
+
+    public Date getUpdateTime() {
+        return updateTime;
+    }
+
+    public void setUpdateTime(Date updateTime) {
+        this.updateTime = updateTime;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        // identity shortcut, then reject null and instances of any other class
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        UdfFuncRequest that = (UdfFuncRequest) o;
+        // only id and funcName take part in equality, the same fields hashCode() uses
+        if (that.id != id) {
+            return false;
+        }
+        // null-safe comparison of the function names
+        return funcName == null ? that.funcName == null : funcName.equals(that.funcName);
+    }
+
+    @Override
+    public int hashCode() {
+        // hash is built from id and funcName only, the fields compared in equals()
+        int nameHash = funcName == null ? 0 : funcName.hashCode();
+        return 31 * id + nameHash;
+    }
+
+    @Override
+    public String toString() {
+        return JSONUtils.toJsonString(this);
+    }
+
+    public static class UdfFuncDeserializer extends KeyDeserializer {
+        // Jackson map-key deserializer: parses the JSON text of a key into a UdfFuncRequest.
+        @Override
+        public Object deserializeKey(String key, DeserializationContext ctxt) 
throws IOException {
+            if (StringUtils.isBlank(key)) {
+                return null; // blank key text: nothing to parse
+            }
+            return JSONUtils.parseObject(key, UdfFuncRequest.class);
+        }
+    }
+}
diff --git 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java
 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfType.java
similarity index 54%
copy from 
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java
copy to 
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfType.java
index d0e822b..2b31087 100644
--- 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java
+++ 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfType.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.spi.common;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,27 +15,41 @@ package org.apache.dolphinscheduler.spi.common;/*
  * limitations under the License.
  */
 
-import org.apache.dolphinscheduler.spi.params.base.PluginParams;
-
-import java.util.List;
-
-public interface UiChannelFactory {
+package org.apache.dolphinscheduler.spi.task.request;
 
+/**
+ * UDF type
+ */
+public enum UdfType {
     /**
-     * plugin name
-     * Must be UNIQUE .
-     * This alert plugin name eg: email , message ...
-     * Name can often be displayed on the page ui eg : email , message , MR , 
spark , hive ...
-     *
-     * @return this alert plugin name
+     * 0 hive; 1 spark
      */
-    String getName();
+    HIVE(0, "hive"),
+    SPARK(1, "spark");
 
-    /**
-     * Returns the configurable parameters that this plugin needs to display 
on the web ui
-     *
-     * @return this alert plugin params
-     */
-    List<PluginParams> getParams();
+    UdfType(int code, String descp) {
+        this.code = code;
+        this.descp = descp;
+    }
+
+    private final int code;
+    private final String descp;
+
+    public int getCode() {
+        return code;
+    }
+
+    public String getDescp() {
+        return descp;
+    }
+
+    public static UdfType of(int type) {
+        for (UdfType ut : values()) {
+            if (ut.getCode() == type) {
+                return ut;
+            }
+        }
+        throw new IllegalArgumentException("invalid type : " + type);
+    }
 
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
index d5e3e0d..d2c7c55 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
@@ -65,8 +65,6 @@ public abstract class AbstractYarnTask extends 
AbstractTaskExecutor {
     public void cancelApplication(boolean status) throws Exception {
         cancel = true;
         // cancel process
-
-        //todo 交给上层处理
         shellCommandExecutor.cancelApplication();
         //  TaskInstance taskInstance = 
processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());
         // if (status && taskInstance != null){
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
index 01484c4..6f78063 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
@@ -32,5 +32,4 @@ public class HttpTaskChannel implements TaskChannel {
     public AbstractTask createTask(TaskRequest taskRequest) {
         return new HttpTask(taskRequest);
     }
-
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml
index 6f56744..09ddf41 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml
@@ -26,6 +26,7 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>dolphinscheduler-task-sql</artifactId>
+    <packaging>dolphinscheduler-plugin</packaging>
 
     <dependencies>
         <dependency>
@@ -39,8 +40,15 @@
             <version>${project.version}</version>
         </dependency>
 
-    </dependencies>
-
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
 
+    </dependencies>
 
+    <build>
+        <finalName>dolphinscheduler-task-sql-${project.version}</finalName>
+    </build>
 </project>
\ No newline at end of file
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlBinds.java
similarity index 58%
copy from 
dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
copy to 
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlBinds.java
index 01484c4..a90c8db 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlBinds.java
@@ -15,22 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.plugin.task.http;
+package org.apache.dolphinscheduler.plugin.task.sql;
 
-import org.apache.dolphinscheduler.spi.task.AbstractTask;
-import org.apache.dolphinscheduler.spi.task.TaskChannel;
-import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
+import org.apache.dolphinscheduler.spi.task.Property;
 
-public class HttpTaskChannel implements TaskChannel {
+import java.util.Map;
 
-    @Override
-    public void cancelApplication(boolean status) {
+/**
+ * Holds both the prepared SQL string and the parameters to be bound to it
+ */
+public class SqlBinds {
+    private final String sql;
+    private final Map<Integer, Property> paramsMap;
 
+    public SqlBinds(String sql, Map<Integer, Property> paramsMap) {
+        this.sql = sql;
+        this.paramsMap = paramsMap;
     }
 
-    @Override
-    public AbstractTask createTask(TaskRequest taskRequest) {
-        return new HttpTask(taskRequest);
+    public String getSql() {
+        return sql;
     }
 
+    public Map<Integer, Property> getParamsMap() {
+        return paramsMap;
+    }
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlParameters.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlParameters.java
new file mode 100644
index 0000000..487c5bd
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlParameters.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sql;
+
+import org.apache.dolphinscheduler.spi.enums.DataType;
+import org.apache.dolphinscheduler.spi.task.AbstractParameters;
+import org.apache.dolphinscheduler.spi.task.Property;
+import org.apache.dolphinscheduler.spi.task.ResourceInfo;
+import org.apache.dolphinscheduler.spi.utils.CollectionUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Sql/Hql parameter
+ */
+public class SqlParameters extends AbstractParameters {
+    /**
+     * data source type, e.g. MYSQL, POSTGRES, HIVE ...
+     */
+    private String type;
+
+    /**
+     * datasource id
+     */
+    private int datasource;
+
+    /**
+     * sql
+     */
+    private String sql;
+
+    /**
+     * sql type
+     * 0 query
+     * 1 NON_QUERY
+     */
+    private int sqlType;
+
+    /**
+     * send email
+     */
+    private Boolean sendEmail;
+
+    /**
+     * display rows
+     */
+    private int displayRows;
+
+    /**
+     * udf list
+     */
+    private String udfs;
+    /**
+     * show type
+     * 0 TABLE
+     * 1 TEXT
+     * 2 attachment
+     * 3 TABLE+attachment
+     */
+    private String showType;
+    /**
+     * SQL connection parameters
+     */
+    private String connParams;
+    /**
+     * Pre Statements
+     */
+    private List<String> preStatements;
+    /**
+     * Post Statements
+     */
+    private List<String> postStatements;
+
+    /**
+     * groupId
+     */
+    private int groupId;
+    /**
+     * title
+     */
+    private String title;
+
+    private int limit;
+
+    public int getLimit() {
+        return limit;
+    }
+
+    public void setLimit(int limit) {
+        this.limit = limit;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public int getDatasource() {
+        return datasource;
+    }
+
+    public void setDatasource(int datasource) {
+        this.datasource = datasource;
+    }
+
+    public String getSql() {
+        return sql;
+    }
+
+    public void setSql(String sql) {
+        this.sql = sql;
+    }
+
+    public String getUdfs() {
+        return udfs;
+    }
+
+    public void setUdfs(String udfs) {
+        this.udfs = udfs;
+    }
+
+    public int getSqlType() {
+        return sqlType;
+    }
+
+    public void setSqlType(int sqlType) {
+        this.sqlType = sqlType;
+    }
+
+    public Boolean getSendEmail() {
+        return sendEmail;
+    }
+
+    public void setSendEmail(Boolean sendEmail) {
+        this.sendEmail = sendEmail;
+    }
+
+    public int getDisplayRows() {
+        return displayRows;
+    }
+
+    public void setDisplayRows(int displayRows) {
+        this.displayRows = displayRows;
+    }
+
+    public String getShowType() {
+        return showType;
+    }
+
+    public void setShowType(String showType) {
+        this.showType = showType;
+    }
+
+    public String getConnParams() {
+        return connParams;
+    }
+
+    public void setConnParams(String connParams) {
+        this.connParams = connParams;
+    }
+
+    public String getTitle() {
+        return title;
+    }
+
+    public void setTitle(String title) {
+        this.title = title;
+    }
+
+    public List<String> getPreStatements() {
+        return preStatements;
+    }
+
+    public void setPreStatements(List<String> preStatements) {
+        this.preStatements = preStatements;
+    }
+
+    public List<String> getPostStatements() {
+        return postStatements;
+    }
+
+    public void setPostStatements(List<String> postStatements) {
+        this.postStatements = postStatements;
+    }
+
+    public int getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(int groupId) {
+        this.groupId = groupId;
+    }
+
+    @Override
+    public boolean checkParameters() {
+        return datasource != 0 && StringUtils.isNotEmpty(type) && 
StringUtils.isNotEmpty(sql);
+    }
+
+    @Override
+    public List<ResourceInfo> getResourceFilesList() {
+        return new ArrayList<>();
+    }
+
+    @Override
+    public void dealOutParam(String result) {
+        if (CollectionUtils.isEmpty(localParams)) { // no local params, nothing to publish
+            return;
+        }
+        List<Property> outProperty = getOutProperty(localParams); // presumably the OUT-direction params; helper not visible here
+        if (CollectionUtils.isEmpty(outProperty)) {
+            return;
+        }
+        if (StringUtils.isEmpty(result)) { // no result text: publish the out params with their current values
+            varPool.addAll(outProperty);
+            return;
+        }
+        List<Map<String, String>> sqlResult = getListMapByString(result); // rows as column-name -> value maps
+        if (CollectionUtils.isEmpty(sqlResult)) {
+            return;
+        }
+        // more than one row: gather each column into a list; only LIST-typed out params are published
+        if (sqlResult.size() > 1) {
+            Map<String, List<String>> sqlResultFormat = new HashMap<>();
+            // start with one empty value list per column name found in the first row
+            Set<String> keySet = sqlResult.get(0).keySet();
+            for (String key : keySet) {
+                sqlResultFormat.put(key, new ArrayList<>());
+            }
+            for (Map<String, String> info : sqlResult) {
+                for (String key : info.keySet()) {
+                    
sqlResultFormat.get(key).add(String.valueOf(info.get(key)));
+                }
+            }
+            for (Property info : outProperty) {
+                if (info.getType() == DataType.LIST) {
+                    
info.setValue(JSONUtils.toJsonString(sqlResultFormat.get(info.getProp())));
+                    varPool.add(info);
+                }
+            }
+        } else {
+            // exactly one row: each out param takes the value stored under its prop name (the text "null" if absent)
+            Map<String, String> firstRow = sqlResult.get(0);
+            for (Property info : outProperty) {
+                info.setValue(String.valueOf(firstRow.get(info.getProp())));
+                varPool.add(info);
+            }
+        }
+
+    }
+
+    @Override
+    public String toString() {
+        return "SqlParameters{"
+                + "type='" + type + '\''
+                + ", datasource=" + datasource
+                + ", sql='" + sql + '\''
+                + ", sqlType=" + sqlType
+                + ", sendEmail=" + sendEmail
+                + ", displayRows=" + displayRows
+                + ", limit=" + limit
+                + ", udfs='" + udfs + '\''
+                + ", showType='" + showType + '\''
+                + ", connParams='" + connParams + '\''
+                + ", groupId='" + groupId + '\''
+                + ", title='" + title + '\''
+                + ", preStatements=" + preStatements
+                + ", postStatements=" + postStatements
+                + '}';
+    }
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
new file mode 100644
index 0000000..6fd4397
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sql;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.datasource.BaseConnectionParam;
+import org.apache.dolphinscheduler.plugin.task.datasource.DatasourceUtil;
+import org.apache.dolphinscheduler.plugin.task.util.MapUtils;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+import org.apache.dolphinscheduler.spi.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.spi.task.AbstractParameters;
+import org.apache.dolphinscheduler.spi.task.Direct;
+import org.apache.dolphinscheduler.spi.task.Property;
+import org.apache.dolphinscheduler.spi.task.TaskAlertInfo;
+import org.apache.dolphinscheduler.spi.task.TaskConstants;
+import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils;
+import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
+import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
+import org.apache.dolphinscheduler.spi.task.request.UdfFuncRequest;
+import org.apache.dolphinscheduler.spi.utils.CollectionUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class SqlTask extends AbstractYarnTask {
+
+    /**
+     * task request this SQL task was created from; supplies the raw task params, the SQL
+     * execution context, the timeout settings and the schedule time read by this class
+     */
+    private TaskRequest taskExecutionContext;
+
+    /**
+     * sql parameters parsed from the JSON task params of the task request
+     */
+    private SqlParameters sqlParameters;
+
+    /**
+     * connection parameters of the target datasource, assigned in handle()
+     */
+    private BaseConnectionParam baseConnectionParam;
+
+    /**
+     * MessageFormat pattern of the create-temporary-function statement:
+     * {0} is the function name, {1} the implementing class name
+     * (the doubled single quotes are MessageFormat escapes for a literal quote)
+     */
+    private static final String CREATE_FUNCTION_FORMAT = "create temporary function {0} as ''{1}''";
+
+    /**
+     * Creates a SQL task from the given task request and parses its JSON task params
+     * into {@link SqlParameters}.
+     *
+     * @param taskRequest taskRequest
+     * @throws RuntimeException if the task params cannot be parsed or fail validation
+     */
+    public SqlTask(TaskRequest taskRequest) {
+        super(taskRequest);
+        this.taskExecutionContext = taskRequest;
+        this.sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class);
+
+        // explicit check instead of `assert`: assertions are disabled unless the JVM runs with -ea,
+        // so an unparsable payload would otherwise surface as a bare NullPointerException
+        if (sqlParameters == null || !sqlParameters.checkParameters()) {
+            throw new RuntimeException("sql task params is not valid");
+        }
+    }
+
+    /**
+     * Returns the parameters of this task.
+     *
+     * @return the sql parameters parsed from the task request in the constructor
+     */
+    @Override
+    public AbstractParameters getParameters() {
+        // the result is handed to ParamUtils.convert when the sql is resolved,
+        // so it must be the parsed parameters rather than null
+        return sqlParameters;
+    }
+
+    /**
+     * No command is built for this task: handle() is overridden in this class and runs
+     * the statements over JDBC without calling this method.
+     *
+     * @return always null
+     */
+    @Override
+    protected String buildCommand() {
+        return null;
+    }
+
+    /**
+     * Intentionally empty: nothing in this class reads or sets a main jar.
+     */
+    @Override
+    protected void setMainJarName() {
+        // no-op
+    }
+
+    /**
+     * Runs the sql task: builds the datasource connection params, resolves the main, pre and
+     * post statements into bindable sql, builds the create-function statements, executes
+     * everything and records the exit status code.
+     *
+     * @throws Exception any failure; it is rethrown after the failure exit code has been set
+     */
+    @Override
+    public void handle() throws Exception {
+        // set the name of the current thread
+        String threadLoggerInfoName = String.format(TaskConstants.TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId());
+        Thread.currentThread().setName(threadLoggerInfoName);
+
+        logger.info("Full sql parameters: {}", sqlParameters);
+        logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit  {}",
+                sqlParameters.getType(),
+                sqlParameters.getDatasource(),
+                sqlParameters.getSql(),
+                sqlParameters.getLocalParams(),
+                sqlParameters.getUdfs(),
+                sqlParameters.getShowType(),
+                sqlParameters.getConnParams(),
+                sqlParameters.getVarPool(),
+                sqlParameters.getLimit());
+        try {
+            SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext();
+
+            // get datasource
+            baseConnectionParam = (BaseConnectionParam) DatasourceUtil.buildConnectionParams(
+                    DbType.valueOf(sqlParameters.getType()),
+                    sqlTaskExecutionContext.getConnectionParams());
+
+            // ready to execute SQL and parameter entity Map
+            SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql());
+            // a missing pre/post statement list is treated as an empty list
+            List<SqlBinds> preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements())
+                    .orElseGet(ArrayList::new)
+                    .stream()
+                    .map(this::getSqlAndSqlParamsMap)
+                    .collect(Collectors.toList());
+            List<SqlBinds> postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPostStatements())
+                    .orElseGet(ArrayList::new)
+                    .stream()
+                    .map(this::getSqlAndSqlParamsMap)
+                    .collect(Collectors.toList());
+
+            List<String> createFuncs = createFuncs(sqlTaskExecutionContext.getUdfFuncTenantCodeMap(),
+                    logger);
+
+            // execute sql task
+            executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
+
+            setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS);
+
+        } catch (Exception e) {
+            setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+            // pass the exception as last argument so the stack trace is not lost
+            logger.error("sql task error: {}", e.toString(), e);
+            throw e;
+        }
+    }
+
+    /**
+     * Opens a connection and runs, in order: the create-function statements (if any), the pre
+     * statements, the main statement and the post statements. The jdbc resources of the main
+     * statement are closed in all cases.
+     *
+     * @param mainSqlBinds main sql binds
+     * @param preStatementsBinds pre statements binds
+     * @param postStatementsBinds post statements binds
+     * @param createFuncs create functions, may be null or empty
+     * @throws Exception any failure while connecting or executing; logged and rethrown
+     */
+    public void executeFuncAndSql(SqlBinds mainSqlBinds,
+                                  List<SqlBinds> preStatementsBinds,
+                                  List<SqlBinds> postStatementsBinds,
+                                  List<String> createFuncs) throws Exception {
+        Connection connection = null;
+        PreparedStatement stmt = null;
+        ResultSet resultSet = null;
+        try {
+
+            // create connection
+            connection = DatasourceUtil.getConnection(DbType.valueOf(sqlParameters.getType()), baseConnectionParam);
+            // create temp function
+            if (CollectionUtils.isNotEmpty(createFuncs)) {
+                createTempFunction(connection, createFuncs);
+            }
+
+            // pre sql
+            preSql(connection, preStatementsBinds);
+            stmt = prepareStatementAndBind(connection, mainSqlBinds);
+
+            // stays null when the sql type is neither QUERY nor NON_QUERY
+            String result = null;
+            // decide whether to executeQuery or executeUpdate based on sqlType
+            if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
+                // query statements need to be convert to JsonArray and inserted into Alert to send
+                resultSet = stmt.executeQuery();
+                result = resultProcess(resultSet);
+
+            } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
+                // non query statement
+                String updateResult = String.valueOf(stmt.executeUpdate());
+                result = setNonQuerySqlReturn(updateResult, sqlParameters.getLocalParams());
+            }
+            //deal out params
+            sqlParameters.dealOutParam(result);
+            postSql(connection, postStatementsBinds);
+        } catch (Exception e) {
+            logger.error("execute sql error: {}", e.getMessage());
+            throw e;
+        } finally {
+            close(resultSet, stmt, connection);
+        }
+    }
+
+    /**
+     * Wraps the update count of a non query statement as json, keyed by the first OUT property.
+     *
+     * @param updateResult update count as string
+     * @param properties local params of the task, may be null
+     * @return json array holding one {prop: updateResult} object, or null when there is no OUT property
+     */
+    private String setNonQuerySqlReturn(String updateResult, List<Property> properties) {
+        // a task without local params has nothing to bind the update count to
+        if (properties == null) {
+            return null;
+        }
+        String result = null;
+        for (Property info : properties) {
+            if (Direct.OUT == info.getDirect()) {
+                List<Map<String, String>> updateRL = new ArrayList<>();
+                Map<String, String> updateRM = new HashMap<>();
+                updateRM.put(info.getProp(), updateResult);
+                updateRL.add(updateRM);
+                result = JSONUtils.toJsonString(updateRL);
+                // only the first OUT property receives the update count
+                break;
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Converts the query result into a json array (one object per row, keyed by column label),
+     * logs the first rows and, unless sending email is switched off, prepares the alert.
+     *
+     * @param resultSet resultSet, may be null
+     * @return the rows as json array string; "at most getLimit() rows" are read
+     * @throws Exception Exception
+     */
+    private String resultProcess(ResultSet resultSet) throws Exception {
+        ArrayNode resultJSONArray = JSONUtils.createArrayNode();
+        if (resultSet != null) {
+            ResultSetMetaData md = resultSet.getMetaData();
+            int num = md.getColumnCount();
+
+            int rowCount = 0;
+
+            // the limit is checked first so no row beyond it is fetched
+            while (rowCount < sqlParameters.getLimit() && resultSet.next()) {
+                ObjectNode mapOfColValues = JSONUtils.createObjectNode();
+                for (int i = 1; i <= num; i++) {
+                    mapOfColValues.set(md.getColumnLabel(i), JSONUtils.toJsonNode(resultSet.getObject(i)));
+                }
+                resultJSONArray.add(mapOfColValues);
+                rowCount++;
+            }
+
+            int displayRows = sqlParameters.getDisplayRows() > 0 ? sqlParameters.getDisplayRows() : TaskConstants.DEFAULT_DISPLAY_ROWS;
+            displayRows = Math.min(displayRows, resultJSONArray.size());
+            logger.info("display sql result {} rows as follows:", displayRows);
+            for (int i = 0; i < displayRows; i++) {
+                String row = JSONUtils.toJsonString(resultJSONArray.get(i));
+                logger.info("row {} : {}", i + 1, row);
+            }
+        }
+        String result = JSONUtils.toJsonString(resultJSONArray);
+        // an unset sendEmail flag counts as "send"
+        if (sqlParameters.getSendEmail() == null || sqlParameters.getSendEmail()) {
+            sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle())
+                    ? sqlParameters.getTitle()
+                    : taskExecutionContext.getTaskName() + " query result sets", result);
+        }
+        logger.debug("execute sql result : {}", result);
+        return result;
+    }
+
+    /**
+     * Marks this task as needing an alert and fills a TaskAlertInfo with the alert group,
+     * title and content of the query result.
+     *
+     * NOTE(review): the TaskAlertInfo built here is neither stored nor passed on in this
+     * method, so its values are dropped when the method returns - confirm whether it should
+     * be handed to the base task.
+     *
+     * @param groupId alert group id
+     * @param title title
+     * @param content content
+     */
+    private void sendAttachment(int groupId, String title, String content) {
+        setNeedAlert(Boolean.TRUE);
+        TaskAlertInfo taskAlertInfo = new TaskAlertInfo();
+        taskAlertInfo.setAlertGroupId(groupId);
+        taskAlertInfo.setContent(content);
+        taskAlertInfo.setTitle(title);
+    }
+
+    /**
+     * Executes every pre statement, in list order, as an update on the given connection.
+     * Each statement is closed as soon as it has been executed.
+     *
+     * @param connection connection
+     * @param preStatementsBinds preStatementsBinds
+     * @throws Exception if preparing or executing a statement fails
+     */
+    private void preSql(Connection connection,
+                        List<SqlBinds> preStatementsBinds) throws Exception {
+        for (SqlBinds sqlBind : preStatementsBinds) {
+            try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)) {
+                int result = pstmt.executeUpdate();
+                logger.info("pre statement execute result: {}, for sql: {}", result, sqlBind.getSql());
+
+            }
+        }
+    }
+
+    /**
+     * Executes every post statement, in list order, as an update on the given connection.
+     * Each statement is closed as soon as it has been executed.
+     *
+     * @param connection connection
+     * @param postStatementsBinds postStatementsBinds
+     * @throws Exception if preparing or executing a statement fails
+     */
+    private void postSql(Connection connection,
+                         List<SqlBinds> postStatementsBinds) throws Exception {
+        for (SqlBinds sqlBind : postStatementsBinds) {
+            try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)) {
+                int result = pstmt.executeUpdate();
+                logger.info("post statement execute result: {},for sql: {}", result, sqlBind.getSql());
+            }
+        }
+    }
+
+    /**
+     * Executes the given create-function statements one after another on a single
+     * jdbc statement, which is closed afterwards.
+     *
+     * @param connection connection
+     * @param createFuncs createFuncs
+     * @throws Exception if creating the statement or executing a create-function sql fails
+     */
+    private void createTempFunction(Connection connection,
+                                    List<String> createFuncs) throws Exception {
+        try (Statement funcStmt = connection.createStatement()) {
+            for (String createFunc : createFuncs) {
+                logger.info("hive create function sql: {}", createFunc);
+                funcStmt.execute(createFunc);
+            }
+        }
+    }
+
+    /**
+     * Closes the jdbc resources in the order result set, statement, connection.
+     * Null arguments are skipped; a failure to close one resource is logged and does
+     * not prevent the remaining ones from being closed.
+     *
+     * @param resultSet resultSet
+     * @param pstmt pstmt
+     * @param connection connection
+     */
+    private void close(ResultSet resultSet,
+                       PreparedStatement pstmt,
+                       Connection connection) {
+        if (resultSet != null) {
+            try {
+                resultSet.close();
+            } catch (SQLException e) {
+                logger.error("close result set error : {}", e.getMessage(), e);
+            }
+        }
+
+        if (pstmt != null) {
+            try {
+                pstmt.close();
+            } catch (SQLException e) {
+                logger.error("close prepared statement error : {}", e.getMessage(), e);
+            }
+        }
+
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                logger.error("close connection error : {}", e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Prepares the statement of the given binds, applies the query timeout when the task
+     * timeout strategy is FAILED or WARNFAILED, and binds the parameters.
+     * The returned statement is open; the caller is responsible for closing it.
+     *
+     * @param connection connection
+     * @param sqlBinds sqlBinds
+     * @return the open, bound PreparedStatement
+     * @throws TaskException if preparing or binding fails; the statement is closed in that case
+     */
+    private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) {
+        // is the timeout set
+        boolean timeoutFlag = taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED
+                || taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED;
+        // no try-with-resources here: it would close the statement before the caller can use it
+        PreparedStatement stmt = null;
+        try {
+            stmt = connection.prepareStatement(sqlBinds.getSql());
+            if (timeoutFlag) {
+                stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
+            }
+            Map<Integer, Property> params = sqlBinds.getParamsMap();
+            if (params != null) {
+                for (Map.Entry<Integer, Property> entry : params.entrySet()) {
+                    Property prop = entry.getValue();
+                    ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue());
+                }
+            }
+            logger.info("prepare statement replace sql : {} ", stmt);
+            return stmt;
+        } catch (Exception exception) {
+            // the statement never reaches the caller on failure, so release it here
+            if (stmt != null) {
+                try {
+                    stmt.close();
+                } catch (SQLException closeException) {
+                    exception.addSuppressed(closeException);
+                }
+            }
+            throw new TaskException("SQL task prepareStatementAndBind error", exception);
+        }
+
+    }
+
+    /**
+     * regular expressions match the contents between two specified strings
+     *
+     * @param content content
+     * @param rgex rgex
+     * @param sqlParamsMap sql params map
+     * @param paramsPropsMap params props map
+     */
+    public void setSqlParamsMap(String content, String rgex, Map<Integer, 
Property> sqlParamsMap, Map<String, Property> paramsPropsMap) {
+        Pattern pattern = Pattern.compile(rgex);
+        Matcher m = pattern.matcher(content);
+        int index = 1;
+        while (m.find()) {
+
+            String paramName = m.group(1);
+            Property prop = paramsPropsMap.get(paramName);
+
+            if (prop == null) {
+                logger.error("setSqlParamsMap: No Property with paramName: {} 
is found in paramsPropsMap of task instance"
+                        + " with id: {}. So couldn't put Property in 
sqlParamsMap.", paramName, taskExecutionContext.getTaskInstanceId());
+            } else {
+                sqlParamsMap.put(index, prop);
+                index++;
+                logger.info("setSqlParamsMap: Property with paramName: {} put 
in sqlParamsMap of content {} successfully.", paramName, content);
+            }
+
+        }
+    }
+
+    /**
+     * Logs the sql after placeholder replacement together with the value and type of
+     * every bind parameter, in bind order.
+     *
+     * @param content content (not read by this method)
+     * @param formatSql format sql
+     * @param rgex rgex (not read by this method)
+     * @param sqlParamsMap sql params map, expected to be keyed 1..size
+     */
+    private void printReplacedSql(String content, String formatSql, String rgex, Map<Integer, Property> sqlParamsMap) {
+        //parameter print style
+        logger.info("after replace sql , preparing : {}", formatSql);
+        StringBuilder logPrint = new StringBuilder("replaced sql , parameters:");
+        if (sqlParamsMap == null) {
+            logger.info("printReplacedSql: sqlParamsMap is null.");
+        } else {
+            for (int i = 1; i <= sqlParamsMap.size(); i++) {
+                logPrint.append(sqlParamsMap.get(i).getValue()).append("(").append(sqlParamsMap.get(i).getType()).append(")");
+            }
+        }
+        logger.info("Sql Params are {}", logPrint);
+    }
+
+    /**
+     * Turns a raw sql text into the sql to prepare plus its bind parameters:
+     * ${name} placeholders become '?' bind parameters, !{name} placeholders are replaced
+     * by the parameter value directly in the sql text.
+     * As a side effect the placeholders in the configured title are resolved as well.
+     *
+     * @param sql raw sql text
+     * @return SqlBinds
+     */
+    private SqlBinds getSqlAndSqlParamsMap(String sql) {
+        Map<Integer, Property> sqlParamsMap = new HashMap<>();
+        StringBuilder sqlBuilder = new StringBuilder();
+
+        // combining local and global parameters
+        Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
+
+        // spell SQL according to the final user-defined variable
+        if (paramsMap == null) {
+            sqlBuilder.append(sql);
+            return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
+        }
+
+        if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
+            String title = ParameterUtils.convertParameterPlaceholders(sqlParameters.getTitle(),
+                    ParamUtils.convert(paramsMap));
+            logger.info("SQL title : {}", title);
+            sqlParameters.setTitle(title);
+        }
+
+        // replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job
+        sql = ParameterUtils.replaceScheduleTime(sql, taskExecutionContext.getScheduleTime());
+        // special characters need to be escaped, ${} needs to be escaped
+        String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
+        setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap);
+        // replace the original value in sql !{...}, does not participate in precompilation
+        String rgexo = "['\"]*\\!\\{(.*?)\\}['\"]*";
+        sql = replaceOriginalValue(sql, rgexo, paramsMap);
+        // replace the ${} of the SQL statement with the Placeholder
+        String formatSql = sql.replaceAll(rgex, "?");
+        sqlBuilder.append(formatSql);
+
+        // print replaced sql
+        printReplacedSql(sql, formatSql, rgex, sqlParamsMap);
+        return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
+    }
+
+    /**
+     * Replaces every match of the regular expression by the value of the parameter whose
+     * name is captured in group 1. The content is re-scanned from the start after each
+     * replacement until no match is left.
+     *
+     * @param content content
+     * @param rgex rgex, its first capturing group must yield the parameter name
+     * @param sqlParamsMap parameter name to property
+     * @return the content with all matches replaced
+     * @throws IllegalArgumentException if a referenced parameter is unknown or has no value
+     */
+    private String replaceOriginalValue(String content, String rgex, Map<String, Property> sqlParamsMap) {
+        Pattern pattern = Pattern.compile(rgex);
+        while (true) {
+            Matcher m = pattern.matcher(content);
+            if (!m.find()) {
+                break;
+            }
+            String paramName = m.group(1);
+            Property prop = sqlParamsMap.get(paramName);
+            if (prop == null || prop.getValue() == null) {
+                // fail with the parameter name instead of a bare NullPointerException
+                throw new IllegalArgumentException("no value found for sql parameter !{" + paramName + "}");
+            }
+            // quote the value: replaceFirst would otherwise treat '$' and '\' in it as group references
+            content = m.replaceFirst(Matcher.quoteReplacement(prop.getValue()));
+        }
+        return content;
+    }
+
+    /**
+     * Builds one create-temporary-function statement per udf function in the map.
+     * Only the keys of the map are read.
+     *
+     * @param udfFuncTenantCodeMap key is udf function,value is tenant code
+     * @param logger logger
+     * @return create function list, or null when the map is null or empty
+     */
+    public static List<String> createFuncs(Map<UdfFuncRequest, String> udfFuncTenantCodeMap, Logger logger) {
+
+        if (MapUtils.isEmpty(udfFuncTenantCodeMap)) {
+            logger.info("can't find udf function resource");
+            // null (not an empty list) is kept as the "no functions" result for existing callers
+            return null;
+        }
+        List<String> funcList = new ArrayList<>();
+        // build temp function sql
+        buildTempFuncSql(funcList, new ArrayList<>(udfFuncTenantCodeMap.keySet()));
+
+        return funcList;
+    }
+
+    /**
+     * Appends one create-temporary-function statement per udf function to the sql list,
+     * formatted from the function name and the class name. Does nothing for an empty list.
+     *
+     * @param sqls sql list the statements are appended to
+     * @param udfFuncRequests udf function list
+     */
+    private static void buildTempFuncSql(List<String> sqls, List<UdfFuncRequest> udfFuncRequests) {
+        if (CollectionUtils.isNotEmpty(udfFuncRequests)) {
+            for (UdfFuncRequest udfFuncRequest : udfFuncRequests) {
+                sqls.add(MessageFormat
+                        .format(CREATE_FUNCTION_FORMAT, udfFuncRequest.getFuncName(), udfFuncRequest.getClassName()));
+            }
+        }
+    }
+
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannel.java
similarity index 88%
copy from 
dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
copy to 
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannel.java
index 01484c4..8da50f2 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannel.java
@@ -15,14 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.plugin.task.http;
+package org.apache.dolphinscheduler.plugin.task.sql;
 
 import org.apache.dolphinscheduler.spi.task.AbstractTask;
 import org.apache.dolphinscheduler.spi.task.TaskChannel;
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 
-public class HttpTaskChannel implements TaskChannel {
-
+public class SqlTaskChannel implements TaskChannel {
     @Override
     public void cancelApplication(boolean status) {
 
@@ -30,7 +29,7 @@ public class HttpTaskChannel implements TaskChannel {
 
     @Override
     public AbstractTask createTask(TaskRequest taskRequest) {
-        return new HttpTask(taskRequest);
+        return new SqlTask(taskRequest);
     }
 
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannelFactory.java
similarity index 65%
copy from 
dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
copy to 
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannelFactory.java
index 01484c4..47091eb 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannelFactory.java
@@ -15,22 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.plugin.task.http;
+package org.apache.dolphinscheduler.plugin.task.sql;
 
-import org.apache.dolphinscheduler.spi.task.AbstractTask;
+import org.apache.dolphinscheduler.spi.params.base.PluginParams;
 import org.apache.dolphinscheduler.spi.task.TaskChannel;
-import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
+import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
 
-public class HttpTaskChannel implements TaskChannel {
+import java.util.List;
 
+public class SqlTaskChannelFactory implements TaskChannelFactory {
     @Override
-    public void cancelApplication(boolean status) {
-
+    public String getName() {
+        return "SQL";
     }
 
     @Override
-    public AbstractTask createTask(TaskRequest taskRequest) {
-        return new HttpTask(taskRequest);
+    public List<PluginParams> getParams() {
+        return null;
     }
 
+    @Override
+    public TaskChannel create() {
+        return new SqlTaskChannel();
+    }
 }
diff --git 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskPlugin.java
similarity index 64%
copy from 
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
copy to 
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskPlugin.java
index 7ab3b17..65a3977 100644
--- 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskPlugin.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.spi.task;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,12 +15,17 @@ package org.apache.dolphinscheduler.spi.task;/*
  * limitations under the License.
  */
 
-import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
+package org.apache.dolphinscheduler.plugin.task.sql;
 
-public interface TaskChannel {
+import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin;
+import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
 
-    void cancelApplication(boolean status);
+import com.google.common.collect.ImmutableList;
 
-    AbstractTask createTask(TaskRequest taskRequest);
+/**
+ * Plugin entry of the sql task: exposes a single {@link SqlTaskChannelFactory}.
+ */
+public class SqlTaskPlugin implements DolphinSchedulerPlugin {
 
+    /**
+     * @return an immutable list holding one new SqlTaskChannelFactory
+     */
+    @Override
+    public Iterable<TaskChannelFactory> getTaskChannelFactorys() {
+        return ImmutableList.of(new SqlTaskChannelFactory());
+    }
 }
diff --git 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlType.java
similarity index 76%
copy from 
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
copy to 
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlType.java
index 7ab3b17..9db9268 100644
--- 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlType.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.spi.task;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,12 +15,13 @@ package org.apache.dolphinscheduler.spi.task;/*
  * limitations under the License.
  */
 
-import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
-
-public interface TaskChannel {
-
-    void cancelApplication(boolean status);
-
-    AbstractTask createTask(TaskRequest taskRequest);
+package org.apache.dolphinscheduler.plugin.task.sql;
 
+/**
+ * Kind of statement a sql task runs.
+ * The declaration order must not change: SqlTask compares the numeric sql type of the
+ * task parameters with the ordinal of these constants.
+ */
+public enum SqlType {
+    /**
+     * sql type
+     * 0 QUERY
+     * 1 NON_QUERY
+     */
+    QUERY, NON_QUERY
 }

Reply via email to