This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6f49ec6ead [Feature][Connector-V2] Support TableSourceFactory/TableSinkFactory on http (#5816)
6f49ec6ead is described below

commit 6f49ec6ead82f842c6d93563ffce64853deb1bed
Author: lizhenglei <[email protected]>
AuthorDate: Wed Nov 22 10:05:58 2023 +0800

    [Feature][Connector-V2] Support TableSourceFactory/TableSinkFactory on http (#5816)
    
    
    
    ---------
    
    Co-authored-by: 80597928 <[email protected]>
---
 .../seatunnel/http/config/HttpConfig.java          |  2 +
 .../connectors/seatunnel/http/sink/HttpSink.java   | 18 ++---
 .../seatunnel/http/sink/HttpSinkFactory.java       | 10 +++
 .../seatunnel/http/source/HttpSource.java          | 78 +++++++++++++++-------
 .../seatunnel/http/source/HttpSourceFactory.java   | 16 ++++-
 .../seatunnel/feishu/sink/FeishuSink.java          | 11 +--
 .../seatunnel/feishu/sink/FeishuSinkFactory.java   | 11 +++
 .../seatunnel/github/source/GithubSource.java      | 19 ++----
 .../github/source/GithubSourceFactory.java         | 14 ++++
 .../seatunnel/gitlab/source/GitlabSource.java      | 37 +++++-----
 .../gitlab/source/GitlabSourceFactory.java         | 14 ++++
 .../seatunnel/jira/source/JiraSource.java          | 36 +++++-----
 .../seatunnel/jira/source/JiraSourceFactory.java   | 14 ++++
 .../seatunnel/klaviyo/source/KlaviyoSource.java    | 19 ++----
 .../klaviyo/source/KlaviyoSourceFactory.java       | 14 ++++
 .../seatunnel/lemlist/source/LemlistSource.java    | 19 ++----
 .../lemlist/source/LemlistSourceFactory.java       | 14 ++++
 .../seatunnel/myhours/source/MyHoursSource.java    | 19 ++----
 .../myhours/source/MyHoursSourceFactory.java       | 14 ++++
 .../seatunnel/notion/source/NotionSource.java      | 19 ++----
 .../notion/source/NotionSourceFactory.java         | 14 ++++
 .../onesignal/source/OneSignalSource.java          | 19 ++----
 .../onesignal/source/OneSignalSourceFactory.java   | 14 ++++
 .../persistiq/source/PersistiqSource.java          | 18 ++---
 .../persistiq/source/PersistiqSourceFactory.java   | 14 ++++
 .../seatunnel/wechat/sink/WeChatSink.java          | 11 +--
 .../connector-http-e2e/pom.xml                     |  6 ++
 .../seatunnel/e2e/connector/http/HttpIT.java       |  5 ++
 .../resources/http_jsonrequestbody_to_feishu.conf  | 43 ++++++++++++
 .../src/test/resources/mockserver-config.json      | 21 ++++++
 30 files changed, 391 insertions(+), 172 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
index 2a68249b86..de21b73c7c 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
@@ -24,6 +24,8 @@ import java.util.Map;
 
 public class HttpConfig {
     public static final String BASIC = "Basic";
+    public static final String CONNECTOR_IDENTITY = "Http";
+
     public static final int DEFAULT_RETRY_BACKOFF_MULTIPLIER_MS = 100;
     public static final int DEFAULT_RETRY_BACKOFF_MAX_MS = 10000;
     public static final boolean DEFAULT_ENABLE_MULTI_LINES = false;
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
index d8a1c5fafe..140e24a4f0 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
@@ -19,9 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.http.sink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -34,25 +32,16 @@ import 
org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
 import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
 
-import com.google.auto.service.AutoService;
-
 import java.io.IOException;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-@AutoService(SeaTunnelSink.class)
 public class HttpSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
     protected final HttpParameter httpParameter = new HttpParameter();
     protected SeaTunnelRowType seaTunnelRowType;
     protected Config pluginConfig;
 
-    @Override
-    public String getPluginName() {
-        return "Http";
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
+    public HttpSink(Config pluginConfig, SeaTunnelRowType rowType) {
         this.pluginConfig = pluginConfig;
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
HttpConfig.URL.key());
         if (!result.isSuccess()) {
@@ -81,11 +70,12 @@ public class HttpSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
                                             entry -> 
String.valueOf(entry.getValue().unwrapped()),
                                             (v1, v2) -> v2)));
         }
+        this.seaTunnelRowType = rowType;
     }
 
     @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
