This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 1113cb9c3 [api-draft][connector] Add new http source (#2012)
1113cb9c3 is described below
commit 1113cb9c3aac6a40a3e491ca04f34fd07ca65ee1
Author: Kerwin <[email protected]>
AuthorDate: Wed Jun 15 14:36:51 2022 +0800
[api-draft][connector] Add new http source (#2012)
* add http source
* update http source module
* add license header
* fix http source parameter
* fix code style
* merge api-draft
* fix ci error
---
seatunnel-connectors/plugin-mapping.properties | 1 +
.../seatunnel-connectors-seatunnel-dist/pom.xml | 5 +
.../seatunnel-connectors-seatunnel/pom.xml | 1 +
.../pom.xml | 36 ++-
.../seatunnel/http/client/HttpClientProvider.java | 283 +++++++++++++++++++++
.../seatunnel/http/client/HttpResponse.java | 74 ++++++
.../connectors/seatunnel/http/config/Config.java | 30 +++
.../seatunnel/http/source/HttpSource.java | 123 +++++++++
.../seatunnel/http/source/HttpSourceParameter.java | 70 +++++
.../seatunnel/http/source/HttpSourceReader.java | 99 +++++++
.../seatunnel/http/source/HttpSourceSplit.java | 34 +++
.../http/source/HttpSourceSplitEnumerator.java | 83 ++++++
.../connectors/seatunnel/http/state/HttpState.java | 24 ++
13 files changed, 849 insertions(+), 14 deletions(-)
diff --git a/seatunnel-connectors/plugin-mapping.properties
b/seatunnel-connectors/plugin-mapping.properties
index e92ddbaa3..992c399a0 100644
--- a/seatunnel-connectors/plugin-mapping.properties
+++ b/seatunnel-connectors/plugin-mapping.properties
@@ -89,5 +89,6 @@ seatunnel.source.FakeSource =
seatunnel-connector-seatunnel-fake
seatunnel.sink.Console = seatunnel-connector-seatunnel-console
seatunnel.source.Kafka = seatunnel-connector-seatunnel-kafka
seatunnel.sink.Kafka = seatunnel-connector-seatunnel-kafka
+seatunnel.source.Http = seatunnel-connector-seatunnel-http
seatunnel.source.Socket = seatunnel-connector-seatunnel-socket
seatunnel.sink.Hive = seatunnel-connector-seatunnel-hive
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
index ba8c8be33..46444870f 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
@@ -45,6 +45,11 @@
<artifactId>seatunnel-connector-seatunnel-kafka</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-seatunnel-http</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-connector-seatunnel-hive</artifactId>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
index 89ebf68d0..e7ae04ab2 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -35,6 +35,7 @@
<module>seatunnel-connector-seatunnel-console</module>
<module>seatunnel-connector-seatunnel-fake</module>
<module>seatunnel-connector-seatunnel-kafka</module>
+ <module>seatunnel-connector-seatunnel-http</module>
<module>seatunnel-connector-seatunnel-jdbc</module>
<module>seatunnel-connector-seatunnel-socket</module>
<module>seatunnel-connector-seatunnel-clickhouse</module>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/pom.xml
similarity index 64%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
copy to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/pom.xml
index 89ebf68d0..1e78fb8a8 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/pom.xml
@@ -21,22 +21,30 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>seatunnel-connectors</artifactId>
+ <artifactId>seatunnel-connectors-seatunnel</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <packaging>pom</packaging>
-
- <artifactId>seatunnel-connectors-seatunnel</artifactId>
-
- <modules>
- <module>seatunnel-connector-seatunnel-hive</module>
- <module>seatunnel-connector-seatunnel-console</module>
- <module>seatunnel-connector-seatunnel-fake</module>
- <module>seatunnel-connector-seatunnel-kafka</module>
- <module>seatunnel-connector-seatunnel-jdbc</module>
- <module>seatunnel-connector-seatunnel-socket</module>
- <module>seatunnel-connector-seatunnel-clickhouse</module>
- </modules>
+
+ <artifactId>seatunnel-connector-seatunnel-http</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ </dependency>
+ </dependencies>
+
</project>
\ No newline at end of file
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
new file mode 100644
index 000000000..e448bd92e
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.client;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class HttpClientProvider implements AutoCloseable {
+ private final CloseableHttpClient httpClient;
+ private static final String ENCODING = "UTF-8";
+ private static final int CONNECT_TIMEOUT = 6000 * 2;
+ private static final int SOCKET_TIMEOUT = 6000 * 10;
+ private static final int INITIAL_CAPACITY = 16;
+
+ private HttpClientProvider() {
+ httpClient = HttpClients.createDefault();
+ }
+
+ public static HttpClientProvider getInstance() {
+ return Sigleton.INSTANCE;
+ }
+
+    /**
+     * Send a request using the given http method.
+     *
+     * <p>Only POST is dispatched specially. The method name is compared
+     * case-insensitively so that a configured value such as "post" is honoured;
+     * any other value (including null) results in a GET request.
+     *
+     * @param url     request address
+     * @param method  http method name, e.g. "GET" or "POST"
+     * @param headers request header map, may be null
+     * @param params  request parameter map, may be null
+     * @return http response result
+     * @throws Exception information
+     */
+    public HttpResponse execute(String url, String method, Map<String, String> headers, Map<String, String> params) throws Exception {
+        // equalsIgnoreCase is null-safe when invoked on the literal.
+        if ("POST".equalsIgnoreCase(method)) {
+            return doPost(url, headers, params);
+        }
+        return doGet(url, headers, params);
+    }
+
+ /**
+ * Send a get request without request headers and request parameters
+ *
+ * @param url request address
+ * @return http response result
+ * @throws Exception information
+ */
+ public HttpResponse doGet(String url) throws Exception {
+ return doGet(url, null, null);
+ }
+
+ /**
+ * Send a get request with request parameters
+ *
+ * @param url request address
+ * @param params request parameter map
+ * @return http response result
+ * @throws Exception information
+ */
+ public HttpResponse doGet(String url, Map<String, String> params) throws
Exception {
+ return doGet(url, null, params);
+ }
+
+ /**
+ * Send a get request with request headers and request parameters
+ *
+ * @param url request address
+ * @param headers request header map
+ * @param params request parameter map
+ * @return http response result
+ * @throws Exception information
+ */
+ public HttpResponse doGet(String url, Map<String, String> headers,
Map<String, String> params) throws Exception {
+ // Create access address
+ URIBuilder uriBuilder = new URIBuilder(url);
+ addParameters(uriBuilder, params);
+
+ /**
+ * setConnectTimeout:Set the connection timeout, in milliseconds.
+ * setSocketTimeout:The timeout period (ie response time) for
requesting data acquisition, in milliseconds.
+ * If an interface is accessed, and the data cannot be returned within
a certain amount of time, the call is simply abandoned.
+ */
+ RequestConfig requestConfig =
RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+ HttpGet httpGet = new HttpGet(uriBuilder.build());
+ httpGet.setConfig(requestConfig);
+
+ addHeaders(httpGet, headers);
+ return getResponse(httpGet);
+ }
+
+ /**
+ * Send a post request without request headers and request parameters
+ *
+ * @param url request address
+ * @return http response result
+ * @throws Exception information
+ */
+ public HttpResponse doPost(String url) throws Exception {
+ return doPost(url, null, null);
+ }
+
+ /**
+ * Send post request with request parameters
+ *
+ * @param url request address
+ * @param params request parameter map
+ * @return http response result
+ * @throws Exception information
+ */
+ public HttpResponse doPost(String url, Map<String, String> params) throws
Exception {
+ return doPost(url, null, params);
+ }
+
+ /**
+ * Send a post request with request headers and request parameters
+ *
+ * @param url request address
+ * @param headers request header map
+ * @param params request parameter map
+ * @return http response result
+ * @throws Exception information
+ */
+ public HttpResponse doPost(String url, Map<String, String> headers,
Map<String, String> params) throws Exception {
+ HttpPost httpPost = new HttpPost(url);
+ /**
+ * setConnectTimeout:Set the connection timeout, in milliseconds.
+ * setSocketTimeout:The timeout period (ie response time) for
requesting data acquisition, in milliseconds.
+ * If an interface is accessed, and the data cannot be returned within
a certain amount of time, the call is simply abandoned.
+ */
+ RequestConfig requestConfig =
RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+ httpPost.setConfig(requestConfig);
+ // set request header
+ addHeaders(httpPost, headers);
+
+ // Encapsulate request parameters
+ addParameters(httpPost, params);
+ return getResponse(httpPost);
+ }
+
+ /**
+ * Send a put request without request parameters
+ *
+ * @param url request address
+ * @return http response result
+ * @throws Exception information
+ */
+ public HttpResponse doPut(String url) throws Exception {
+ return doPut(url, null);
+ }
+
+ /**
+ * Send a put request with request parameters
+ *
+ * @param url request address
+ * @param params request parameter map
+ * @return http response result
+ * @throws Exception information
+ */
+ public HttpResponse doPut(String url, Map<String, String> params) throws
Exception {
+
+ HttpPut httpPut = new HttpPut(url);
+ RequestConfig requestConfig =
RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+ httpPut.setConfig(requestConfig);
+
+ addParameters(httpPut, params);
+ return getResponse(httpPut);
+ }
+
+ /**
+ * Send delete request without request parameters
+ *
+ * @param url request address
+ * @return http response result
+ * @throws Exception information
+ */
+ public HttpResponse doDelete(String url) throws Exception {
+
+ HttpDelete httpDelete = new HttpDelete(url);
+ RequestConfig requestConfig =
RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+ httpDelete.setConfig(requestConfig);
+ return getResponse(httpDelete);
+ }
+
+    /**
+     * Send delete request with request parameters.
+     *
+     * <p>The request is sent as a POST carrying an extra {@code _method=delete}
+     * form parameter. The caller's map is copied first, so it is never modified
+     * and may be unmodifiable.
+     *
+     * @param url    request address
+     * @param params request parameter map, may be null
+     * @return http response result
+     * @throws Exception information
+     */
+    public HttpResponse doDelete(String url, Map<String, String> params) throws Exception {
+        // Work on a private copy: writing "_method" into the caller's map would leak the
+        // marker into their data and fail outright on unmodifiable maps.
+        Map<String, String> deleteParams = new HashMap<>(INITIAL_CAPACITY);
+        if (params != null) {
+            deleteParams.putAll(params);
+        }
+        deleteParams.put("_method", "delete");
+        return doPost(url, deleteParams);
+    }
+
+ private HttpResponse getResponse(HttpRequestBase request) throws
IOException {
+ // execute request
+ try (CloseableHttpResponse httpResponse = httpClient.execute(request))
{
+ // get return result
+ if (httpResponse != null && httpResponse.getStatusLine() != null) {
+ String content = "";
+ if (httpResponse.getEntity() != null) {
+ content = EntityUtils.toString(httpResponse.getEntity(),
ENCODING);
+ }
+ return new
HttpResponse(httpResponse.getStatusLine().getStatusCode(), content);
+ }
+ }
+ return new HttpResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ }
+
+ private void addParameters(URIBuilder builder, Map<String, String> params)
{
+ if (Objects.isNull(params) || params.isEmpty()) {
+ return;
+ }
+ params.forEach((k, v) -> builder.setParameter(k, v));
+ }
+
+ private void addParameters(HttpEntityEnclosingRequestBase request,
Map<String, String> params) throws UnsupportedEncodingException {
+ if (Objects.isNull(params) || params.isEmpty()) {
+ return;
+ }
+ List<NameValuePair> parameters = new ArrayList<>();
+ Set<Map.Entry<String, String>> entrySet = params.entrySet();
+ for (Map.Entry<String, String> e : entrySet) {
+ String name = e.getKey();
+ String value = e.getValue();
+ NameValuePair pair = new BasicNameValuePair(name, value);
+ parameters.add(pair);
+ }
+ // Set to the request's http object
+ request.setEntity(new UrlEncodedFormEntity(parameters, ENCODING));
+ }
+
+ private void addHeaders(HttpRequestBase request, Map<String, String>
headers) {
+ if (Objects.isNull(headers) || headers.isEmpty()) {
+ return;
+ }
+ headers.forEach((k, v) -> request.addHeader(k, v));
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (Objects.nonNull(httpClient)) {
+ httpClient.close();
+ }
+ }
+
+ private static class Sigleton {
+ private static final HttpClientProvider INSTANCE = new
HttpClientProvider();
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpResponse.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpResponse.java
new file mode 100644
index 000000000..5c46ee4c2
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpResponse.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.client;
+
+import java.io.Serializable;
+
+public class HttpResponse implements Serializable {
+
+ private static final long serialVersionUID = 2168152194164783950L;
+
+ public static final int STATUS_OK = 200;
+ /**
+ * response status code
+ */
+ private int code;
+
+ /**
+ * response body
+ */
+ private String content;
+
+ public HttpResponse() {
+ }
+
+ public HttpResponse(int code) {
+ this.code = code;
+ }
+
+ public HttpResponse(String content) {
+ this.content = content;
+ }
+
+ public HttpResponse(int code, String content) {
+ this.code = code;
+ this.content = content;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+ public String getContent() {
+ return content;
+ }
+
+ public void setContent(String content) {
+ this.content = content;
+ }
+
+ @Override
+ public String toString() {
+ return "HttpClientResult [code=" + code + ", content=" + content + "]";
+ }
+
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/Config.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/Config.java
new file mode 100644
index 000000000..f62b81abc
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/Config.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.config;
+
+public class Config {
+ public static final String URL = "url";
+
+ public static final String METHOD = "method";
+ public static final String METHOD_DEFAULT_VALUE = "GET";
+
+ public static final String HEADERS = "headers";
+ public static final String PARAMS = "params";
+ public static final String BODY = "body";
+
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
new file mode 100644
index 000000000..91f983256
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.source;
+
+import static
org.apache.seatunnel.connectors.seatunnel.http.config.Config.BODY;
+import static
org.apache.seatunnel.connectors.seatunnel.http.config.Config.HEADERS;
+import static
org.apache.seatunnel.connectors.seatunnel.http.config.Config.METHOD;
+import static
org.apache.seatunnel.connectors.seatunnel.http.config.Config.METHOD_DEFAULT_VALUE;
+import static
org.apache.seatunnel.connectors.seatunnel.http.config.Config.PARAMS;
+import static org.apache.seatunnel.connectors.seatunnel.http.config.Config.URL;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.http.state.HttpState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@AutoService(SeaTunnelSource.class)
+public class HttpSource implements SeaTunnelSource<SeaTunnelRow,
HttpSourceSplit, HttpState> {
+ private final HttpSourceParameter parameter = new HttpSourceParameter();
+ private SeaTunnelRowType rowType;
+ private SeaTunnelContext seaTunnelContext;
+ @Override
+ public String getPluginName() {
+ return "Http";
+ }
+
+    /**
+     * Validate the plugin configuration and copy it into the source parameter.
+     *
+     * <p>The {@code url} option is checked first. {@code method} falls back to
+     * GET when absent; {@code headers}, {@code params} and {@code body} are only
+     * read when present.
+     *
+     * @param pluginConfig plugin configuration
+     * @throws PrepareFailException if the check for the url option fails
+     */
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, URL);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
+        }
+        this.parameter.setUrl(pluginConfig.getString(URL));
+
+        if (pluginConfig.hasPath(METHOD)) {
+            this.parameter.setMethod(pluginConfig.getString(METHOD));
+        } else {
+            this.parameter.setMethod(METHOD_DEFAULT_VALUE);
+        }
+
+        if (pluginConfig.hasPath(HEADERS)) {
+            this.parameter.setHeaders(pluginConfig.getConfig(HEADERS).entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
+        }
+
+        if (pluginConfig.hasPath(PARAMS)) {
+            // Request parameters must go to setParams: storing them through setHeaders
+            // would overwrite the configured headers and leave params unset.
+            this.parameter.setParams(pluginConfig.getConfig(PARAMS).entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
+        }
+
+        if (pluginConfig.hasPath(BODY)) {
+            this.parameter.setBody(pluginConfig.getString(BODY));
+        }
+        // TODO support user custom row type
+        this.rowType = new SeaTunnelRowType(new String[]{"content"}, new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
+    }
+
+ @Override
+ public SeaTunnelContext getSeaTunnelContext() {
+ return this.seaTunnelContext;
+ }
+
+ @Override
+ public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+ this.seaTunnelContext = seaTunnelContext;
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+ return this.rowType;
+ }
+
+ @Override
+ public SourceReader<SeaTunnelRow, HttpSourceSplit>
createReader(SourceReader.Context readerContext) throws Exception {
+ return new HttpSourceReader(this.parameter, readerContext);
+ }
+
+ @Override
+ public SourceSplitEnumerator<HttpSourceSplit, HttpState>
createEnumerator(SourceSplitEnumerator.Context<HttpSourceSplit>
enumeratorContext) throws Exception {
+ return new HttpSourceSplitEnumerator(enumeratorContext);
+ }
+
+ @Override
+ public SourceSplitEnumerator<HttpSourceSplit, HttpState>
restoreEnumerator(SourceSplitEnumerator.Context<HttpSourceSplit>
enumeratorContext, HttpState checkpointState) throws Exception {
+ return new HttpSourceSplitEnumerator(enumeratorContext,
checkpointState);
+ }
+
+ @Override
+ public Serializer<HttpState> getEnumeratorStateSerializer() {
+ return new DefaultSerializer<>();
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceParameter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceParameter.java
new file mode 100644
index 000000000..5ec14f872
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceParameter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.source;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class HttpSourceParameter implements Serializable {
+
+ private String url;
+ private String method;
+ private Map<String, String> headers;
+ private Map<String, String> params;
+ private String body;
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public String getMethod() {
+ return method;
+ }
+
+ public void setMethod(String method) {
+ this.method = method;
+ }
+
+ public Map<String, String> getHeaders() {
+ return headers;
+ }
+
+ public void setHeaders(Map<String, String> headers) {
+ this.headers = headers;
+ }
+
+ public Map<String, String> getParams() {
+ return params;
+ }
+
+ public void setParams(Map<String, String> params) {
+ this.params = params;
+ }
+
+ public String getBody() {
+ return body;
+ }
+
+ public void setBody(String body) {
+ this.body = body;
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
new file mode 100644
index 000000000..cc31b2761
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.source;
+
+import static
org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse.STATUS_OK;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+public class HttpSourceReader implements SourceReader<SeaTunnelRow,
HttpSourceSplit> {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(HttpSourceReader.class);
+ private final SourceReader.Context context;
+ private final HttpSourceParameter parameter;
+ private HttpClientProvider httpClient;
+
+ public HttpSourceReader(HttpSourceParameter parameter,
SourceReader.Context context) {
+ this.context = context;
+ this.parameter = parameter;
+ }
+
+ @Override
+ public void open() {
+ httpClient = HttpClientProvider.getInstance();
+ }
+
+ /**
+ * Close the http client held by this reader, if open() has assigned one.
+ *
+ * NOTE(review): the client is the static HttpClientProvider singleton returned by
+ * getInstance(), so closing it here also closes it for every other holder of that
+ * instance. Confirm that no other reader in the same JVM still needs it.
+ */
+ @Override
+ public void close() throws IOException {
+ if (Objects.nonNull(httpClient)) {
+ httpClient.close();
+ }
+ }
+
+    /**
+     * Execute the configured http request once and emit the response body as a
+     * single-field row.
+     *
+     * <p>A non-200 status or an exception is logged and no row is emitted. In
+     * bounded mode the reader signals that there are no more elements after this
+     * single attempt, whether or not it succeeded.
+     *
+     * @param output collector receiving the produced row
+     * @throws Exception declared by the overridden method; failures inside the try block are caught and logged
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        try {
+            HttpResponse response = httpClient.execute(this.parameter.getUrl(), this.parameter.getMethod(), this.parameter.getHeaders(), this.parameter.getParams());
+            if (STATUS_OK == response.getCode()) {
+                output.collect(new SeaTunnelRow(new Object[] {response.getContent()}));
+                return;
+            }
+            LOGGER.error("http client execute exception, http response status code:[{}], content:[{}]", response.getCode(), response.getContent());
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage(), e);
+        } finally {
+            if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+                // signal to the source that we have reached the end of the data.
+                LOGGER.info("Closed the bounded http source");
+                context.signalNoMoreElement();
+            }
+        }
+    }
+
+ @Override
+ public List<HttpSourceSplit> snapshotState(long checkpointId) throws
Exception {
+ return null;
+ }
+
+ @Override
+ public void addSplits(List<HttpSourceSplit> splits) {
+
+ }
+
+ @Override
+ public void handleNoMoreSplits() {
+
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplit.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplit.java
new file mode 100644
index 000000000..3496c546c
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplit.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+/**
+ * Split descriptor for the HTTP source; the only data it carries is a string identifier.
+ */
+public class HttpSourceSplit implements SourceSplit {
+
+    /** Identifier handed back by {@link #splitId()}. */
+    private final String splitId;
+
+    /**
+     * Creates a split with the given identifier.
+     *
+     * @param splitId identifier of this split
+     */
+    public HttpSourceSplit(String splitId) {
+        this.splitId = splitId;
+    }
+
+    /**
+     * @return the identifier supplied at construction time
+     */
+    @Override
+    public String splitId() {
+        return splitId;
+    }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplitEnumerator.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplitEnumerator.java
new file mode 100644
index 000000000..f67d3335a
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplitEnumerator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.http.state.HttpState;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Split enumerator for the HTTP source.
+ *
+ * <p>It stores the enumerator context and an optional {@link HttpState}; every lifecycle and
+ * callback method is an empty implementation, and no splits are tracked.
+ */
+public class HttpSourceSplitEnumerator implements SourceSplitEnumerator<HttpSourceSplit, HttpState> {
+    /** Context supplied at construction time. */
+    private SourceSplitEnumerator.Context<HttpSourceSplit> enumeratorContext;
+    /** State supplied at construction time; {@code null} when the one-argument constructor is used. */
+    private HttpState httpState;
+
+    /**
+     * Creates an enumerator without any state.
+     *
+     * @param enumeratorContext context to store on this enumerator
+     */
+    public HttpSourceSplitEnumerator(SourceSplitEnumerator.Context<HttpSourceSplit> enumeratorContext) {
+        this(enumeratorContext, null);
+    }
+
+    /**
+     * Creates an enumerator holding the given state.
+     *
+     * @param enumeratorContext context to store on this enumerator
+     * @param httpState state to store on this enumerator
+     */
+    public HttpSourceSplitEnumerator(SourceSplitEnumerator.Context<HttpSourceSplit> enumeratorContext, HttpState httpState) {
+        this.enumeratorContext = enumeratorContext;
+        this.httpState = httpState;
+    }
+
+    /** Empty implementation. */
+    @Override
+    public void open() {
+    }
+
+    /** Empty implementation: no splits are produced or assigned. */
+    @Override
+    public void run() throws Exception {
+    }
+
+    /** Empty implementation. */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /**
+     * Empty implementation: returned splits are ignored.
+     *
+     * @param splits splits handed back to the enumerator; not used
+     * @param subtaskId subtask the splits came from; not used
+     */
+    @Override
+    public void addSplitsBack(List<HttpSourceSplit> splits, int subtaskId) {
+    }
+
+    /**
+     * @return always {@code 0}
+     */
+    @Override
+    public int currentUnassignedSplitSize() {
+        return 0;
+    }
+
+    /**
+     * Empty implementation: split requests are ignored.
+     *
+     * @param subtaskId requesting subtask; not used
+     */
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+    }
+
+    /**
+     * Empty implementation: reader registration is ignored.
+     *
+     * @param subtaskId registering subtask; not used
+     */
+    @Override
+    public void registerReader(int subtaskId) {
+    }
+
+    /**
+     * @param checkpointId identifier of the checkpoint being taken; not used
+     * @return always {@code null}
+     */
+    @Override
+    public HttpState snapshotState(long checkpointId) throws Exception {
+        return null;
+    }
+
+    /**
+     * Empty implementation.
+     *
+     * @param checkpointId identifier of the completed checkpoint; not used
+     */
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/state/HttpState.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/state/HttpState.java
new file mode 100644
index 000000000..5332f67c1
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/state/HttpState.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.state;
+
+import java.io.Serializable;
+
+/**
+ * Serializable state type for the HTTP source, used as the state type parameter of
+ * {@code HttpSourceSplitEnumerator}. It declares no fields.
+ */
+public class HttpState implements Serializable {
+
+}