This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch task-plugin
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/task-plugin by this push:
new 5a12e00 [Feature][Task]Task plugin spi (#6084)
5a12e00 is described below
commit 5a12e00cbe926b40162b459b94ed74b7e7f25903
Author: Kerwin <[email protected]>
AuthorDate: Thu Sep 2 10:08:38 2021 +0800
[Feature][Task]Task plugin spi (#6084)
---
.../dolphinscheduler/server/utils/ParamUtils.java | 4 +-
.../server/worker/runner/TaskExecuteThread.java | 14 +-
dolphinscheduler-service/pom.xml | 6 -
dolphinscheduler-spi/pom.xml | 11 +
.../spi/params/PluginParamsTransfer.java | 2 +-
.../spi/task/AbstractParameters.java | 106 +++++++
.../spi/utils/CollectionUtils.java | 313 +++++++++++++++++++++
.../dolphinscheduler/spi/utils/JSONUtils.java | 172 ++++++++++-
.../dolphinscheduler-task-api/pom.xml | 1 +
.../pom.xml | 22 +-
.../plugin/task/flink/FlinkTask.java | 1 -
.../plugin/task/flink/FlinkTaskChannelFactory.java | 2 +-
.../pom.xml | 24 +-
.../plugin/task/http/HttpCheckCondition.java} | 33 +--
.../plugin/task/http/HttpMethod.java} | 37 +--
.../plugin/task/http/HttpParameters.java | 134 +++++++++
.../plugin/task/http/HttpParametersType.java} | 34 +--
.../plugin/task/http/HttpProperty.java | 124 ++++++++
.../plugin/task/http/HttpTask.java | 47 ++++
.../plugin/task/http/HttpTaskChannel.java | 4 +
.../plugin/task/http/HttpTaskChannelFactory.java | 24 ++
.../plugin/task/http/HttpTaskPlugin.java | 13 +
.../pom.xml | 20 +-
.../pom.xml | 21 +-
.../task/python/PythonTaskChannelFactory.java | 2 +-
.../plugin/task/spark/SparkTaskChannel.java} | 22 +-
.../task/spark/SparkTaskChannelFanctory.java} | 18 +-
.../plugin/task/spark/SparkTaskPlugin.java} | 22 +-
.../pom.xml | 22 +-
.../pom.xml | 20 +-
dolphinscheduler-task-plugin/pom.xml | 7 +
31 files changed, 1066 insertions(+), 216 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
index 82c09d5..3dd8df0 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
@@ -29,8 +29,6 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
-import org.apache.logging.log4j.util.Strings;
-
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
@@ -81,7 +79,7 @@ public class ParamUtils {
params.putAll(globalParamsMap);
}
- if (Strings.isNotBlank(taskExecutionContext.getExecutePath())) {
+ if (StringUtils.isNotBlank(taskExecutionContext.getExecutePath())) {
params.put(Constants.PARAMETER_TASK_EXECUTE_PATH,taskExecutionContext.getExecutePath());
}
params.put(Constants.PARAMETER_TASK_INSTANCE_ID,Integer.toString(taskExecutionContext.getTaskInstanceId()));
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 5975294..c5fb466 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -37,9 +37,11 @@ import
org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCache
import
org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
-import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.spi.task.AbstractTask;
+import org.apache.dolphinscheduler.spi.task.TaskChannel;
+import org.apache.dolphinscheduler.spi.task.TaskRequest;
import org.apache.commons.collections.MapUtils;
@@ -173,12 +175,12 @@ public class TaskExecuteThread implements Runnable,
Delayed {
//TODO Temporary operation, To be adjusted
TaskRequest taskRequest =
JSONUtils.parseObject(JSONUtils.toJsonString(taskExecutionContext),
TaskRequest.class);
+
task = taskChannel.createTask(taskRequest, taskLogger);
// task init
this.task.init();
//init varPool
- //TODO Temporary operation, To be adjusted
-//
this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
+
this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
// task handle
this.task.handle();
@@ -189,11 +191,10 @@ public class TaskExecuteThread implements Runnable,
Delayed {
responseCommand.setEndTime(new Date());
responseCommand.setProcessId(this.task.getProcessId());
responseCommand.setAppIds(this.task.getAppIds());
- //TODO Temporary operation, To be adjusted
-//
responseCommand.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
+
responseCommand.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
logger.info("task instance id : {},task final status : {}",
taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
} catch (Throwable e) {
- e.printStackTrace();
+
logger.error("task scheduler failure", e);
kill();
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
@@ -253,7 +254,6 @@ public class TaskExecuteThread implements Runnable, Delayed
{
return globalParamsMap;
}
-
/**
* kill task
*/
diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-service/pom.xml
index 8477139..dbdebb9 100644
--- a/dolphinscheduler-service/pom.xml
+++ b/dolphinscheduler-service/pom.xml
@@ -43,7 +43,6 @@
<artifactId>dolphinscheduler-spi</artifactId>
</dependency>
-
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
@@ -64,11 +63,6 @@
</dependency>
<dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- </dependency>
-
- <dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
</dependency>
diff --git a/dolphinscheduler-spi/pom.xml b/dolphinscheduler-spi/pom.xml
index 46115f8..79b7a58 100644
--- a/dolphinscheduler-spi/pom.xml
+++ b/dolphinscheduler-spi/pom.xml
@@ -47,6 +47,17 @@
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-beanutils</groupId>
+ <artifactId>commons-beanutils</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/params/PluginParamsTransfer.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/params/PluginParamsTransfer.java
index 3e709ad..c5706d2 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/params/PluginParamsTransfer.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/params/PluginParamsTransfer.java
@@ -62,7 +62,7 @@ public class PluginParamsTransfer {
* @return return plugin params value
*/
public static List<Map<String, Object>> generatePluginParams(String
paramsJsonStr, String pluginParamsTemplate) {
- Map<String, Object> paramsMap = JSONUtils.toMap(paramsJsonStr);
+ Map<String, Object> paramsMap = JSONUtils.toMap(paramsJsonStr,
String.class, Object.class);
return generatePluginParams(paramsMap, pluginParamsTemplate);
}
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java
index e10b1e3..adb9136 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java
@@ -15,10 +15,20 @@ package org.apache.dolphinscheduler.spi.task;/*
* limitations under the License.
*/
+import org.apache.dolphinscheduler.spi.utils.CollectionUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
/**
* job params related class
*/
@@ -30,6 +40,11 @@ public abstract class AbstractParameters implements
IParameters {
private List<Property> localParams;
/**
+ * var pool
+ */
+ public List<Property> varPool;
+
+ /**
* get local parameters list
* @return Property list
*/
@@ -57,4 +72,95 @@ public abstract class AbstractParameters implements
IParameters {
return null;
}
+ /**
+ * get varPool map
+ *
+ * @return parameters map
+ */
+ public Map<String, Property> getVarPoolMap() {
+ if (varPool != null) {
+ Map<String, Property> varPoolMap = new LinkedHashMap<>();
+ for (Property property : varPool) {
+ varPoolMap.put(property.getProp(), property);
+ }
+ return varPoolMap;
+ }
+ return null;
+ }
+
+ public List<Property> getVarPool() {
+ return varPool;
+ }
+
+ public void setVarPool(String varPool) {
+ if (StringUtils.isEmpty(varPool)) {
+ this.varPool = new ArrayList<>();
+ } else {
+ this.varPool = JSONUtils.toList(varPool, Property.class);
+ }
+ }
+
+ public void dealOutParam(String result) {
+ if (CollectionUtils.isEmpty(localParams)) {
+ return;
+ }
+ List<Property> outProperty = getOutProperty(localParams);
+ if (CollectionUtils.isEmpty(outProperty)) {
+ return;
+ }
+ if (StringUtils.isEmpty(result)) {
+ varPool.addAll(outProperty);
+ return;
+ }
+ Map<String, String> taskResult = getMapByString(result);
+ if (taskResult == null || taskResult.size() == 0) {
+ return;
+ }
+ for (Property info : outProperty) {
+ info.setValue(taskResult.get(info.getProp()));
+ varPool.add(info);
+ }
+ }
+
+ public List<Property> getOutProperty(List<Property> params) {
+ if (CollectionUtils.isEmpty(params)) {
+ return new ArrayList<>();
+ }
+ List<Property> result = new ArrayList<>();
+ for (Property info : params) {
+ if (info.getDirect() == Direct.OUT) {
+ result.add(info);
+ }
+ }
+ return result;
+ }
+
+ public List<Map<String, String>> getListMapByString(String json) {
+ List<Map<String, String>> allParams = new ArrayList<>();
+ ArrayNode paramsByJson = JSONUtils.parseArray(json);
+ Iterator<JsonNode> listIterator = paramsByJson.iterator();
+ while (listIterator.hasNext()) {
+ Map<String, String> param =
JSONUtils.toMap(listIterator.next().toString(), String.class, String.class);
+ allParams.add(param);
+ }
+ return allParams;
+ }
+
+ /**
+ * shell's result format is key=value$VarPool$key=value$VarPool$
+ * @param result
+ * @return
+ */
+ public static Map<String, String> getMapByString(String result) {
+ String[] formatResult = result.split("\\$VarPool\\$");
+ Map<String, String> format = new HashMap<>();
+ for (String info : formatResult) {
+ if (StringUtils.isNotEmpty(info) && info.contains("=")) {
+ String[] keyValue = info.split("=");
+ format.put(keyValue[0], keyValue[1]);
+ }
+ }
+ return format;
+ }
+
}
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/CollectionUtils.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/CollectionUtils.java
new file mode 100644
index 0000000..cf2498e
--- /dev/null
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/CollectionUtils.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.spi.utils;
+
+import org.apache.commons.beanutils.BeanMap;
+
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Provides utility methods and decorators for {@link Collection} instances.
+ * <p>
+ * Various utility methods might put the input objects into a Set/Map/Bag. In case
+ * the input objects override {@link Object#equals(Object)}, it is mandatory that
+ * the general contract of the {@link Object#hashCode()} method is maintained.
+ * <p>
+ * NOTE: From 4.0, method parameters will take {@link Iterable} objects when possible.
+ *
+ * @version $Id: CollectionUtils.java 1686855 2015-06-22 13:00:27Z tn $
+ * @since 1.0
+ */
+public class CollectionUtils {
+
+ private CollectionUtils() {
+ throw new UnsupportedOperationException("Construct CollectionUtils");
+ }
+
+ /**
+ * The load factor used when none specified in constructor.
+ */
+ static final float DEFAULT_LOAD_FACTOR = 0.75f;
+
+ /**
+ * Returns a new {@link Collection} containing <i>a</i> minus a subset of
+ * <i>b</i>. Only the elements of <i>b</i> that satisfy the predicate
+ * condition, <i>p</i> are subtracted from <i>a</i>.
+ *
+ * <p>The cardinality of each element <i>e</i> in the returned {@link Collection}
+ * that satisfies the predicate condition will be the cardinality of <i>e</i> in <i>a</i>
+ * minus the cardinality of <i>e</i> in <i>b</i>, or zero, whichever is greater.</p>
+ * <p>The cardinality of each element <i>e</i> in the returned {@link Collection} that does <b>not</b>
+ * satisfy the predicate condition will be equal to the cardinality of <i>e</i> in <i>a</i>.</p>
+ *
+ * @param a the collection to subtract from, must not be null
+ * @param b the collection to subtract, must not be null
+ * @param <T> T
+ * @return a new collection with the results
+ * @see Collection#removeAll
+ */
+ public static <T> Collection<T> subtract(Set<T> a, Set<T> b) {
+ return org.apache.commons.collections4.CollectionUtils.subtract(a, b);
+ }
+
+ public static boolean isNotEmpty(Collection coll) {
+ return !isEmpty(coll);
+ }
+
+ public static boolean isEmpty(Collection coll) {
+ return coll == null || coll.isEmpty();
+ }
+
+ /**
+ * String to map
+ *
+ * @param str string
+ * @param separator separator
+ * @return string to map
+ */
+ public static Map<String, String> stringToMap(String str, String
separator) {
+ return stringToMap(str, separator, "");
+ }
+
+ /**
+ * String to map
+ *
+ * @param str string
+ * @param separator separator
+ * @param keyPrefix prefix
+ * @return string to map
+ */
+ public static Map<String, String> stringToMap(String str, String
separator, String keyPrefix) {
+
+ Map<String, String> emptyMap = new HashMap<>(0);
+ if (StringUtils.isEmpty(str)) {
+ return emptyMap;
+ }
+ if (StringUtils.isEmpty(separator)) {
+ return emptyMap;
+ }
+ String[] strings = str.split(separator);
+ int initialCapacity = (int)(strings.length / DEFAULT_LOAD_FACTOR) + 1;
+ Map<String, String> map = new HashMap<>(initialCapacity);
+ for (int i = 0; i < strings.length; i++) {
+ String[] strArray = strings[i].split("=");
+ if (strArray.length != 2) {
+ return emptyMap;
+ }
+ //strArray[0] KEY strArray[1] VALUE
+ if (StringUtils.isEmpty(keyPrefix)) {
+ map.put(strArray[0], strArray[1]);
+ } else {
+ map.put(keyPrefix + strArray[0], strArray[1]);
+ }
+ }
+ return map;
+ }
+
+ /**
+ * Transform item in collection
+ *
+ * @param collection origin collection
+ * @param transformFunc transform function
+ * @param <R> origin item type
+ * @param <T> target type
+ * @return transform list
+ */
+ public static <R, T> List<T> transformToList(Collection<R> collection,
Function<R, T> transformFunc) {
+ if (isEmpty(collection)) {
+ return new ArrayList<>();
+ }
+ return
collection.stream().map(transformFunc).collect(Collectors.toList());
+ }
+
+ /**
+ * Collect collection to map
+ *
+ * @param collection origin collection
+ * @param keyTransformFunction key transform function
+ * @param <K> target k type
+ * @param <V> value
+ * @return map
+ */
+ public static <K, V> Map<K, V> collectionToMap(Collection<V> collection,
Function<V, K> keyTransformFunction) {
+ if (isEmpty(collection)) {
+ return new HashMap<>();
+ }
+ return
collection.stream().collect(Collectors.toMap(keyTransformFunction,
Function.identity()));
+ }
+
+ /**
+ * Helper class to easily access cardinality properties of two collections.
+ *
+ * @param <O> the element type
+ */
+ private static class CardinalityHelper<O> {
+
+ /**
+ * Contains the cardinality for each object in collection A.
+ */
+ final Map<O, Integer> cardinalityA;
+
+ /**
+ * Contains the cardinality for each object in collection B.
+ */
+ final Map<O, Integer> cardinalityB;
+
+ /**
+ * Create a new CardinalityHelper for two collections.
+ *
+ * @param a the first collection
+ * @param b the second collection
+ */
+ public CardinalityHelper(final Iterable<? extends O> a, final
Iterable<? extends O> b) {
+ cardinalityA = CollectionUtils.getCardinalityMap(a);
+ cardinalityB = CollectionUtils.getCardinalityMap(b);
+ }
+
+ /**
+ * Returns the frequency of this object in collection A.
+ *
+ * @param obj the object
+ * @return the frequency of the object in collection A
+ */
+ public int freqA(final Object obj) {
+ return getFreq(obj, cardinalityA);
+ }
+
+ /**
+ * Returns the frequency of this object in collection B.
+ *
+ * @param obj the object
+ * @return the frequency of the object in collection B
+ */
+ public int freqB(final Object obj) {
+ return getFreq(obj, cardinalityB);
+ }
+
+ private int getFreq(final Object obj, final Map<?, Integer> freqMap) {
+ final Integer count = freqMap.get(obj);
+ if (count != null) {
+ return count;
+ }
+ return 0;
+ }
+ }
+
+ /**
+ * returns {@code true} iff the given {@link Collection}s contain
+ * exactly the same elements with exactly the same cardinalities.
+ *
+ * @param a the first collection
+ * @param b the second collection
+ * @return Returns true iff the given Collections contain exactly the same elements with exactly the same cardinalities.
+ * That is, iff the cardinality of e in a is equal to the cardinality of e in b, for each element e in a or b.
+ */
+ public static boolean equalLists(Collection<?> a, Collection<?> b) {
+ if (a == null && b == null) {
+ return true;
+ }
+
+ if (a == null || b == null) {
+ return false;
+ }
+
+ return isEqualCollection(a, b);
+ }
+
+ /**
+ * Returns {@code true} iff the given {@link Collection}s contain
+ * exactly the same elements with exactly the same cardinalities.
+ * <p>
+ * That is, iff the cardinality of <i>e</i> in <i>a</i> is
+ * equal to the cardinality of <i>e</i> in <i>b</i>,
+ * for each element <i>e</i> in <i>a</i> or <i>b</i>.
+ *
+ * @param a the first collection, must not be null
+ * @param b the second collection, must not be null
+ * @return <code>true</code> iff the collections contain the same elements with the same cardinalities.
+ */
+ public static boolean isEqualCollection(final Collection<?> a, final
Collection<?> b) {
+ if (a.size() != b.size()) {
+ return false;
+ }
+ final CardinalityHelper<Object> helper = new CardinalityHelper<>(a, b);
+ if (helper.cardinalityA.size() != helper.cardinalityB.size()) {
+ return false;
+ }
+ for (final Object obj : helper.cardinalityA.keySet()) {
+ if (helper.freqA(obj) != helper.freqB(obj)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Returns a {@link Map} mapping each unique element in the given
+ * {@link Collection} to an {@link Integer} representing the number
+ * of occurrences of that element in the {@link Collection}.
+ * <p>
+ * Only those elements present in the collection will appear as
+ * keys in the map.
+ *
+ * @param <O> the type of object in the returned {@link Map}. This is a super type of O
+ * @param coll the collection to get the cardinality map for, must not be null
+ * @return the populated cardinality map
+ */
+ public static <O> Map<O, Integer> getCardinalityMap(final Iterable<?
extends O> coll) {
+ final Map<O, Integer> count = new HashMap<>();
+ for (final O obj : coll) {
+ count.put(obj, count.getOrDefault(obj, 0) + 1);
+ }
+ return count;
+ }
+
+ /**
+ * Removes certain attributes of each object in the list
+ *
+ * @param originList origin list
+ * @param exclusionSet exclusion set
+ * @param <T> T
+ * @return removes certain attributes of each object in the list
+ */
+ public static <T extends Object> List<Map<String, Object>>
getListByExclusion(List<T> originList, Set<String> exclusionSet) {
+ List<Map<String, Object>> instanceList = new ArrayList<>();
+ if (exclusionSet == null) {
+ exclusionSet = new HashSet<>();
+ }
+ if (originList == null) {
+ return instanceList;
+ }
+ Map<String, Object> instanceMap;
+ for (T instance : originList) {
+ BeanMap beanMap = new BeanMap(instance);
+ instanceMap = new LinkedHashMap<>(16, 0.75f, true);
+ for (Map.Entry<Object, Object> entry : beanMap.entrySet()) {
+ if (exclusionSet.contains(entry.getKey())) {
+ continue;
+ }
+ instanceMap.put((String) entry.getKey(), entry.getValue());
+ }
+ instanceList.add(instanceMap);
+ }
+ return instanceList;
+ }
+
+}
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/JSONUtils.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/JSONUtils.java
index e87e210..ed53cb2 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/JSONUtils.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/JSONUtils.java
@@ -17,10 +17,14 @@
package org.apache.dolphinscheduler.spi.utils;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import static
com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
import static
com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
import static
com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static
com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -30,12 +34,21 @@ import java.util.TimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
import com.fasterxml.jackson.databind.type.CollectionType;
/**
@@ -52,9 +65,23 @@ public class JSONUtils {
.configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
.configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+ .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
.setTimeZone(TimeZone.getDefault());
private JSONUtils() {
+ throw new UnsupportedOperationException("Construct JSONUtils");
+ }
+
+ public static ArrayNode createArrayNode() {
+ return objectMapper.createArrayNode();
+ }
+
+ public static ObjectNode createObjectNode() {
+ return objectMapper.createObjectNode();
+ }
+
+ public static JsonNode toJsonNode(Object obj) {
+ return objectMapper.valueToTree(obj);
}
/**
@@ -103,6 +130,22 @@ public class JSONUtils {
}
/**
+ * deserialize
+ *
+ * @param src byte array
+ * @param clazz class
+ * @param <T> deserialize type
+ * @return deserialize type
+ */
+ public static <T> T parseObject(byte[] src, Class<T> clazz) {
+ if (src == null) {
+ return null;
+ }
+ String json = new String(src, UTF_8);
+ return parseObject(json, clazz);
+ }
+
+ /**
* json to list
*
* @param json json string
@@ -127,12 +170,83 @@ public class JSONUtils {
}
/**
+ * check json object valid
+ *
+ * @param json json
+ * @return true if valid
+ */
+ public static boolean checkJsonValid(String json) {
+
+ if (StringUtils.isEmpty(json)) {
+ return false;
+ }
+
+ try {
+ objectMapper.readTree(json);
+ return true;
+ } catch (IOException e) {
+ logger.error("check json object valid exception!", e);
+ }
+
+ return false;
+ }
+
+ /**
+ * Method for finding a JSON Object field with specified name in this
+ * node or its child nodes, and returning value it has.
+ * If no matching field is found in this node or its descendants, returns null.
+ *
+ * @param jsonNode json node
+ * @param fieldName Name of field to look for
+ * @return Value of first matching node found, if any; null if none
+ */
+ public static String findValue(JsonNode jsonNode, String fieldName) {
+ JsonNode node = jsonNode.findValue(fieldName);
+
+ if (node == null) {
+ return null;
+ }
+
+ return node.asText();
+ }
+
+ /**
* json to map
+ * {@link #toMap(String, Class, Class)}
*
* @param json json
* @return json to map
*/
- public static <K, V> Map<K, V> toMap(String json) {
+ public static Map<String, String> toMap(String json) {
+ return parseObject(json, new TypeReference<Map<String, String>>() {});
+ }
+
+ /**
+ * from the key-value generated json to get the str value no matter the real type of value
+ * @param json the json str
+ * @param nodeName key
+ * @return the str value of key
+ */
+ public static String getNodeString(String json, String nodeName) {
+ try {
+ JsonNode rootNode = objectMapper.readTree(json);
+ return rootNode.has(nodeName) ? rootNode.get(nodeName).toString()
: "";
+ } catch (JsonProcessingException e) {
+ return "";
+ }
+ }
+
+ /**
+ * json to map
+ *
+ * @param json json
+ * @param classK classK
+ * @param classV classV
+ * @param <K> K
+ * @param <V> V
+ * @return to map
+ */
+ public static <K, V> Map<K, V> toMap(String json, Class<K> classK,
Class<V> classV) {
return parseObject(json, new TypeReference<Map<K, V>>() {});
}
@@ -172,9 +286,34 @@ public class JSONUtils {
}
}
+ /**
+ * serialize to json byte
+ *
+ * @param obj object
+ * @param <T> object type
+ * @return byte array
+ */
+ public static <T> byte[] toJsonByteArray(T obj) {
+ if (obj == null) {
+ return null;
+ }
+ String json = "";
+ try {
+ json = toJsonString(obj);
+ } catch (Exception e) {
+ logger.error("json serialize exception.", e);
+ }
+
+ return json.getBytes(UTF_8);
+ }
+
public static ObjectNode parseObject(String text) {
try {
- return (ObjectNode) objectMapper.readTree(text);
+ if (text.isEmpty()) {
+ return parseObject(text, ObjectNode.class);
+ } else {
+ return (ObjectNode) objectMapper.readTree(text);
+ }
} catch (Exception e) {
throw new RuntimeException("String json deserialization
exception.", e);
}
@@ -187,4 +326,33 @@ public class JSONUtils {
throw new RuntimeException("Json deserialization exception.", e);
}
}
+
+ /**
+ * json serializer
+ */
+ public static class JsonDataSerializer extends JsonSerializer<String> {
+
+ @Override
+ public void serialize(String value, JsonGenerator gen,
SerializerProvider provider) throws IOException {
+ gen.writeRawValue(value);
+ }
+
+ }
+
+ /**
+ * json data deserializer
+ */
+ public static class JsonDataDeserializer extends JsonDeserializer<String> {
+
+ @Override
+ public String deserialize(JsonParser p, DeserializationContext ctxt)
throws IOException {
+ JsonNode node = p.getCodec().readTree(p);
+ if (node instanceof TextNode) {
+ return node.asText();
+ } else {
+ return node.toString();
+ }
+ }
+
+ }
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
index 75f87cb..6ec6b39 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
@@ -49,6 +49,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
+ <scope>provided</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/pom.xml
similarity index 73%
copy from dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
copy to dolphinscheduler-task-plugin/dolphinscheduler-task-datax/pom.xml
index 75f87cb..99ac7cd 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/pom.xml
@@ -25,7 +25,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>dolphinscheduler-task-api</artifactId>
+ <artifactId>dolphinscheduler-task-datax</artifactId>
<dependencies>
<dependency>
@@ -34,21 +34,13 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-task-api</artifactId>
+ <version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
</dependencies>
+
+
+
</project>
\ No newline at end of file
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
index 7a88ace..8d9bb7f 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
@@ -31,7 +31,6 @@ import org.slf4j.Logger;
public class FlinkTask extends AbstractYarnTask {
-
/**
* flink command
* usage: flink run [OPTIONS] <jar-file> <arguments>
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
index 6ea5a66..6d843c6 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
@@ -31,7 +31,7 @@ public class FlinkTaskChannelFactory implements
TaskChannelFactory {
@Override
public String getName() {
- return "Flink";
+ return "FLINK";
}
@Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/pom.xml
similarity index 73%
copy from dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
copy to dolphinscheduler-task-plugin/dolphinscheduler-task-http/pom.xml
index 75f87cb..0aaea8d 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/pom.xml
@@ -25,7 +25,8 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>dolphinscheduler-task-api</artifactId>
+ <artifactId>dolphinscheduler-task-http</artifactId>
+ <packaging>dolphinscheduler-plugin</packaging>
<dependencies>
<dependency>
@@ -34,21 +35,14 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-task-api</artifactId>
+ <version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
</dependencies>
+
+ <build>
+ <finalName>dolphinscheduler-task-http-${project.version}</finalName>
+ </build>
</project>
\ No newline at end of file
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpCheckCondition.java
similarity index 57%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpCheckCondition.java
index 6ea5a66..b2f423b 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpCheckCondition.java
@@ -15,27 +15,18 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.flink;
+package org.apache.dolphinscheduler.plugin.task.http;
-import org.apache.dolphinscheduler.spi.params.base.PluginParams;
-import org.apache.dolphinscheduler.spi.task.TaskChannel;
-import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
-
-import java.util.List;
-
-public class FlinkTaskChannelFactory implements TaskChannelFactory {
- @Override
- public TaskChannel create() {
- return new FlinkTaskChannel();
- }
-
- @Override
- public String getName() {
- return "Flink";
- }
+/**
+ * http check condition
+ */
+public enum HttpCheckCondition {
+ /**
+ * 0 status_code_default:200
+ * 1 status_code_custom
+ * 2 body_contains
+ * 3 body_not_contains
+ */
+ STATUS_CODE_DEFAULT,STATUS_CODE_CUSTOM, BODY_CONTAINS, BODY_NOT_CONTAINS
- @Override
- public List<PluginParams> getParams() {
- return null;
- }
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpMethod.java
similarity index 57%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpMethod.java
index 6ea5a66..d949f8a 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpMethod.java
@@ -15,27 +15,18 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.flink;
+package org.apache.dolphinscheduler.plugin.task.http;
-import org.apache.dolphinscheduler.spi.params.base.PluginParams;
-import org.apache.dolphinscheduler.spi.task.TaskChannel;
-import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
-
-import java.util.List;
-
-public class FlinkTaskChannelFactory implements TaskChannelFactory {
- @Override
- public TaskChannel create() {
- return new FlinkTaskChannel();
- }
-
- @Override
- public String getName() {
- return "Flink";
- }
-
- @Override
- public List<PluginParams> getParams() {
- return null;
- }
-}
+/**
+ * http method
+ */
+public enum HttpMethod {
+ /**
+ * 0 get
+ * 1 post
+ * 2 head
+ * 3 put
+ * 4 delete
+ */
+ GET, POST, HEAD, PUT, DELETE
+}
\ No newline at end of file
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpParameters.java
new file mode 100644
index 0000000..efad7d3
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpParameters.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.http;
+
+import org.apache.dolphinscheduler.spi.task.AbstractParameters;
+import org.apache.dolphinscheduler.spi.task.ResourceInfo;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * http parameter
+ */
+public class HttpParameters extends AbstractParameters {
+ /**
+ * url
+ */
+ private String url;
+
+ /**
+ * httpMethod
+ */
+ private HttpMethod httpMethod;
+
+ /**
+ * http params
+ */
+ private List<HttpProperty> httpParams;
+
+ /**
+ * httpCheckCondition
+ */
+ private HttpCheckCondition httpCheckCondition = HttpCheckCondition.STATUS_CODE_DEFAULT;
+
+ /**
+ * condition
+ */
+ private String condition;
+
+
+ /**
+ * Connect Timeout
+ * Unit: ms
+ */
+ private int connectTimeout ;
+
+ /**
+ * Socket Timeout
+ * Unit: ms
+ */
+ private int socketTimeout ;
+
+ @Override
+ public boolean checkParameters() {
+ return StringUtils.isNotEmpty(url);
+ }
+
+ @Override
+ public List<ResourceInfo> getResourceFilesList() {
+ return new ArrayList<>();
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public HttpMethod getHttpMethod() {
+ return httpMethod;
+ }
+
+ public void setHttpMethod(HttpMethod httpMethod) {
+ this.httpMethod = httpMethod;
+ }
+
+ public List<HttpProperty> getHttpParams() {
+ return httpParams;
+ }
+
+ public void setHttpParams(List<HttpProperty> httpParams) {
+ this.httpParams = httpParams;
+ }
+
+ public HttpCheckCondition getHttpCheckCondition() {
+ return httpCheckCondition;
+ }
+
+ public void setHttpCheckCondition(HttpCheckCondition httpCheckCondition) {
+ this.httpCheckCondition = httpCheckCondition;
+ }
+
+ public String getCondition() {
+ return condition;
+ }
+
+ public void setCondition(String condition) {
+ this.condition = condition;
+ }
+
+ public int getConnectTimeout() {
+ return connectTimeout;
+ }
+
+ public void setConnectTimeout(int connectTimeout) {
+ this.connectTimeout = connectTimeout;
+ }
+
+ public int getSocketTimeout() {
+ return socketTimeout;
+ }
+
+ public void setSocketTimeout(int socketTimeout) {
+ this.socketTimeout = socketTimeout;
+ }
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpParametersType.java
similarity index 57%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpParametersType.java
index 6ea5a66..83b6a74 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpParametersType.java
@@ -14,28 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.dolphinscheduler.plugin.task.http;
-package org.apache.dolphinscheduler.plugin.task.flink;
-
-import org.apache.dolphinscheduler.spi.params.base.PluginParams;
-import org.apache.dolphinscheduler.spi.task.TaskChannel;
-import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
-
-import java.util.List;
-
-public class FlinkTaskChannelFactory implements TaskChannelFactory {
- @Override
- public TaskChannel create() {
- return new FlinkTaskChannel();
- }
-
- @Override
- public String getName() {
- return "Flink";
- }
-
- @Override
- public List<PluginParams> getParams() {
- return null;
- }
+/**
+ * http parameters type
+ */
+public enum HttpParametersType {
+ /**
+ * 0 parameter;
+ * 1 body;
+ * 2 headers;
+ */
+ PARAMETER,BODY,HEADERS
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpProperty.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpProperty.java
new file mode 100644
index 0000000..5aa44d1
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpProperty.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.http;
+
+import java.util.Objects;
+
+public class HttpProperty {
+ /**
+ * key
+ */
+ private String prop;
+
+ /**
+ * httpParametersType
+ */
+ private HttpParametersType httpParametersType;
+
+ /**
+ * value
+ */
+ private String value;
+
+ public HttpProperty() {
+ }
+
+ public HttpProperty(String prop, HttpParametersType httpParametersType, String value) {
+ this.prop = prop;
+ this.httpParametersType = httpParametersType;
+ this.value = value;
+ }
+
+ /**
+ * getter method
+ *
+ * @return the prop
+ * @see HttpProperty#prop
+ */
+ public String getProp() {
+ return prop;
+ }
+
+ /**
+ * setter method
+ *
+ * @param prop the prop to set
+ * @see HttpProperty#prop
+ */
+ public void setProp(String prop) {
+ this.prop = prop;
+ }
+
+ /**
+ * getter method
+ *
+ * @return the value
+ * @see HttpProperty#value
+ */
+ public String getValue() {
+ return value;
+ }
+
+ /**
+ * setter method
+ *
+ * @param value the value to set
+ * @see HttpProperty#value
+ */
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ public HttpParametersType getHttpParametersType() {
+ return httpParametersType;
+ }
+
+ public void setHttpParametersType(HttpParametersType httpParametersType) {
+ this.httpParametersType = httpParametersType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HttpProperty property = (HttpProperty) o;
+ return Objects.equals(prop, property.prop) &&
+ Objects.equals(value, property.value);
+ }
+
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(prop, value);
+ }
+
+ @Override
+ public String toString() {
+ return "HttpProperty{" +
+ "prop='" + prop + '\'' +
+ ", httpParametersType=" + httpParametersType +
+ ", value='" + value + '\'' +
+ '}';
+ }
+
+
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
new file mode 100644
index 0000000..f6273a6
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
@@ -0,0 +1,68 @@
+package org.apache.dolphinscheduler.plugin.task.http;
+
+import org.apache.dolphinscheduler.spi.task.AbstractParameters;
+import org.apache.dolphinscheduler.spi.task.AbstractTask;
+import org.apache.dolphinscheduler.spi.task.TaskRequest;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.slf4j.Logger;
+
+/**
+ * Task that parses and validates HTTP task parameters taken from the task request.
+ * The request execution itself ({@link #handle()}) is not implemented yet.
+ */
+public class HttpTask extends AbstractTask {
+
+    /**
+     * task request this task was created for; source of the raw JSON task params
+     */
+    private final TaskRequest taskExecutionContext;
+
+    /**
+     * parameters parsed in {@link #init()}; null until init() has run
+     */
+    private HttpParameters httpParameters;
+
+    /**
+     * constructor
+     *
+     * @param taskExecutionContext taskExecutionContext
+     * @param logger logger
+     */
+    public HttpTask(TaskRequest taskExecutionContext, Logger logger) {
+        super(taskExecutionContext, logger);
+        // keep our own reference: init() reads the task params through this field
+        this.taskExecutionContext = taskExecutionContext;
+    }
+
+    /**
+     * Parses the JSON task params into {@link HttpParameters} and validates them.
+     *
+     * @throws RuntimeException if the params are missing or fail {@link HttpParameters#checkParameters()}
+     */
+    @Override
+    public void init() {
+        logger.info("http task params {}", taskExecutionContext.getTaskParams());
+        this.httpParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), HttpParameters.class);
+
+        // guard against a null parse result (presumably returned for unparseable input; confirm in JSONUtils)
+        if (httpParameters == null || !httpParameters.checkParameters()) {
+            throw new RuntimeException("http task params is not valid");
+        }
+    }
+
+    /**
+     * Not implemented yet: no HTTP request is issued.
+     */
+    @Override
+    public void handle() throws Exception {
+
+    }
+
+    /**
+     * @return the parameters parsed by {@link #init()}, or null if init() has not run
+     */
+    @Override
+    public AbstractParameters getParameters() {
+        return httpParameters;
+    }
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
new file mode 100644
index 0000000..632a50f
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
@@ -0,0 +1,31 @@
+package org.apache.dolphinscheduler.plugin.task.http;
+
+import org.apache.dolphinscheduler.spi.task.AbstractTask;
+import org.apache.dolphinscheduler.spi.task.TaskChannel;
+import org.apache.dolphinscheduler.spi.task.TaskRequest;
+
+import org.slf4j.Logger;
+
+/**
+ * Task channel of the HTTP plugin; creates {@link HttpTask} instances.
+ */
+public class HttpTaskChannel implements TaskChannel {
+
+    /**
+     * Does nothing: the body is intentionally empty, as in the Spark task channel.
+     */
+    @Override
+    public void cancelApplication(boolean status) {
+
+    }
+
+    /**
+     * @param taskRequest task request handed to the new task
+     * @param logger logger handed to the new task
+     * @return a new {@link HttpTask}
+     */
+    @Override
+    public AbstractTask createTask(TaskRequest taskRequest, Logger logger) {
+        return new HttpTask(taskRequest, logger);
+    }
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannelFactory.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannelFactory.java
new file mode 100644
index 0000000..25ad810
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannelFactory.java
@@ -0,0 +1,37 @@
+package org.apache.dolphinscheduler.plugin.task.http;
+
+import org.apache.dolphinscheduler.spi.params.base.PluginParams;
+import org.apache.dolphinscheduler.spi.task.TaskChannel;
+import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
+
+import java.util.List;
+
+/**
+ * Factory exposed by the HTTP task plugin; supplies the plugin name and its task channel.
+ */
+public class HttpTaskChannelFactory implements TaskChannelFactory {
+
+    /**
+     * @return the task type name, upper-case like the other task plugins ("FLINK", "PYTHON", "SPARK")
+     */
+    @Override
+    public String getName() {
+        return "HTTP";
+    }
+
+    /**
+     * @return null, as the Flink, Python and Spark factories do
+     */
+    @Override
+    public List<PluginParams> getParams() {
+        return null;
+    }
+
+    /**
+     * @return a new {@link HttpTaskChannel}
+     */
+    @Override
+    public TaskChannel create() {
+        return new HttpTaskChannel();
+    }
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskPlugin.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskPlugin.java
new file mode 100644
index 0000000..0158dcf
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskPlugin.java
@@ -0,0 +1,13 @@
+package org.apache.dolphinscheduler.plugin.task.http;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin;
+import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
+
+public class HttpTaskPlugin implements DolphinSchedulerPlugin {
+
+ @Override
+ public Iterable<TaskChannelFactory> getTaskChannelFactorys() {
+ return ImmutableList.of(new HttpTaskChannelFactory());
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/pom.xml
similarity index 73%
copy from dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
copy to dolphinscheduler-task-plugin/dolphinscheduler-task-mr/pom.xml
index 75f87cb..6582a31 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/pom.xml
@@ -25,7 +25,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>dolphinscheduler-task-api</artifactId>
+ <artifactId>dolphinscheduler-task-mr</artifactId>
<dependencies>
<dependency>
@@ -34,21 +34,11 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-task-api</artifactId>
+ <version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
</dependencies>
+
</project>
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/pom.xml
similarity index 73%
copy from dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
copy to dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/pom.xml
index 75f87cb..9c13b8c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/pom.xml
@@ -25,7 +25,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>dolphinscheduler-task-api</artifactId>
+ <artifactId>dolphinscheduler-task-procedure</artifactId>
<dependencies>
<dependency>
@@ -34,21 +34,12 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-task-api</artifactId>
+ <version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
</dependencies>
+
+
</project>
\ No newline at end of file
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTaskChannelFactory.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTaskChannelFactory.java
index 04c59cd..c41938b 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTaskChannelFactory.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTaskChannelFactory.java
@@ -29,7 +29,7 @@ public class PythonTaskChannelFactory implements
TaskChannelFactory {
@Override
public String getName() {
- return "Python";
+ return "PYTHON";
}
@Override
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskChannel.java
similarity index 65%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskChannel.java
index 6ea5a66..f4230c5 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskChannel.java
@@ -15,27 +15,23 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.flink;
+package org.apache.dolphinscheduler.plugin.task.spark;
-import org.apache.dolphinscheduler.spi.params.base.PluginParams;
+import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
-import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
+import org.apache.dolphinscheduler.spi.task.TaskRequest;
-import java.util.List;
+import org.slf4j.Logger;
-public class FlinkTaskChannelFactory implements TaskChannelFactory {
- @Override
- public TaskChannel create() {
- return new FlinkTaskChannel();
- }
+public class SparkTaskChannel implements TaskChannel {
@Override
- public String getName() {
- return "Flink";
+ public void cancelApplication(boolean status) {
+
}
@Override
- public List<PluginParams> getParams() {
- return null;
+ public AbstractTask createTask(TaskRequest taskRequest, Logger logger) {
+ return new SparkTask(taskRequest, logger);
}
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTaskChannelFactory.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskChannelFanctory.java
similarity index 86%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTaskChannelFactory.java
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskChannelFanctory.java
index 04c59cd..3169fac 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTaskChannelFactory.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskChannelFanctory.java
@@ -1,4 +1,4 @@
-package org.apache.dolphinscheduler.plugin.task.python;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,25 +15,27 @@ package org.apache.dolphinscheduler.plugin.task.python;/*
* limitations under the License.
*/
+package org.apache.dolphinscheduler.plugin.task.spark;
+
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
import java.util.List;
-public class PythonTaskChannelFactory implements TaskChannelFactory {
- @Override
- public TaskChannel create() {
- return new PythonTaskChannel();
- }
-
+public class SparkTaskChannelFanctory implements TaskChannelFactory {
@Override
public String getName() {
- return "Python";
+ return "SPARK";
}
@Override
public List<PluginParams> getParams() {
return null;
}
+
+ @Override
+ public TaskChannel create() {
+ return new SparkTaskChannel();
+ }
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskPlugin.java
similarity index 64%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskPlugin.java
index 6ea5a66..aa18897 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskPlugin.java
@@ -15,27 +15,17 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.flink;
+package org.apache.dolphinscheduler.plugin.task.spark;
-import org.apache.dolphinscheduler.spi.params.base.PluginParams;
-import org.apache.dolphinscheduler.spi.task.TaskChannel;
+import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin;
import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
-import java.util.List;
+import com.google.common.collect.ImmutableList;
-public class FlinkTaskChannelFactory implements TaskChannelFactory {
- @Override
- public TaskChannel create() {
- return new FlinkTaskChannel();
- }
-
- @Override
- public String getName() {
- return "Flink";
- }
+public class SparkTaskPlugin implements DolphinSchedulerPlugin {
@Override
- public List<PluginParams> getParams() {
- return null;
+ public Iterable<TaskChannelFactory> getTaskChannelFactorys() {
+ return ImmutableList.of(new SparkTaskChannelFanctory());
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml
similarity index 73%
copy from dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
copy to dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml
index 75f87cb..6f56744 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml
@@ -25,7 +25,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>dolphinscheduler-task-api</artifactId>
+ <artifactId>dolphinscheduler-task-sql</artifactId>
<dependencies>
<dependency>
@@ -34,21 +34,13 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-task-api</artifactId>
+ <version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
</dependencies>
+
+
+
</project>
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/pom.xml
similarity index 73%
copy from dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
copy to dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/pom.xml
index 75f87cb..fb52e95 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/pom.xml
@@ -25,7 +25,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>dolphinscheduler-task-api</artifactId>
+ <artifactId>dolphinscheduler-task-sqoop</artifactId>
<dependencies>
<dependency>
@@ -34,21 +34,11 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-task-api</artifactId>
+ <version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
</dependencies>
+
</project>
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/pom.xml
b/dolphinscheduler-task-plugin/pom.xml
index d6459fe..e8ba5aa 100644
--- a/dolphinscheduler-task-plugin/pom.xml
+++ b/dolphinscheduler-task-plugin/pom.xml
@@ -34,7 +34,14 @@
<module>dolphinscheduler-task-flink</module>
<module>dolphinscheduler-task-python</module>
<module>dolphinscheduler-task-spark</module>
+ <module>dolphinscheduler-task-http</module>
+ <module>dolphinscheduler-task-sql</module>
+ <module>dolphinscheduler-task-sqoop</module>
+ <module>dolphinscheduler-task-datax</module>
+ <module>dolphinscheduler-task-mr</module>
+ <module>dolphinscheduler-task-procedure</module>
<module>dolphinscheduler-task-tis</module>
+
</modules>