This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 1113cb9c3 [api-draft][connector] Add new http source (#2012)
1113cb9c3 is described below

commit 1113cb9c3aac6a40a3e491ca04f34fd07ca65ee1
Author: Kerwin <[email protected]>
AuthorDate: Wed Jun 15 14:36:51 2022 +0800

    [api-draft][connector] Add new http source (#2012)
    
    * add http source
    
    * update http source module
    
    * add license header
    
    * fix http source parameter
    
    * fix code style
    
    * merge api-draft
    
    * fix ci error
---
 seatunnel-connectors/plugin-mapping.properties     |   1 +
 .../seatunnel-connectors-seatunnel-dist/pom.xml    |   5 +
 .../seatunnel-connectors-seatunnel/pom.xml         |   1 +
 .../pom.xml                                        |  36 ++-
 .../seatunnel/http/client/HttpClientProvider.java  | 283 +++++++++++++++++++++
 .../seatunnel/http/client/HttpResponse.java        |  74 ++++++
 .../connectors/seatunnel/http/config/Config.java   |  30 +++
 .../seatunnel/http/source/HttpSource.java          | 123 +++++++++
 .../seatunnel/http/source/HttpSourceParameter.java |  70 +++++
 .../seatunnel/http/source/HttpSourceReader.java    |  99 +++++++
 .../seatunnel/http/source/HttpSourceSplit.java     |  34 +++
 .../http/source/HttpSourceSplitEnumerator.java     |  83 ++++++
 .../connectors/seatunnel/http/state/HttpState.java |  24 ++
 13 files changed, 849 insertions(+), 14 deletions(-)

diff --git a/seatunnel-connectors/plugin-mapping.properties 
b/seatunnel-connectors/plugin-mapping.properties
index e92ddbaa3..992c399a0 100644
--- a/seatunnel-connectors/plugin-mapping.properties
+++ b/seatunnel-connectors/plugin-mapping.properties
@@ -89,5 +89,6 @@ seatunnel.source.FakeSource = 
seatunnel-connector-seatunnel-fake
 seatunnel.sink.Console = seatunnel-connector-seatunnel-console
 seatunnel.source.Kafka = seatunnel-connector-seatunnel-kafka
 seatunnel.sink.Kafka = seatunnel-connector-seatunnel-kafka
+seatunnel.source.Http = seatunnel-connector-seatunnel-http
 seatunnel.source.Socket = seatunnel-connector-seatunnel-socket
 seatunnel.sink.Hive = seatunnel-connector-seatunnel-hive
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml 
b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
index ba8c8be33..46444870f 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
@@ -45,6 +45,11 @@
             <artifactId>seatunnel-connector-seatunnel-kafka</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connector-seatunnel-http</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-connector-seatunnel-hive</artifactId>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
index 89ebf68d0..e7ae04ab2 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -35,6 +35,7 @@
         <module>seatunnel-connector-seatunnel-console</module>
         <module>seatunnel-connector-seatunnel-fake</module>
         <module>seatunnel-connector-seatunnel-kafka</module>
+        <module>seatunnel-connector-seatunnel-http</module>
         <module>seatunnel-connector-seatunnel-jdbc</module>
         <module>seatunnel-connector-seatunnel-socket</module>
         <module>seatunnel-connector-seatunnel-clickhouse</module>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/pom.xml
similarity index 64%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
copy to 
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/pom.xml
index 89ebf68d0..1e78fb8a8 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/pom.xml
@@ -21,22 +21,30 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel-connectors</artifactId>
+        <artifactId>seatunnel-connectors-seatunnel</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
-
-    <artifactId>seatunnel-connectors-seatunnel</artifactId>
-
-    <modules>
-        <module>seatunnel-connector-seatunnel-hive</module>
-        <module>seatunnel-connector-seatunnel-console</module>
-        <module>seatunnel-connector-seatunnel-fake</module>
-        <module>seatunnel-connector-seatunnel-kafka</module>
-        <module>seatunnel-connector-seatunnel-jdbc</module>
-        <module>seatunnel-connector-seatunnel-socket</module>
-        <module>seatunnel-connector-seatunnel-clickhouse</module>
-    </modules>
+
+    <artifactId>seatunnel-connector-seatunnel-http</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+        </dependency>
+    </dependencies>
+
 </project>
\ No newline at end of file
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
new file mode 100644
index 000000000..e448bd92e
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.client;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class HttpClientProvider implements AutoCloseable {
+    private final CloseableHttpClient httpClient;
+    private static final String ENCODING = "UTF-8";
+    private static final int CONNECT_TIMEOUT = 6000 * 2;
+    private static final int SOCKET_TIMEOUT = 6000 * 10;
+    private static final int INITIAL_CAPACITY = 16;
+
+    private HttpClientProvider() {
+        httpClient = HttpClients.createDefault();
+    }
+
+    public static HttpClientProvider getInstance() {
+        return Sigleton.INSTANCE;
+    }
+
+    public HttpResponse execute(String url, String method, Map<String, String> 
headers, Map<String, String> params) throws Exception {
+        if ("POST".equals(method)) {
+            return doPost(url, headers, params);
+        }
+        return doGet(url, headers, params);
+    }
+
+    /**
+     * Send a get request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public HttpResponse doGet(String url) throws Exception {
+        return doGet(url, null, null);
+    }
+
+    /**
+     * Send a get request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public HttpResponse doGet(String url, Map<String, String> params) throws 
Exception {
+        return doGet(url, null, params);
+    }
+
+    /**
+     * Send a get request with request headers and request parameters
+     *
+     * @param url     request address
+     * @param headers request header map
+     * @param params  request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public HttpResponse doGet(String url, Map<String, String> headers, 
Map<String, String> params) throws Exception {
+        // Create access address
+        URIBuilder uriBuilder = new URIBuilder(url);
+        addParameters(uriBuilder, params);
+
+        /**
+         * setConnectTimeout:Set the connection timeout, in milliseconds.
+         * setSocketTimeout:The timeout period (ie response time) for 
requesting data acquisition, in milliseconds.
+         * If an interface is accessed, and the data cannot be returned within 
a certain amount of time, the call is simply abandoned.
+         */
+        RequestConfig requestConfig = 
RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        HttpGet httpGet = new HttpGet(uriBuilder.build());
+        httpGet.setConfig(requestConfig);
+
+        addHeaders(httpGet, headers);
+        return getResponse(httpGet);
+    }
+
+    /**
+     * Send a post request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public HttpResponse doPost(String url) throws Exception {
+        return doPost(url, null, null);
+    }
+
+    /**
+     * Send post request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public HttpResponse doPost(String url, Map<String, String> params) throws 
Exception {
+        return doPost(url, null, params);
+    }
+
+    /**
+     * Send a post request with request headers and request parameters
+     *
+     * @param url     request address
+     * @param headers request header map
+     * @param params  request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public HttpResponse doPost(String url, Map<String, String> headers, 
Map<String, String> params) throws Exception {
+        HttpPost httpPost = new HttpPost(url);
+        /**
+         * setConnectTimeout:Set the connection timeout, in milliseconds.
+         * setSocketTimeout:The timeout period (ie response time) for 
requesting data acquisition, in milliseconds.
+         * If an interface is accessed, and the data cannot be returned within 
a certain amount of time, the call is simply abandoned.
+         */
+        RequestConfig requestConfig = 
RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpPost.setConfig(requestConfig);
+        // set request header
+        addHeaders(httpPost, headers);
+
+        // Encapsulate request parameters
+        addParameters(httpPost, params);
+        return getResponse(httpPost);
+    }
+
+    /**
+     * Send a put request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public HttpResponse doPut(String url) throws Exception {
+        return doPut(url, null);
+    }
+
+    /**
+     * Send a put request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public HttpResponse doPut(String url, Map<String, String> params) throws 
Exception {
+
+        HttpPut httpPut = new HttpPut(url);
+        RequestConfig requestConfig = 
RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpPut.setConfig(requestConfig);
+
+        addParameters(httpPut, params);
+        return getResponse(httpPut);
+    }
+
+    /**
+     * Send delete request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public HttpResponse doDelete(String url) throws Exception {
+
+        HttpDelete httpDelete = new HttpDelete(url);
+        RequestConfig requestConfig = 
RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpDelete.setConfig(requestConfig);
+        return getResponse(httpDelete);
+    }
+
+    /**
+     * Send delete request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public HttpResponse doDelete(String url, Map<String, String> params) 
throws Exception {
+        if (params == null) {
+            params = new HashMap<>(INITIAL_CAPACITY);
+        }
+
+        params.put("_method", "delete");
+        return doPost(url, params);
+    }
+
+    private HttpResponse getResponse(HttpRequestBase request) throws 
IOException {
+        // execute request
+        try (CloseableHttpResponse httpResponse = httpClient.execute(request)) 
{
+            // get return result
+            if (httpResponse != null && httpResponse.getStatusLine() != null) {
+                String content = "";
+                if (httpResponse.getEntity() != null) {
+                    content = EntityUtils.toString(httpResponse.getEntity(), 
ENCODING);
+                }
+                return new 
HttpResponse(httpResponse.getStatusLine().getStatusCode(), content);
+            }
+        }
+        return new HttpResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+    }
+
+    private void addParameters(URIBuilder builder, Map<String, String> params) 
{
+        if (Objects.isNull(params) || params.isEmpty()) {
+            return;
+        }
+        params.forEach((k, v) -> builder.setParameter(k, v));
+    }
+
+    private void addParameters(HttpEntityEnclosingRequestBase request, 
Map<String, String> params) throws UnsupportedEncodingException {
+        if (Objects.isNull(params) || params.isEmpty()) {
+            return;
+        }
+        List<NameValuePair> parameters = new ArrayList<>();
+        Set<Map.Entry<String, String>> entrySet = params.entrySet();
+        for (Map.Entry<String, String> e : entrySet) {
+            String name = e.getKey();
+            String value = e.getValue();
+            NameValuePair pair = new BasicNameValuePair(name, value);
+            parameters.add(pair);
+        }
+        // Set to the request's http object
+        request.setEntity(new UrlEncodedFormEntity(parameters, ENCODING));
+    }
+
+    private void addHeaders(HttpRequestBase request, Map<String, String> 
headers) {
+        if (Objects.isNull(headers) || headers.isEmpty()) {
+            return;
+        }
+        headers.forEach((k, v) -> request.addHeader(k, v));
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (Objects.nonNull(httpClient)) {
+            httpClient.close();
+        }
+    }
+
+    private static class Sigleton {
+        private static final HttpClientProvider INSTANCE = new 
HttpClientProvider();
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpResponse.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpResponse.java
new file mode 100644
index 000000000..5c46ee4c2
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpResponse.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.client;
+
+import java.io.Serializable;
+
+/**
+ * Serializable holder for the outcome of an HTTP call: the response status
+ * code and the response body as text.
+ */
+public class HttpResponse implements Serializable {
+
+    private static final long serialVersionUID = 2168152194164783950L;
+
+    /**
+     * status code of a successful response
+     */
+    public static final int STATUS_OK = 200;
+
+    /**
+     * response status code
+     */
+    private int code;
+
+    /**
+     * response body
+     */
+    private String content;
+
+    /**
+     * Creates a response with code {@code 0} and no content.
+     */
+    public HttpResponse() {
+    }
+
+    /**
+     * Creates a response with the given status code and no content.
+     *
+     * @param code response status code
+     */
+    public HttpResponse(int code) {
+        this.code = code;
+    }
+
+    /**
+     * Creates a response with the given content and code {@code 0}.
+     *
+     * @param content response body
+     */
+    public HttpResponse(String content) {
+        this.content = content;
+    }
+
+    /**
+     * Creates a response with the given status code and content.
+     *
+     * @param code    response status code
+     * @param content response body
+     */
+    public HttpResponse(int code, String content) {
+        this.code = code;
+        this.content = content;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    public String getContent() {
+        return content;
+    }
+
+    public void setContent(String content) {
+        this.content = content;
+    }
+
+    /**
+     * Returns a debug representation labelled with this class's own name.
+     */
+    @Override
+    public String toString() {
+        return "HttpResponse [code=" + code + ", content=" + content + "]";
+    }
+
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/Config.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/Config.java
new file mode 100644
index 000000000..f62b81abc
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/Config.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.config;
+
+/**
+ * Names of the configuration keys understood by the http connector, plus the
+ * default applied when the method key is absent.
+ */
+public class Config {
+    /** Key holding the request address. */
+    public static final String URL = "url";
+
+    /** Key holding the HTTP method name. */
+    public static final String METHOD = "method";
+
+    /** Method used when {@link #METHOD} is not configured. */
+    public static final String METHOD_DEFAULT_VALUE = "GET";
+
+    /** Key holding the request header section. */
+    public static final String HEADERS = "headers";
+
+    /** Key holding the request parameter section. */
+    public static final String PARAMS = "params";
+
+    /** Key holding the request body. */
+    public static final String BODY = "body";
+
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
new file mode 100644
index 000000000..91f983256
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.http.config.Config.BODY;
+import static 
org.apache.seatunnel.connectors.seatunnel.http.config.Config.HEADERS;
+import static 
org.apache.seatunnel.connectors.seatunnel.http.config.Config.METHOD;
+import static 
org.apache.seatunnel.connectors.seatunnel.http.config.Config.METHOD_DEFAULT_VALUE;
+import static 
org.apache.seatunnel.connectors.seatunnel.http.config.Config.PARAMS;
+import static org.apache.seatunnel.connectors.seatunnel.http.config.Config.URL;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.http.state.HttpState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@AutoService(SeaTunnelSource.class)
+public class HttpSource implements SeaTunnelSource<SeaTunnelRow, 
HttpSourceSplit, HttpState> {
+    private final HttpSourceParameter parameter = new HttpSourceParameter();
+    private SeaTunnelRowType rowType;
+    private SeaTunnelContext seaTunnelContext;
+    @Override
+    public String getPluginName() {
+        return "Http";
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, URL);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
+        }
+        this.parameter.setUrl(pluginConfig.getString(URL));
+
+        if (pluginConfig.hasPath(METHOD)) {
+            this.parameter.setMethod(pluginConfig.getString(METHOD));
+        } else {
+            this.parameter.setMethod(METHOD_DEFAULT_VALUE);
+        }
+
+        if (pluginConfig.hasPath(HEADERS)) {
+            
this.parameter.setHeaders(pluginConfig.getConfig(HEADERS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
+        }
+
+        if (pluginConfig.hasPath(PARAMS)) {
+            
this.parameter.setHeaders(pluginConfig.getConfig(PARAMS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
+        }
+
+        if (pluginConfig.hasPath(BODY)) {
+            this.parameter.setBody(pluginConfig.getString(BODY));
+        }
+        // TODO support user custom row type
+        this.rowType = new SeaTunnelRowType(new String[]{"content"}, new 
SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
+    }
+
+    @Override
+    public SeaTunnelContext getSeaTunnelContext() {
+        return this.seaTunnelContext;
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return this.rowType;
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, HttpSourceSplit> 
createReader(SourceReader.Context readerContext) throws Exception {
+        return new HttpSourceReader(this.parameter, readerContext);
+    }
+
+    @Override
+    public SourceSplitEnumerator<HttpSourceSplit, HttpState> 
createEnumerator(SourceSplitEnumerator.Context<HttpSourceSplit> 
enumeratorContext) throws Exception {
+        return new HttpSourceSplitEnumerator(enumeratorContext);
+    }
+
+    @Override
+    public SourceSplitEnumerator<HttpSourceSplit, HttpState> 
restoreEnumerator(SourceSplitEnumerator.Context<HttpSourceSplit> 
enumeratorContext, HttpState checkpointState) throws Exception {
+        return new HttpSourceSplitEnumerator(enumeratorContext, 
checkpointState);
+    }
+
+    @Override
+    public Serializer<HttpState> getEnumeratorStateSerializer() {
+        return new DefaultSerializer<>();
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceParameter.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceParameter.java
new file mode 100644
index 000000000..5ec14f872
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceParameter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.source;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Serializable bean carrying the settings of one HTTP request: address,
+ * method, headers, parameters and body. All properties are null until set.
+ */
+public class HttpSourceParameter implements Serializable {
+
+    /** request address */
+    private String url;
+    /** HTTP method name */
+    private String method;
+    /** request header map */
+    private Map<String, String> headers;
+    /** request parameter map */
+    private Map<String, String> params;
+    /** request body */
+    private String body;
+
+    public String getUrl() {
+        return this.url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public String getMethod() {
+        return this.method;
+    }
+
+    public void setMethod(String method) {
+        this.method = method;
+    }
+
+    public Map<String, String> getHeaders() {
+        return this.headers;
+    }
+
+    public void setHeaders(Map<String, String> headers) {
+        this.headers = headers;
+    }
+
+    public Map<String, String> getParams() {
+        return this.params;
+    }
+
+    public void setParams(Map<String, String> params) {
+        this.params = params;
+    }
+
+    public String getBody() {
+        return this.body;
+    }
+
+    public void setBody(String body) {
+        this.body = body;
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
new file mode 100644
index 000000000..cc31b2761
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse.STATUS_OK;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Source reader that issues one HTTP request per {@link #pollNext(Collector)} call and
+ * emits the response content as a single-field {@link SeaTunnelRow}.
+ *
+ * <p>This reader does not work with splits or checkpoints: the corresponding callbacks
+ * are empty and {@link #snapshotState(long)} returns {@code null}.
+ */
+public class HttpSourceReader implements SourceReader<SeaTunnelRow, HttpSourceSplit> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HttpSourceReader.class);
+    private final SourceReader.Context context;
+    private final HttpSourceParameter parameter;
+    private HttpClientProvider httpClient;
+
+    /**
+     * @param parameter request settings (URL, method, headers, params) read on every poll
+     * @param context   reader context, consulted for boundedness and end-of-data signalling
+     */
+    public HttpSourceReader(HttpSourceParameter parameter, SourceReader.Context context) {
+        this.context = context;
+        this.parameter = parameter;
+    }
+
+    /** Obtains the HTTP client that {@link #pollNext(Collector)} will use. */
+    @Override
+    public void open() {
+        httpClient = HttpClientProvider.getInstance();
+    }
+
+    /**
+     * Closes the HTTP client, if {@link #open()} obtained one.
+     *
+     * @throws IOException if closing the client fails
+     */
+    @Override
+    public void close() throws IOException {
+        if (Objects.nonNull(httpClient)) {
+            httpClient.close();
+        }
+    }
+
+    /**
+     * Executes the configured request and, when the response code equals {@code STATUS_OK},
+     * collects one row whose only field is the response content.
+     *
+     * <p>Any other response code, and any exception raised while executing the request or
+     * collecting the row, is logged at error level and not rethrown. When the context reports
+     * {@link Boundedness#BOUNDED}, end of data is signalled after this call whatever the
+     * outcome was.
+     *
+     * @param output collector receiving the emitted row
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        try {
+            // NOTE(review): parameter.getBody() is not forwarded to the client here;
+            // confirm whether the request body is intentionally unused.
+            HttpResponse response = httpClient.execute(
+                this.parameter.getUrl(),
+                this.parameter.getMethod(),
+                this.parameter.getHeaders(),
+                this.parameter.getParams());
+            if (STATUS_OK == response.getCode()) {
+                output.collect(new SeaTunnelRow(new Object[] {response.getContent()}));
+                return;
+            }
+            LOGGER.error("http client execute exception, http response status code:[{}], content:[{}]",
+                response.getCode(), response.getContent());
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage(), e);
+        } finally {
+            // Runs on the early-return success path as well as on failure.
+            if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+                // signal to the source that we have reached the end of the data.
+                LOGGER.info("Closed the bounded http source");
+                context.signalNoMoreElement();
+            }
+        }
+    }
+
+    /**
+     * No split state is tracked by this reader.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public List<HttpSourceSplit> snapshotState(long checkpointId) throws Exception {
+        // NOTE(review): returns null rather than an empty list; confirm the caller tolerates null.
+        return null;
+    }
+
+    /** Ignored: this reader does not consume splits. */
+    @Override
+    public void addSplits(List<HttpSourceSplit> splits) {
+
+    }
+
+    /** Ignored: this reader does not consume splits. */
+    @Override
+    public void handleNoMoreSplits() {
+
+    }
+
+    /** Ignored: there is no checkpointed state to finalize. */
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplit.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplit.java
new file mode 100644
index 000000000..3496c546c
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplit.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+/**
+ * Split type of the HTTP source. It carries nothing but the identifier it was
+ * created with.
+ */
+public class HttpSourceSplit implements SourceSplit {
+
+    /** Identifier supplied at construction; never reassigned. */
+    private final String splitId;
+
+    /**
+     * @param splitId identifier to report from {@link #splitId()}
+     */
+    public HttpSourceSplit(String splitId) {
+        this.splitId = splitId;
+    }
+
+    /** @return the identifier passed to the constructor */
+    @Override
+    public String splitId() {
+        return splitId;
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplitEnumerator.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplitEnumerator.java
new file mode 100644
index 000000000..f67d3335a
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplitEnumerator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.http.state.HttpState;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Split enumerator of the HTTP source.
+ *
+ * <p>It only stores the context and optional state it is constructed with; every
+ * lifecycle and split-related callback is an empty implementation.
+ */
+public class HttpSourceSplitEnumerator implements SourceSplitEnumerator<HttpSourceSplit, HttpState> {
+    private SourceSplitEnumerator.Context<HttpSourceSplit> enumeratorContext;
+    private HttpState httpState;
+
+    /**
+     * Creates an enumerator without any prior state.
+     *
+     * @param enumeratorContext enumerator context to keep
+     */
+    public HttpSourceSplitEnumerator(SourceSplitEnumerator.Context<HttpSourceSplit> enumeratorContext) {
+        this(enumeratorContext, null);
+    }
+
+    /**
+     * Creates an enumerator carrying the given state.
+     *
+     * @param enumeratorContext enumerator context to keep
+     * @param httpState         state object to keep
+     */
+    public HttpSourceSplitEnumerator(SourceSplitEnumerator.Context<HttpSourceSplit> enumeratorContext,
+                                     HttpState httpState) {
+        this.enumeratorContext = enumeratorContext;
+        this.httpState = httpState;
+    }
+
+    /** Does nothing. */
+    @Override
+    public void open() {
+
+    }
+
+    /** Does nothing; no splits are produced or assigned. */
+    @Override
+    public void run() throws Exception {
+
+    }
+
+    /** Does nothing. */
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    /** Does nothing; returned splits are dropped. */
+    @Override
+    public void addSplitsBack(List<HttpSourceSplit> splits, int subtaskId) {
+
+    }
+
+    /** @return always {@code 0} */
+    @Override
+    public int currentUnassignedSplitSize() {
+        return 0;
+    }
+
+    /** Does nothing. */
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+
+    }
+
+    /** Does nothing. */
+    @Override
+    public void registerReader(int subtaskId) {
+
+    }
+
+    /** @return always {@code null} */
+    @Override
+    public HttpState snapshotState(long checkpointId) throws Exception {
+        return null;
+    }
+
+    /** Does nothing. */
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/state/HttpState.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/state/HttpState.java
new file mode 100644
index 000000000..5332f67c1
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/state/HttpState.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.state;
+
+import java.io.Serializable;
+
+/**
+ * State type of the HTTP source. It currently declares no fields and exists
+ * only as a {@link Serializable} type.
+ */
+public class HttpState implements Serializable {
+    // intentionally empty
+}

Reply via email to