+    public String getPluginName() {
+        return "Http";
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
index 8411001fff..539563ecb6 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
@@ -18,8 +18,11 @@
 package org.apache.seatunnel.connectors.seatunnel.http.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
 
 import com.google.auto.service.AutoService;
@@ -31,6 +34,13 @@ public class HttpSinkFactory implements TableSinkFactory {
         return "Http";
     }
 
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        CatalogTable catalogTable = context.getCatalogTable();
+        return () ->
+                new HttpSink(context.getOptions().toConfig(), 
catalogTable.getSeaTunnelRowType());
+    }
+
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index 747b1ed4ab..314deded06 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -21,13 +21,16 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
 
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -47,34 +50,23 @@ import 
org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 
-import com.google.auto.service.AutoService;
+import com.google.common.collect.Lists;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Locale;
 
-@AutoService(SeaTunnelSource.class)
 public class HttpSource extends AbstractSingleSplitSource<SeaTunnelRow> {
     protected final HttpParameter httpParameter = new HttpParameter();
     protected PageInfo pageInfo;
-    protected SeaTunnelRowType rowType;
     protected JsonField jsonField;
     protected String contentField;
     protected JobContext jobContext;
     protected DeserializationSchema<SeaTunnelRow> deserializationSchema;
 
-    @Override
-    public String getPluginName() {
-        return "Http";
-    }
+    protected CatalogTable catalogTable;
 
-    @Override
-    public Boundedness getBoundedness() {
-        return JobMode.BATCH.equals(jobContext.getJobMode())
-                ? Boundedness.BOUNDED
-                : Boundedness.UNBOUNDED;
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
+    public HttpSource(Config pluginConfig) {
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
HttpConfig.URL.key());
         if (!result.isSuccess()) {
             throw new HttpConnectorException(
@@ -88,6 +80,18 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
         buildPagingWithConfig(pluginConfig);
     }
 
+    @Override
+    public String getPluginName() {
+        return HttpConfig.CONNECTOR_IDENTITY;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return JobMode.BATCH.equals(jobContext.getJobMode())
+                ? Boundedness.BOUNDED
+                : Boundedness.UNBOUNDED;
+    }
+
     private void buildPagingWithConfig(Config pluginConfig) {
         if (pluginConfig.hasPath(HttpConfig.PAGEING.key())) {
             pageInfo = new PageInfo();
@@ -114,7 +118,7 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
 
     protected void buildSchemaWithConfig(Config pluginConfig) {
         if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
-            this.rowType = 
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
+            this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
             // default use json format
             HttpConfig.ResponseFormat format = 
HttpConfig.FORMAT.defaultValue();
             if (pluginConfig.hasPath(HttpConfig.FORMAT.key())) {
@@ -127,7 +131,8 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
             switch (format) {
                 case JSON:
                     this.deserializationSchema =
-                            new JsonDeserializationSchema(false, false, 
rowType);
+                            new JsonDeserializationSchema(
+                                    false, false, 
catalogTable.getSeaTunnelRowType());
                     if (pluginConfig.hasPath(HttpConfig.JSON_FIELD.key())) {
                         jsonField =
                                 
getJsonField(pluginConfig.getConfig(HttpConfig.JSON_FIELD.key()));
@@ -145,8 +150,33 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
                                     format));
             }
         } else {
-            this.rowType = CatalogTableUtil.buildSimpleTextSchema();
-            this.deserializationSchema = new 
SimpleTextDeserializationSchema(this.rowType);
+            TableIdentifier tableIdentifier =
+                    TableIdentifier.of(HttpConfig.CONNECTOR_IDENTITY, null, 
null);
+            TableSchema tableSchema =
+                    TableSchema.builder()
+                            .column(
+                                    PhysicalColumn.of(
+                                            "content",
+                                            new SeaTunnelRowType(
+                                                    new String[] {"content"},
+                                                    new SeaTunnelDataType<?>[] 
{
+                                                        BasicType.STRING_TYPE
+                                                    }),
+                                            0,
+                                            false,
+                                            null,
+                                            null))
+                            .build();
+
+            this.catalogTable =
+                    CatalogTable.of(
+                            tableIdentifier,
+                            tableSchema,
+                            Collections.emptyMap(),
+                            Collections.emptyList(),
+                            null);
+            this.deserializationSchema =
+                    new 
SimpleTextDeserializationSchema(catalogTable.getSeaTunnelRowType());
         }
     }
 
@@ -156,8 +186,8 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
     }
 
     @Override
-    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
-        return this.rowType;
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Lists.newArrayList(catalogTable);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java
index 21bc3940e1..c0a276d723 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java
@@ -19,14 +19,18 @@ package 
org.apache.seatunnel.connectors.seatunnel.http.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
-import org.apache.seatunnel.connectors.seatunnel.http.config.HttpRequestMethod;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
 @AutoService(Factory.class)
 public class HttpSourceFactory implements TableSourceFactory {
 
@@ -40,6 +44,14 @@ public class HttpSourceFactory implements TableSourceFactory 
{
         return getHttpBuilder().build();
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new HttpSource(context.getOptions().toConfig());
+    }
+
     public OptionRule.Builder getHttpBuilder() {
         return OptionRule.builder()
                 .required(HttpConfig.URL)
@@ -47,9 +59,9 @@ public class HttpSourceFactory implements TableSourceFactory {
                 .optional(HttpConfig.HEADERS)
                 .optional(HttpConfig.PARAMS)
                 .optional(HttpConfig.FORMAT)
+                .optional(HttpConfig.BODY)
                 .optional(HttpConfig.JSON_FIELD)
                 .optional(HttpConfig.CONTENT_FIELD)
-                .conditional(HttpConfig.METHOD, HttpRequestMethod.POST, 
HttpConfig.BODY)
                 .conditional(
                         HttpConfig.FORMAT,
                         HttpConfig.ResponseFormat.JSON,
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSink.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSink.java
index 5656ad11b2..b3fbaa6a5b 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSink.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSink.java
@@ -17,13 +17,16 @@
 
 package org.apache.seatunnel.connectors.seatunnel.feishu.sink;
 
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import com.google.auto.service.AutoService;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink;
 
-@AutoService(SeaTunnelSink.class)
 public class FeishuSink extends HttpSink {
+    public FeishuSink(Config pluginConfig, SeaTunnelRowType rowType) {
+        super(pluginConfig, rowType);
+    }
+
     @Override
     public String getPluginName() {
         return "Feishu";
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java
index 2ee37f048b..f9cd6ee01c 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java
@@ -17,13 +17,24 @@
 
 package org.apache.seatunnel.connectors.seatunnel.feishu.sink;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkFactory;
 
 import com.google.auto.service.AutoService;
 
 @AutoService(Factory.class)
 public class FeishuSinkFactory extends HttpSinkFactory {
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        CatalogTable catalogTable = context.getCatalogTable();
+        return () ->
+                new FeishuSink(context.getOptions().toConfig(), 
catalogTable.getSeaTunnelRowType());
+    }
+
     @Override
     public String factoryIdentifier() {
         return "Feishu";
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-github/src/main/java/org/apache/seatunnel/connectors/seatunnel/github/source/GithubSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-github/src/main/java/org/apache/seatunnel/connectors/seatunnel/github/source/GithubSource.java
index e5bdec5744..0c8315502a 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-github/src/main/java/org/apache/seatunnel/connectors/seatunnel/github/source/GithubSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-github/src/main/java/org/apache/seatunnel/connectors/seatunnel/github/source/GithubSource.java
@@ -19,9 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.github.source;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
@@ -34,24 +32,17 @@ import 
org.apache.seatunnel.connectors.seatunnel.github.exception.GithubConnecto
 import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSource;
 import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader;
 
-import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-@AutoService(SeaTunnelSource.class)
 public class GithubSource extends HttpSource {
 
     public static final String PLUGIN_NAME = "Github";
 
     private final GithubSourceParameter githubSourceParam = new 
GithubSourceParameter();
 
-    @Override
-    public String getPluginName() {
-        return PLUGIN_NAME;
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
+    public GithubSource(Config pluginConfig) {
+        super(pluginConfig);
         CheckResult result =
                 CheckConfigUtil.checkAllExists(pluginConfig, 
GithubSourceConfig.URL.key());
         if (!result.isSuccess()) {
@@ -62,7 +53,11 @@ public class GithubSource extends HttpSource {
                             getPluginName(), PluginType.SOURCE, 
result.getMsg()));
         }
         githubSourceParam.buildWithConfig(pluginConfig);
-        buildSchemaWithConfig(pluginConfig);
+    }
+
+    @Override
+    public String getPluginName() {
+        return PLUGIN_NAME;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-github/src/main/java/org/apache/seatunnel/connectors/seatunnel/github/source/GithubSourceFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-github/src/main/java/org/apache/seatunnel/connectors/seatunnel/github/source/GithubSourceFactory.java
index fa1b9f9de8..ca5ad45108 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-github/src/main/java/org/apache/seatunnel/connectors/seatunnel/github/source/GithubSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-github/src/main/java/org/apache/seatunnel/connectors/seatunnel/github/source/GithubSourceFactory.java
@@ -18,12 +18,18 @@
 package org.apache.seatunnel.connectors.seatunnel.github.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import 
org.apache.seatunnel.connectors.seatunnel.github.config.GithubSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
 @AutoService(Factory.class)
 public class GithubSourceFactory extends HttpSourceFactory {
 
@@ -32,6 +38,14 @@ public class GithubSourceFactory extends HttpSourceFactory {
         return GithubSource.PLUGIN_NAME;
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new GithubSource(context.getOptions().toConfig());
+    }
+
     @Override
     public OptionRule optionRule() {
         return 
getHttpBuilder().required(GithubSourceConfig.ACCESS_TOKEN).build();
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSource.java
index 524bb06600..73c687b859 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSource.java
@@ -19,10 +19,8 @@ package 
org.apache.seatunnel.connectors.seatunnel.gitlab.source;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
@@ -36,30 +34,14 @@ import 
org.apache.seatunnel.connectors.seatunnel.gitlab.source.exception.GitlabC
 import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSource;
 import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader;
 
-import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-@AutoService(SeaTunnelSource.class)
 public class GitlabSource extends HttpSource {
     private final GitlabSourceParameter gitlabSourceParameter = new 
GitlabSourceParameter();
 
-    @Override
-    public String getPluginName() {
-        return "Gitlab";
-    }
-
-    @Override
-    public Boundedness getBoundedness() {
-        if (JobMode.BATCH.equals(jobContext.getJobMode())) {
-            return Boundedness.BOUNDED;
-        }
-        throw new UnsupportedOperationException(
-                "Gitlab source connector not support unbounded operation");
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
+    public GitlabSource(Config pluginConfig) {
+        super(pluginConfig);
         CheckResult result =
                 CheckConfigUtil.checkAllExists(
                         pluginConfig,
@@ -73,7 +55,20 @@ public class GitlabSource extends HttpSource {
                             getPluginName(), PluginType.SOURCE, 
result.getMsg()));
         }
         this.gitlabSourceParameter.buildWithConfig(pluginConfig);
-        buildSchemaWithConfig(pluginConfig);
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Gitlab";
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        if (JobMode.BATCH.equals(jobContext.getJobMode())) {
+            return Boundedness.BOUNDED;
+        }
+        throw new UnsupportedOperationException(
+                "Gitlab source connector not support unbounded operation");
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSourceFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSourceFactory.java
index c4a0f3e9ac..f400934967 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-gitlab/src/main/java/org/apache/seatunnel/connectors/seatunnel/gitlab/source/GitlabSourceFactory.java
@@ -18,12 +18,18 @@
 package org.apache.seatunnel.connectors.seatunnel.gitlab.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import 
org.apache.seatunnel.connectors.seatunnel.gitlab.source.config.GitlabSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
 @AutoService(Factory.class)
 public class GitlabSourceFactory extends HttpSourceFactory {
     @Override
@@ -31,6 +37,14 @@ public class GitlabSourceFactory extends HttpSourceFactory {
         return "Gitlab";
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new GitlabSource(context.getOptions().toConfig());
+    }
+
     @Override
     public OptionRule optionRule() {
         return 
getHttpBuilder().required(GitlabSourceConfig.ACCESS_TOKEN).build();
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSource.java
index bd4a57cdd7..4055563a94 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSource.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
@@ -34,32 +33,16 @@ import 
org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader;
 import 
org.apache.seatunnel.connectors.seatunnel.jira.source.config.JiraSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.jira.source.config.JiraSourceParameter;
 
-import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.http.util.AuthorizationUtil.getTokenByBasicAuth;
 
 @Slf4j
-@AutoService(SeaTunnelSource.class)
 public class JiraSource extends HttpSource {
     private final JiraSourceParameter jiraSourceParameter = new 
JiraSourceParameter();
 
-    @Override
-    public String getPluginName() {
-        return "Jira";
-    }
-
-    @Override
-    public Boundedness getBoundedness() {
-        if (JobMode.BATCH.equals(jobContext.getJobMode())) {
-            return Boundedness.BOUNDED;
-        }
-        throw new UnsupportedOperationException(
-                "Jira source connector not support unbounded operation");
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
+    protected JiraSource(Config pluginConfig) {
+        super(pluginConfig);
         CheckResult result =
                 CheckConfigUtil.checkAllExists(
                         pluginConfig,
@@ -75,7 +58,20 @@ public class JiraSource extends HttpSource {
                         pluginConfig.getString(JiraSourceConfig.EMAIL.key()),
                         
pluginConfig.getString(JiraSourceConfig.API_TOKEN.key()));
         jiraSourceParameter.buildWithConfig(pluginConfig, accessToken);
-        buildSchemaWithConfig(pluginConfig);
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Jira";
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        if (JobMode.BATCH.equals(jobContext.getJobMode())) {
+            return Boundedness.BOUNDED;
+        }
+        throw new UnsupportedOperationException(
+                "Jira source connector not support unbounded operation");
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSourceFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSourceFactory.java
index 7c809ea421..c06cf98ff1 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-jira/src/main/java/org/apache/seatunnel/connectors/seatunnel/jira/source/JiraSourceFactory.java
@@ -18,12 +18,18 @@
 package org.apache.seatunnel.connectors.seatunnel.jira.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.jira.source.config.JiraSourceConfig;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
 @AutoService(Factory.class)
 public class JiraSourceFactory extends HttpSourceFactory {
     @Override
@@ -31,6 +37,14 @@ public class JiraSourceFactory extends HttpSourceFactory {
         return "Jira";
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new JiraSource(context.getOptions().toConfig());
+    }
+
     @Override
     public OptionRule optionRule() {
         return getHttpBuilder()
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSource.java
index 158507bfca..408cf5299a 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSource.java
@@ -19,9 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.klaviyo.source;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
@@ -34,21 +32,14 @@ import 
org.apache.seatunnel.connectors.seatunnel.klaviyo.source.config.KlaviyoSo
 import 
org.apache.seatunnel.connectors.seatunnel.klaviyo.source.config.KlaviyoSourceParameter;
 import 
org.apache.seatunnel.connectors.seatunnel.klaviyo.source.config.exception.KlaviyoConnectorException;
 
-import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-@AutoService(SeaTunnelSource.class)
 public class KlaviyoSource extends HttpSource {
     private final KlaviyoSourceParameter klaviyoSourceParameter = new 
KlaviyoSourceParameter();
 
-    @Override
-    public String getPluginName() {
-        return "Klaviyo";
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
+    public KlaviyoSource(Config pluginConfig) {
+        super(pluginConfig);
         CheckResult result =
                 CheckConfigUtil.checkAllExists(
                         pluginConfig,
@@ -63,7 +54,11 @@ public class KlaviyoSource extends HttpSource {
                             getPluginName(), PluginType.SOURCE, 
result.getMsg()));
         }
         this.klaviyoSourceParameter.buildWithConfig(pluginConfig);
-        buildSchemaWithConfig(pluginConfig);
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Klaviyo";
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSourceFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSourceFactory.java
index 4b2817122d..44517f3524 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-klaviyo/src/main/java/org/apache/seatunnel/connectors/seatunnel/klaviyo/source/KlaviyoSourceFactory.java
@@ -18,12 +18,18 @@
 package org.apache.seatunnel.connectors.seatunnel.klaviyo.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.klaviyo.source.config.KlaviyoSourceConfig;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
 @AutoService(Factory.class)
 public class KlaviyoSourceFactory extends HttpSourceFactory {
     @Override
@@ -31,6 +37,14 @@ public class KlaviyoSourceFactory extends HttpSourceFactory {
         return "Klaviyo";
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new KlaviyoSource(context.getOptions().toConfig());
+    }
+
     @Override
     public OptionRule optionRule() {
         return getHttpBuilder()
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSource.java
index 35a6404678..123a758635 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSource.java
@@ -19,9 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.lemlist.source;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
@@ -34,23 +32,16 @@ import 
org.apache.seatunnel.connectors.seatunnel.lemlist.source.config.LemlistSo
 import 
org.apache.seatunnel.connectors.seatunnel.lemlist.source.config.LemlistSourceParameter;
 import 
org.apache.seatunnel.connectors.seatunnel.lemlist.source.exception.LemlistConnectorException;
 
-import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.http.util.AuthorizationUtil.getTokenByBasicAuth;
 
 @Slf4j
-@AutoService(SeaTunnelSource.class)
 public class LemlistSource extends HttpSource {
     private final LemlistSourceParameter lemlistSourceParameter = new 
LemlistSourceParameter();
 
-    @Override
-    public String getPluginName() {
-        return "Lemlist";
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
+    public LemlistSource(Config pluginConfig) {
+        super(pluginConfig);
         CheckResult result =
                 CheckConfigUtil.checkAllExists(
                         pluginConfig,
@@ -67,7 +58,11 @@ public class LemlistSource extends HttpSource {
         String accessToken =
                 getTokenByBasicAuth("", 
pluginConfig.getString(LemlistSourceConfig.PASSWORD.key()));
         lemlistSourceParameter.buildWithConfig(pluginConfig, accessToken);
-        buildSchemaWithConfig(pluginConfig);
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Lemlist";
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSourceFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSourceFactory.java
index d6ebff02f2..8581961b38 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-lemlist/src/main/java/org/apache/seatunnel/connectors/seatunnel/lemlist/source/LemlistSourceFactory.java
@@ -18,12 +18,18 @@
 package org.apache.seatunnel.connectors.seatunnel.lemlist.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.lemlist.source.config.LemlistSourceConfig;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
 @AutoService(Factory.class)
 public class LemlistSourceFactory extends HttpSourceFactory {
     @Override
@@ -31,6 +37,14 @@ public class LemlistSourceFactory extends HttpSourceFactory {
         return "Lemlist";
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new LemlistSource(context.getOptions().toConfig());
+    }
+
     @Override
     public OptionRule optionRule() {
         return getHttpBuilder().required(LemlistSourceConfig.PASSWORD).build();
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
index 2ac6ad2b15..31f606afdd 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
@@ -19,9 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.myhours.source;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
@@ -38,7 +36,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.myhours.source.config.MyHoursSo
 import 
org.apache.seatunnel.connectors.seatunnel.myhours.source.exception.MyHoursConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.myhours.source.exception.MyHoursConnectorException;
 
-import com.google.auto.service.AutoService;
 import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 
@@ -46,17 +43,11 @@ import java.io.IOException;
 import java.util.Map;
 
 @Slf4j
-@AutoService(SeaTunnelSource.class)
 public class MyHoursSource extends HttpSource {
     private final MyHoursSourceParameter myHoursSourceParameter = new 
MyHoursSourceParameter();
 
-    @Override
-    public String getPluginName() {
-        return "MyHours";
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
+    protected MyHoursSource(Config pluginConfig) {
+        super(pluginConfig);
         CheckResult result =
                 CheckConfigUtil.checkAllExists(
                         pluginConfig,
@@ -73,7 +64,11 @@ public class MyHoursSource extends HttpSource {
         // Login to get accessToken
         String accessToken = getAccessToken(pluginConfig);
         this.myHoursSourceParameter.buildWithConfig(pluginConfig, accessToken);
-        buildSchemaWithConfig(pluginConfig);
+    }
+
+    @Override
+    public String getPluginName() {
+        return "MyHours";
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSourceFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSourceFactory.java
index 24daeba1be..7e080e19b7 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSourceFactory.java
@@ -18,12 +18,18 @@
 package org.apache.seatunnel.connectors.seatunnel.myhours.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.myhours.source.config.MyHoursSourceConfig;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
 @AutoService(Factory.class)
 public class MyHoursSourceFactory extends HttpSourceFactory {
     @Override
@@ -31,6 +37,14 @@ public class MyHoursSourceFactory extends HttpSourceFactory {
         return "MyHours";
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new MyHoursSource(context.getOptions().toConfig());
+    }
+
     @Override
     public OptionRule optionRule() {
         return getHttpBuilder()
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-notion/src/main/java/org/apache/seatunnel/connectors/seatunnel/notion/source/NotionSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-notion/src/main/java/org/apache/seatunnel/connectors/seatunnel/notion/source/NotionSource.java
index b21c94324f..eaddf987b8 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-notion/src/main/java/org/apache/seatunnel/connectors/seatunnel/notion/source/NotionSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-notion/src/main/java/org/apache/seatunnel/connectors/seatunnel/notion/source/NotionSource.java
@@ -19,9 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.notion.source;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
@@ -34,21 +32,14 @@ import 
org.apache.seatunnel.connectors.seatunnel.notion.source.config.NotionSour
 import 
org.apache.seatunnel.connectors.seatunnel.notion.source.config.NotionSourceParameter;
 import 
org.apache.seatunnel.connectors.seatunnel.notion.source.exception.NotionConnectorException;
 
-import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-@AutoService(SeaTunnelSource.class)
 public class NotionSource extends HttpSource {
     private final NotionSourceParameter notionSourceParameter = new 
NotionSourceParameter();
 
-    @Override
-    public String getPluginName() {
-        return "Notion";
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
+    protected NotionSource(Config pluginConfig) {
+        super(pluginConfig);
         CheckResult result =
                 CheckConfigUtil.checkAllExists(
                         pluginConfig,
@@ -63,7 +54,11 @@ public class NotionSource extends HttpSource {
                             getPluginName(), PluginType.SOURCE, 
result.getMsg()));
         }
         notionSourceParameter.buildWithConfig(pluginConfig);
-        buildSchemaWithConfig(pluginConfig);
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Notion";
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-notion/src/main/java/org/apache/seatunnel/connectors/seatunnel/notion/source/NotionSourceFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-notion/src/main/java/org/apache/seatunnel/connectors/seatunnel/notion/source/NotionSourceFactory.java
index 0d7503ec77..3811799deb 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-notion/src/main/java/org/apache/seatunnel/connectors/seatunnel/notion/source/NotionSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-notion/src/main/java/org/apache/seatunnel/connectors/seatunnel/notion/source/NotionSourceFactory.java
@@ -18,12 +18,18 @@
 package org.apache.seatunnel.connectors.seatunnel.notion.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.notion.source.config.NotionSourceConfig;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
 @AutoService(Factory.class)
 public class NotionSourceFactory extends HttpSourceFactory {
     @Override
@@ -31,6 +37,14 @@ public class NotionSourceFactory extends HttpSourceFactory {
         return "Notion";
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new NotionSource(context.getOptions().toConfig());
+    }
+
     @Override
     public OptionRule optionRule() {
         return getHttpBuilder()
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSource.java
index 2e0a4e2e53..0ec750bb97 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSource.java
@@ -19,9 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.onesignal.source;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
@@ -34,22 +32,15 @@ import 
org.apache.seatunnel.connectors.seatunnel.onesignal.source.config.OneSign
 import 
org.apache.seatunnel.connectors.seatunnel.onesignal.source.config.OneSignalSourceParameter;
 import 
org.apache.seatunnel.connectors.seatunnel.onesignal.source.config.exception.OneSignalConnectorException;
 
-import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-@AutoService(SeaTunnelSource.class)
 public class OneSignalSource extends HttpSource {
     private final OneSignalSourceParameter oneSignalSourceParameter =
             new OneSignalSourceParameter();
 
-    @Override
-    public String getPluginName() {
-        return "OneSignal";
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
+    protected OneSignalSource(Config pluginConfig) {
+        super(pluginConfig);
         CheckResult result =
                 CheckConfigUtil.checkAllExists(
                         pluginConfig,
@@ -63,7 +54,11 @@ public class OneSignalSource extends HttpSource {
                             getPluginName(), PluginType.SOURCE, 
result.getMsg()));
         }
         oneSignalSourceParameter.buildWithConfig(pluginConfig);
-        buildSchemaWithConfig(pluginConfig);
+    }
+
+    @Override
+    public String getPluginName() {
+        return "OneSignal";
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSourceFactory.java
index 283f6c1c34..c37e44ddaf 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-onesignal/src/main/java/org/apache/seatunnel/connectors/seatunnel/onesignal/source/OneSignalSourceFactory.java
@@ -18,12 +18,18 @@
 package org.apache.seatunnel.connectors.seatunnel.onesignal.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.onesignal.source.config.OneSignalSourceConfig;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
 @AutoService(Factory.class)
 public class OneSignalSourceFactory extends HttpSourceFactory {
     @Override
@@ -31,6 +37,14 @@ public class OneSignalSourceFactory extends 
HttpSourceFactory {
         return "OneSignal";
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new OneSignalSource(context.getOptions().toConfig());
+    }
+
     @Override
     public OptionRule optionRule() {
         return 
getHttpBuilder().required(OneSignalSourceConfig.PASSWORD).build();
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSource.java
index 11fd142898..1af30d1e41 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSource.java
@@ -20,7 +20,6 @@ package 
org.apache.seatunnel.connectors.seatunnel.persistiq.source;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
@@ -32,22 +31,15 @@ import 
org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader;
 import 
org.apache.seatunnel.connectors.seatunnel.persistiq.source.config.PersistiqSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.persistiq.source.config.PersistiqSourceParameter;
 
-import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-@AutoService(SeaTunnelSource.class)
 public class PersistiqSource extends HttpSource {
     private final PersistiqSourceParameter persistiqSourceParameter =
             new PersistiqSourceParameter();
 
-    @Override
-    public String getPluginName() {
-        return "Persistiq";
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
+    public PersistiqSource(Config pluginConfig) {
+        super(pluginConfig);
         CheckResult result =
                 CheckConfigUtil.checkAllExists(
                         pluginConfig,
@@ -57,7 +49,11 @@ public class PersistiqSource extends HttpSource {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
         }
         persistiqSourceParameter.buildWithConfig(pluginConfig);
-        buildSchemaWithConfig(pluginConfig);
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Persistiq";
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSourceFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSourceFactory.java
index 4fd1e0522a..fa19c99789 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSourceFactory.java
@@ -18,12 +18,18 @@
 package org.apache.seatunnel.connectors.seatunnel.persistiq.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.persistiq.source.config.PersistiqSourceConfig;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
 @AutoService(Factory.class)
 public class PersistiqSourceFactory extends HttpSourceFactory {
     @Override
@@ -31,6 +37,14 @@ public class PersistiqSourceFactory extends HttpSourceFactory {
         return "Persistiq";
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new PersistiqSource(context.getOptions().toConfig());
+    }
+
     @Override
     public OptionRule optionRule() {
         return getHttpBuilder().required(PersistiqSourceConfig.PASSWORD).build();
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
index 7e0cb99548..ca6459bee1 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
@@ -17,19 +17,22 @@
 
 package org.apache.seatunnel.connectors.seatunnel.wechat.sink;
 
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink;
 import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig;
 
-import com.google.auto.service.AutoService;
-
-@AutoService(SeaTunnelSink.class)
 public class WeChatSink extends HttpSink {
 
+    public WeChatSink(Config pluginConfig, SeaTunnelRowType rowType) {
+        super(pluginConfig, rowType);
+    }
+
     @Override
     public String getPluginName() {
         return "WeChat";
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml
index 05428e78d2..84b73a5998 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml
@@ -92,6 +92,12 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-http-feishu</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index bd85ed876e..6266905a7b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -149,9 +149,14 @@ public class HttpIT extends TestSuiteBase implements TestResource {
         Container.ExecResult execResult15 =
                 container.executeJob("/http_page_increase_page_num.conf");
         Assertions.assertEquals(0, execResult15.getExitCode());
+
         Container.ExecResult execResult16 =
                 container.executeJob("/http_page_increase_no_page_num.conf");
         Assertions.assertEquals(0, execResult16.getExitCode());
+
+        Container.ExecResult execResult17 =
+                container.executeJob("/http_jsonrequestbody_to_feishu.conf");
+        Assertions.assertEquals(0, execResult17.getExitCode());
     }
 
     public String getMockServerConfig() {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonrequestbody_to_feishu.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonrequestbody_to_feishu.conf
new file mode 100644
index 0000000000..5027863ce6
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonrequestbody_to_feishu.conf
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Http {
+    result_table_name = "http"
+    url = "http://mockserver:1080/example/jsonBody"
+    method = "POST"
+    body="{"id":1}"
+    format = "json"
+    schema = {
+      fields {
+        name = string
+        age = int
+      }
+    }
+  }
+}
+
+sink {
+   Feishu {
+               url = "http://mockserver:1080/example/feishu/108bb8f208d9b2378c8c7aedad715c19"
+           }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
index 9cb561225d..d41975d133 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
@@ -4670,5 +4670,26 @@
         "hasNext": false
       }
     }
+  },
+  {
+    "httpRequest": {
+      "path": "/example/feishu/108bb8f208d9b2378c8c7aedad715c19",
+      "method": "POST"
+    },
+    "httpResponse": {
+      "body": [
+        {
+          "name": "lzl",
+          "age": 18
+        },
+        {
+          "name": "pizz",
+          "age": 19
+        }
+      ],
+      "headers": {
+        "Content-Type": "application/json"
+      }
+    }
   }
 ]

Reply via email to