This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 94da3cba [Feature#1270][Connectors] add source plugin:
seatunnel-connector-spark-http (#1678)
94da3cba is described below
commit 94da3cba9e071b1fb006ca001cd6ca96b6662cd9
Author: tmljob <[email protected]>
AuthorDate: Tue Apr 12 13:37:05 2022 +0800
[Feature#1270][Connectors] add source plugin:
seatunnel-connector-spark-http (#1678)
* [Feature#1270][Connectors] add plugin: seatunnel-connector-http
---
docs/en/connector/source/Http.md | 61 ++++
.../seatunnel-connectors-spark/pom.xml | 1 +
.../seatunnel-connector-spark-http/pom.xml | 51 ++++
.../apache/seatunnel/spark/http/source/Http.java | 140 +++++++++
.../spark/http/source/constant/Settings.java | 30 ++
.../spark/http/source/util/HttpClientResult.java | 73 +++++
.../spark/http/source/util/HttpClientUtils.java | 338 +++++++++++++++++++++
.../org.apache.seatunnel.spark.BaseSparkSource | 18 ++
8 files changed, 712 insertions(+)
diff --git a/docs/en/connector/source/Http.md b/docs/en/connector/source/Http.md
new file mode 100644
index 00000000..84b74c3d
--- /dev/null
+++ b/docs/en/connector/source/Http.md
@@ -0,0 +1,61 @@
+# Http
+
+## Description
+
+Get data from http or https interface
+
+:::tip
+
+Engine Supported and plugin name
+
+* [x] Spark: Http
+* [ ] Flink
+
+:::
+
+## Options
+
+| name | type | required | default value |
+| -------------- | ------ | -------- | ------------ |
+| url | string | yes | - |
+| method | string | no | GET |
+| header | string | no | |
+| request_params | string | no | |
+| sync_path | string | no | |
+
+### url [string]
+
+HTTP request path, starting with http:// or https://.
+
+### method[string]
+
+HTTP request method, GET or POST, default GET.
+
+### header[string]
+
+HTTP request header, json format.
+
+### request_params[string]
+
+HTTP request parameters, json format.
+
+### sync_path[string]
+
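+Path of the file (for example on HDFS) that stores the synchronization parameters in json format. If the file exists, the parameters on its first line are merged into `request_params` before the request is sent, which allows state to be carried across multiple HTTP requests.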
+
+### common options [string]
+
+Source plugin common parameters, please refer to [Source
Plugin](common-options.mdx) for details.
+
+## Example
+
+```bash
+ Http {
+ url = "http://date.jsontest.com/"
+ result_table_name= "response_body"
+ }
+```
+
+## Notes
+
+The Http source plugin only reads the synchronization parameters. Deciding whether they need to be updated, based on the result of the HTTP call, must be done outside the Http source plugin; the updated parameters must then be written to HDFS through the hdfs sink plugin.
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/pom.xml
b/seatunnel-connectors/seatunnel-connectors-spark/pom.xml
index 3f86de60..67db31c9 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-spark/pom.xml
@@ -52,6 +52,7 @@
<module>seatunnel-connector-spark-neo4j</module>
<module>seatunnel-connector-spark-iceberg</module>
<module>seatunnel-connector-spark-feishu</module>
+ <module>seatunnel-connector-spark-http</module>
</modules>
</project>
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/pom.xml
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/pom.xml
new file mode 100644
index 00000000..df3ef7b6
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-spark</artifactId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-connector-spark-http</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api-spark</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/Http.java
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/Http.java
new file mode 100644
index 00000000..d82299fc
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/Http.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.http.source;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.spark.SparkEnvironment;
+import org.apache.seatunnel.spark.batch.SparkBatchSource;
+import org.apache.seatunnel.spark.http.source.constant.Settings;
+import org.apache.seatunnel.spark.http.source.util.HttpClientResult;
+import org.apache.seatunnel.spark.http.source.util.HttpClientUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Spark batch source that issues one HTTP(S) request and returns the JSON response body
+ * as a {@code Dataset<Row>}.
+ *
+ * <p>Request parameters come from the {@code request_params} option and, when {@code sync_path}
+ * points to an existing file, from the JSON object stored on the first line of that file
+ * (values from the file override {@code request_params}).
+ */
+public class Http extends SparkBatchSource {
+
+    private static final String GET = "GET";
+    private static final String POST = "POST";
+    private static final int INITIAL_CAPACITY = 16;
+
+    private static final Logger LOG = LoggerFactory.getLogger(Http.class);
+
+    // Shared instance: avoids building a new mapper for every parsed option.
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * Verifies that the mandatory {@code url} option is present.
+     *
+     * @return the result of the existence check
+     */
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL);
+    }
+
+    @Override
+    public void prepare(SparkEnvironment prepareEnv) {
+        // nothing to prepare for this source
+    }
+
+    /**
+     * Calls the configured endpoint and parses the response body as JSON.
+     *
+     * <p>A failed call is logged and not rethrown; in that case, or when the response has no
+     * body, the JSON reader is given an empty RDD instead of a {@code null} record.
+     *
+     * @param env spark environment providing the {@link SparkSession}
+     * @return the dataset read from the response body
+     */
+    @Override
+    public Dataset<Row> getData(SparkEnvironment env) {
+        SparkSession spark = env.getSparkSession();
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+        Map<String, String> requestMap = jsonToMap(requestParams);
+        String syncValues = getSyncValues(jsc, syncPath);
+        LOG.info("sync values->{}", syncValues);
+        // synchronization values take precedence over the statically configured parameters
+        requestMap.putAll(jsonToMap(syncValues));
+
+        HttpClientResult response = new HttpClientResult();
+        try {
+            Map<String, String> headerMap = jsonToMap(header);
+            // accept "post"/"Post" as well; anything that is not POST falls back to GET
+            if (POST.equalsIgnoreCase(method)) {
+                response = HttpClientUtils.doPost(url, headerMap, requestMap);
+            } else {
+                response = HttpClientUtils.doGet(url, headerMap, requestMap);
+            }
+        } catch (Exception e) {
+            LOG.error("http call error!", e);
+        }
+
+        LOG.info("http respond code->{}", response.getCode());
+
+        List<String> array = new ArrayList<>();
+        if (response.getContent() != null) {
+            array.add(response.getContent());
+        } else {
+            // the request failed or returned nothing: do not hand a null record to the reader
+            LOG.warn("http response has no content, url is {}", url);
+        }
+        JavaRDD<String> javaRDD = jsc.parallelize(array);
+        DataFrameReader reader = spark.read().format("json");
+        return reader.json(javaRDD);
+    }
+
+    /**
+     * Reads the first line of the synchronization file.
+     *
+     * @param jsc      spark context used to read the file
+     * @param syncPath location of the synchronization file, may be null or empty
+     * @return the first line of the file, or an empty string when the path is not set,
+     *         the file does not exist, is empty, or the existence check fails
+     */
+    private String getSyncValues(JavaSparkContext jsc, String syncPath) {
+        if (null == syncPath || syncPath.isEmpty()) {
+            return "";
+        }
+        Configuration hadoopConf = jsc.hadoopConfiguration();
+        List<String> values = new ArrayList<>();
+        try {
+            Path path = new Path(syncPath);
+            // resolve the file system from the path itself so that a fully qualified
+            // sync_path on a non-default file system is checked on the right one
+            FileSystem fs = path.getFileSystem(hadoopConf);
+            if (fs.exists(path)) {
+                // only the first line is used, so there is no need to collect the whole file
+                values = jsc.textFile(syncPath).take(1);
+            }
+        } catch (IOException e) {
+            LOG.error("getSyncValues error, syncPath is {}", syncPath, e);
+        }
+        return values.isEmpty() ? "" : values.get(0);
+    }
+
+    /**
+     * Parses a JSON object into a mutable string-to-string map.
+     *
+     * <p>String values are kept as they are, {@code null} values stay {@code null}, and any
+     * other value (number, boolean, nested object or array) is converted to its JSON text so
+     * that it can be used safely as a header or request parameter value.
+     *
+     * @param content JSON object text, may be null or empty
+     * @return the parsed map; empty when the content is null, empty, or cannot be parsed
+     */
+    private Map<String, String> jsonToMap(String content) {
+        Map<String, String> map = new HashMap<>(INITIAL_CAPACITY);
+        if (null == content || content.isEmpty()) {
+            return map;
+        }
+
+        try {
+            Map<?, ?> parsed = OBJECT_MAPPER.readValue(content, HashMap.class);
+            // a JSON "null" document is parsed to a null reference
+            if (parsed != null) {
+                for (Map.Entry<?, ?> entry : parsed.entrySet()) {
+                    Object value = entry.getValue();
+                    String text;
+                    if (value == null || value instanceof String) {
+                        text = (String) value;
+                    } else {
+                        text = OBJECT_MAPPER.writeValueAsString(value);
+                    }
+                    map.put(String.valueOf(entry.getKey()), text);
+                }
+            }
+        } catch (IOException e) {
+            //only records the log, does not handle it, and does not affect the main process.
+            LOG.error("{} json to map error!", content, e);
+            // never return a partially converted map
+            map.clear();
+        }
+        return map;
+    }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/constant/Settings.java
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/constant/Settings.java
new file mode 100644
index 00000000..1ad299d8
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/constant/Settings.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.http.source.constant;
+
+/**
+ * Names of the configuration options understood by the Http source plugin.
+ */
+public class Settings {
+
+    /** Option holding the request address. */
+    public static final String SOURCE_HTTP_URL = "url";
+
+    /** Option holding the request method. */
+    public static final String SOURCE_HTTP_METHOD = "method";
+
+    /** Option holding the request headers. */
+    public static final String SOURCE_HTTP_HEADER = "header";
+
+    /** Option holding the request parameters. */
+    public static final String SOURCE_HTTP_REQUEST_PARAMS = "request_params";
+
+    /** Option holding the path of the synchronization parameters. */
+    public static final String SOURCE_HTTP_SYNC_PATH = "sync_path";
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/util/HttpClientResult.java
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/util/HttpClientResult.java
new file mode 100644
index 00000000..a205cf23
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/util/HttpClientResult.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.http.source.util;
+
+import java.io.Serializable;
+
+public class HttpClientResult implements Serializable {
+
+ private static final long serialVersionUID = 2168152194164783950L;
+
+ /**
+ * response status code
+ */
+ private int code;
+
+ /**
+ * response body
+ */
+ private String content;
+
+ public HttpClientResult() {
+ }
+
+ public HttpClientResult(int code) {
+ this.code = code;
+ }
+
+ public HttpClientResult(String content) {
+ this.content = content;
+ }
+
+ public HttpClientResult(int code, String content) {
+ this.code = code;
+ this.content = content;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+ public String getContent() {
+ return content;
+ }
+
+ public void setContent(String content) {
+ this.content = content;
+ }
+
+ @Override
+ public String toString() {
+ return "HttpClientResult [code=" + code + ", content=" + content + "]";
+ }
+
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/util/HttpClientUtils.java
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/util/HttpClientUtils.java
new file mode 100644
index 00000000..ffd1947b
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/java/org/apache/seatunnel/spark/http/source/util/HttpClientUtils.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.http.source.util;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * Static helpers that send GET/POST/PUT/DELETE requests with Apache HttpClient and return
+ * the status code and body as a {@link HttpClientResult}.
+ *
+ * <p>Every call creates its own client and closes both the response and the client before
+ * returning.
+ */
+public class HttpClientUtils {
+
+    private static final String ENCODING = "UTF-8";
+    // connection timeout in milliseconds (12 seconds)
+    private static final int CONNECT_TIMEOUT = 6000 * 2;
+    // socket (response) timeout in milliseconds (60 seconds)
+    private static final int SOCKET_TIMEOUT = 6000 * 10;
+    private static final int INITIAL_CAPACITY = 16;
+
+    /**
+     * Send a get request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception if the request cannot be built or executed
+     */
+    public static HttpClientResult doGet(String url) throws Exception {
+        return doGet(url, null, null);
+    }
+
+    /**
+     * Send a get request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map, may be null
+     * @return http response result
+     * @throws Exception if the request cannot be built or executed
+     */
+    public static HttpClientResult doGet(String url, Map<String, String> params) throws Exception {
+        return doGet(url, null, params);
+    }
+
+    /**
+     * Send a get request with request headers and request parameters
+     *
+     * @param url     request address
+     * @param headers request header map, may be null
+     * @param params  request parameter map added to the query string, may be null
+     * @return http response result
+     * @throws Exception if the url is malformed or the request cannot be executed
+     */
+    public static HttpClientResult doGet(String url, Map<String, String> headers, Map<String, String> params)
+            throws Exception {
+        // Build the target address first; the client is only created once the request is valid,
+        // so a malformed url cannot leak a client.
+        URIBuilder uriBuilder = new URIBuilder(url);
+        if (params != null) {
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                uriBuilder.setParameter(entry.getKey(), entry.getValue());
+            }
+        }
+
+        HttpGet httpGet = new HttpGet(uriBuilder.build());
+        packageHeader(headers, httpGet);
+        return execute(httpGet);
+    }
+
+    /**
+     * Send a post request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception if the request cannot be built or executed
+     */
+    public static HttpClientResult doPost(String url) throws Exception {
+        return doPost(url, null, null);
+    }
+
+    /**
+     * Send post request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map, may be null
+     * @return http response result
+     * @throws Exception if the request cannot be built or executed
+     */
+    public static HttpClientResult doPost(String url, Map<String, String> params) throws Exception {
+        return doPost(url, null, params);
+    }
+
+    /**
+     * Send a post request with request headers and request parameters
+     *
+     * @param url     request address
+     * @param headers request header map, may be null
+     * @param params  request parameter map sent as a url-encoded form, may be null
+     * @return http response result
+     * @throws Exception if the request cannot be built or executed
+     */
+    public static HttpClientResult doPost(String url, Map<String, String> headers, Map<String, String> params)
+            throws Exception {
+        HttpPost httpPost = new HttpPost(url);
+        packageHeader(headers, httpPost);
+        packageParam(params, httpPost);
+        return execute(httpPost);
+    }
+
+    /**
+     * Send a put request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception if the request cannot be built or executed
+     */
+    public static HttpClientResult doPut(String url) throws Exception {
+        return doPut(url, null);
+    }
+
+    /**
+     * Send a put request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map sent as a url-encoded form, may be null
+     * @return http response result
+     * @throws Exception if the request cannot be built or executed
+     */
+    public static HttpClientResult doPut(String url, Map<String, String> params) throws Exception {
+        HttpPut httpPut = new HttpPut(url);
+        packageParam(params, httpPut);
+        return execute(httpPut);
+    }
+
+    /**
+     * Send delete request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception if the request cannot be built or executed
+     */
+    public static HttpClientResult doDelete(String url) throws Exception {
+        return execute(new HttpDelete(url));
+    }
+
+    /**
+     * Send delete request with request parameters. The request is sent as a POST whose form
+     * carries an additional {@code _method=delete} parameter.
+     *
+     * @param url    request address
+     * @param params request parameter map, may be null; it is not modified
+     * @return http response result
+     * @throws Exception if the request cannot be built or executed
+     */
+    public static HttpClientResult doDelete(String url, Map<String, String> params) throws Exception {
+        // work on a copy so the caller's map is left untouched (and may be immutable)
+        Map<String, String> deleteParams = new HashMap<String, String>(INITIAL_CAPACITY);
+        if (params != null) {
+            deleteParams.putAll(params);
+        }
+
+        deleteParams.put("_method", "delete");
+        return doPost(url, deleteParams);
+    }
+
+    /**
+     * Encapsulate request header
+     *
+     * @param params     request header map, may be null
+     * @param httpMethod http request the headers are set on
+     */
+    public static void packageHeader(Map<String, String> params, HttpRequestBase httpMethod) {
+        if (params != null) {
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                // setHeader replaces an existing header with the same name
+                httpMethod.setHeader(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * Encapsulate request parameters as a url-encoded form entity
+     *
+     * @param params     request parameter map, may be null (no entity is set in that case)
+     * @param httpMethod http request the entity is set on
+     * @throws UnsupportedEncodingException if the form encoding is not supported
+     */
+    public static void packageParam(Map<String, String> params, HttpEntityEnclosingRequestBase httpMethod)
+            throws UnsupportedEncodingException {
+        if (params != null) {
+            List<NameValuePair> nvps = new ArrayList<NameValuePair>();
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                nvps.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
+            }
+
+            httpMethod.setEntity(new UrlEncodedFormEntity(nvps, ENCODING));
+        }
+    }
+
+    /**
+     * Execute the request and get the response result. The response obtained from the client
+     * is always closed before this method returns.
+     *
+     * @param httpResponse ignored; kept only for backward compatibility of the signature
+     *                     (a value assigned to it here would never be visible to the caller)
+     * @param httpClient   http client object used to execute the request
+     * @param httpMethod   http method object
+     * @return http response result; status 500 without content when no status line is available
+     * @throws Exception if the request cannot be executed or the body cannot be read
+     */
+    public static HttpClientResult getHttpClientResult(CloseableHttpResponse httpResponse,
+            CloseableHttpClient httpClient, HttpRequestBase httpMethod) throws Exception {
+        // try-with-resources guarantees the response (and its connection) is released
+        try (CloseableHttpResponse response = httpClient.execute(httpMethod)) {
+            if (response != null && response.getStatusLine() != null) {
+                String content = "";
+                if (response.getEntity() != null) {
+                    content = EntityUtils.toString(response.getEntity(), ENCODING);
+                }
+                return new HttpClientResult(response.getStatusLine().getStatusCode(), content);
+            }
+            return new HttpClientResult(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    /**
+     * Release resources. The client is closed even when closing the response fails.
+     *
+     * @param httpResponse http response object, may be null
+     * @param httpClient   http client object, may be null
+     * @throws IOException if closing either resource fails
+     */
+    public static void release(CloseableHttpResponse httpResponse, CloseableHttpClient httpClient)
+            throws IOException {
+        try {
+            if (httpResponse != null) {
+                httpResponse.close();
+            }
+        } finally {
+            if (httpClient != null) {
+                httpClient.close();
+            }
+        }
+    }
+
+    /**
+     * Applies the shared timeouts to the request, executes it with a fresh client and closes
+     * the client afterwards.
+     *
+     * @param httpMethod fully prepared request
+     * @return http response result
+     * @throws Exception if the request cannot be executed
+     */
+    private static HttpClientResult execute(HttpRequestBase httpMethod) throws Exception {
+        // setConnectTimeout: time allowed to establish the connection, in milliseconds.
+        // setSocketTimeout: time allowed to wait for response data, in milliseconds.
+        RequestConfig requestConfig = RequestConfig.custom()
+            .setConnectTimeout(CONNECT_TIMEOUT)
+            .setSocketTimeout(SOCKET_TIMEOUT)
+            .build();
+        httpMethod.setConfig(requestConfig);
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            return getHttpClientResult(null, httpClient, httpMethod);
+        }
+    }
+
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource
new file mode 100644
index 00000000..c25d032f
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-http/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.spark.http.source.Http
\ No newline at end of file