This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c15032c035 [Feature][Connector-V2] Add Airtable source and sink 
(#10469)
c15032c035 is described below

commit c15032c035a3b95c4bb6b260ef5a4921decbbfaa
Author: xiaosiyuan <[email protected]>
AuthorDate: Tue Mar 3 16:24:37 2026 +0800

    [Feature][Connector-V2] Add Airtable source and sink (#10469)
---
 config/plugin_config                               |   1 +
 .../changelog/connector-http-airtable.md           |   6 +
 docs/en/connectors/sink/Airtable.md                |  88 +++++++++
 docs/en/connectors/source/Airtable.md              | 182 ++++++++++++++++++
 .../changelog/connector-http-airtable.md           |   6 +
 docs/zh/connectors/sink/Airtable.md                |  88 +++++++++
 docs/zh/connectors/source/Airtable.md              | 182 ++++++++++++++++++
 plugin-mapping.properties                          |   2 +
 .../{ => connector-http-airtable}/pom.xml          |  29 ++-
 .../seatunnel/airtable/config/AirtableConfig.java  | 115 ++++++++++++
 .../seatunnel/airtable/sink/AirtableSink.java      |  90 +++++++++
 .../airtable/sink/AirtableSinkFactory.java         |  58 ++++++
 .../airtable/sink/AirtableSinkWriter.java          | 204 +++++++++++++++++++++
 .../airtable/sink/config/AirtableSinkOptions.java  |  39 ++++
 .../seatunnel/airtable/source/AirtableSource.java  |  88 +++++++++
 .../airtable/source/AirtableSourceFactory.java     |  92 ++++++++++
 .../airtable/source/AirtableSourceReader.java      | 120 ++++++++++++
 .../source/config/AirtableSourceOptions.java       | 100 ++++++++++
 .../source/config/AirtableSourceParameter.java     | 181 ++++++++++++++++++
 .../seatunnel/airtable/AirtableFactoryTest.java    |  33 ++++
 .../airtable/sink/AirtableSinkWriterTest.java      | 117 ++++++++++++
 .../airtable/source/AirtableSourceReaderTest.java  | 102 +++++++++++
 .../seatunnel/http/source/HttpSourceReader.java    |  19 +-
 seatunnel-connectors-v2/connector-http/pom.xml     |   1 +
 seatunnel-dist/pom.xml                             |   6 +
 .../connector-http-e2e/pom.xml                     |   6 +
 .../seatunnel/e2e/connector/http/HttpIT.java       |  12 ++
 .../test/resources/airtable_json_to_assert.conf    |  89 +++++++++
 .../src/test/resources/fake_to_airtable.conf       |  53 ++++++
 .../src/test/resources/mockserver-config.json      | 115 ++++++++++++
 30 files changed, 2198 insertions(+), 26 deletions(-)

diff --git a/config/plugin_config b/config/plugin_config
index bae0026bd9..1a79f35c39 100644
--- a/config/plugin_config
+++ b/config/plugin_config
@@ -59,6 +59,7 @@ connector-http-myhours
 connector-http-notion
 connector-http-onesignal
 connector-http-wechat
+connector-http-airtable
 connector-hudi
 connector-iceberg
 connector-influxdb
diff --git a/docs/en/connectors/changelog/connector-http-airtable.md 
b/docs/en/connectors/changelog/connector-http-airtable.md
new file mode 100644
index 0000000000..1cf41e674b
--- /dev/null
+++ b/docs/en/connectors/changelog/connector-http-airtable.md
@@ -0,0 +1,6 @@
+<details><summary> Change Log </summary>
+
+| Change | Commit | Version |
+| --- | --- | --- |
+
+</details>
diff --git a/docs/en/connectors/sink/Airtable.md 
b/docs/en/connectors/sink/Airtable.md
new file mode 100644
index 0000000000..5c50ae016f
--- /dev/null
+++ b/docs/en/connectors/sink/Airtable.md
@@ -0,0 +1,88 @@
+import ChangeLog from '../changelog/connector-http-airtable.md';
+
+# Airtable
+
+> Airtable sink connector
+
+## Description
+
+Used to write data to Airtable.
+
+## Key Features
+
+- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
+- [ ] [cdc](../../introduction/concepts/connector-v2-features.md)
+- [ ] [support multiple table 
write](../../introduction/concepts/connector-v2-features.md)
+
+## Options
+
+|            name             |  type   | required | default value |
+|-----------------------------|---------|----------|---------------|
+| token                       | String  | Yes      | -             |
+| base_id                     | String  | Yes      | -             |
+| table                       | String  | Yes      | -             |
+| api_base_url                | String  | No       | https://api.airtable.com |
+| typecast                    | boolean | No       | false         |
+| batch_size                  | int     | No       | 10            |
+| request_interval_ms         | int     | No       | 220           |
+| rate_limit_backoff_ms       | int     | No       | 30000         |
+| rate_limit_max_retries      | int     | No       | 3             |
+| common-options              |         | No       | -             |
+
+### token [String]
+
+Airtable personal access token. You can create one at 
https://airtable.com/create/tokens.
+
+### base_id [String]
+
+The ID of the Airtable base (starts with `app`).
+
+### table [String]
+
+The table name or table ID to write to.
+
+### api_base_url [String]
+
+Airtable API base URL. Default is `https://api.airtable.com`.
+
+### typecast [boolean]
+
+If true, Airtable will automatically convert values to match the field type. 
Default false.
+
+### batch_size [int]
+
+Number of records per API request. Maximum 10 per Airtable API limit. Default 
10.
+
+### request_interval_ms [int]
+
+Minimum interval in milliseconds between API requests. Default 220ms.
+
+### rate_limit_backoff_ms [int]
+
+Base backoff time in milliseconds when receiving a 429 (rate limit) response. 
Default 30000ms.
+
+### rate_limit_max_retries [int]
+
+Maximum number of retries after receiving a 429 response. Default 3.
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common 
Options](../common-options/sink-common-options.md) for details.
+
+## Example
+
+```hocon
+sink {
+  Airtable {
+    token = "patXXXXXXXX.XXXXXXXX"
+    base_id = "appXXXXXXXX"
+    table = "Shipments"
+    typecast = true
+    batch_size = 10
+  }
+}
+```
+
+## Changelog
+
+<ChangeLog />
diff --git a/docs/en/connectors/source/Airtable.md 
b/docs/en/connectors/source/Airtable.md
new file mode 100644
index 0000000000..61acae4983
--- /dev/null
+++ b/docs/en/connectors/source/Airtable.md
@@ -0,0 +1,182 @@
+import ChangeLog from '../changelog/connector-http-airtable.md';
+
+# Airtable
+
+> Airtable source connector
+
+## Description
+
+Used to read data from Airtable.
+
+## Key features
+
+- [x] [batch](../../introduction/concepts/connector-v2-features.md)
+- [ ] [stream](../../introduction/concepts/connector-v2-features.md)
+- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
+- [x] [column projection](../../introduction/concepts/connector-v2-features.md)
+- [ ] [parallelism](../../introduction/concepts/connector-v2-features.md)
+- [ ] [support user-defined 
split](../../introduction/concepts/connector-v2-features.md)
+
+## Options
+
+|            name             |  type   | required | default value |
+|-----------------------------|---------|----------|---------------|
+| token                       | String  | Yes      | -             |
+| base_id                     | String  | Yes      | -             |
+| table                       | String  | Yes      | -             |
+| api_base_url                | String  | No       | https://api.airtable.com |
+| view                        | String  | No       | -             |
+| fields                      | List    | No       | -             |
+| filter_by_formula           | String  | No       | -             |
+| max_records                 | int     | No       | -             |
+| page_size                   | int     | No       | -             |
+| sort                        | String  | No       | -             |
+| cell_format                 | String  | No       | -             |
+| return_fields_by_field_id   | boolean | No       | -             |
+| record_metadata             | List    | No       | -             |
+| time_zone                   | String  | No       | -             |
+| user_locale                 | String  | No       | -             |
+| request_interval_ms         | int     | No       | 220           |
+| rate_limit_backoff_ms       | int     | No       | 30000         |
+| rate_limit_max_retries      | int     | No       | 3             |
+| schema                      | Config  | No       | -             |
+| schema.fields               | Config  | No       | -             |
+| format                      | String  | No       | text          |
+| content_field               | String  | No       | -             |
+| json_field                  | Config  | No       | -             |
+| common-options              | config  | No       | -             |
+
+### token [String]
+
+Airtable personal access token. You can create one at 
https://airtable.com/create/tokens.
+
+### base_id [String]
+
+The ID of the Airtable base (starts with `app`).
+
+### table [String]
+
+The table name or table ID to read from.
+
+### api_base_url [String]
+
+Airtable API base URL. Default is `https://api.airtable.com`.
+
+### view [String]
+
+The name or ID of a view in the table. Only records visible in this view will 
be returned.
+
+### fields [List]
+
+A list of field names to include in the response.
+
+### filter_by_formula [String]
+
+An Airtable formula to filter records. See [Airtable formula 
reference](https://support.airtable.com/docs/formula-field-reference).
+
+### max_records [int]
+
+Maximum total number of records to return.
+
+### page_size [int]
+
+Number of records per page (1-100).
+
+### sort [String]
+
+Sort definition as a JSON array, e.g. `[{"field":"Name","direction":"asc"}]`.
+
+### cell_format [String]
+
+The format for cell values, either `json` or `string`.
+
+### return_fields_by_field_id [boolean]
+
+If true, field keys in the response will be field IDs instead of field names.
+
+### record_metadata [List]
+
+Additional record metadata to return, e.g. `["commentCount"]`.
+
+### time_zone [String]
+
+The time zone for formatting date/time values.
+
+### user_locale [String]
+
+The user locale for formatting values.
+
+### request_interval_ms [int]
+
+Minimum interval in milliseconds between API requests. Default 220ms (to stay 
within Airtable's 5 requests/second limit).
+
+### rate_limit_backoff_ms [int]
+
+Base backoff time in milliseconds when receiving a 429 (rate limit) response. 
Default 30000ms.
+
+### rate_limit_max_retries [int]
+
+Maximum number of retries after receiving a 429 response. Default 3.
+
+### schema [Config]
+
+#### fields [Config]
+
+The schema fields of upstream data. For more details, please refer to [Schema 
Feature](../../introduction/concepts/schema-feature.md).
+
+### format [String]
+
+The format of upstream data, supports `json` and `text`, default `text`.
+
+### content_field [String]
+
+JsonPath expression to extract data from the response. For Airtable, you 
typically use `$.records[*].fields` to extract the fields from each record.
+
+### json_field [Config]
+
+This parameter helps you configure the schema and must be used with schema.
+
+### common options
+
+Source plugin common parameters, please refer to [Source Common 
Options](../common-options/source-common-options.md) for details.
+
+## Example
+
+Read from an Airtable table and output raw text:
+
+```hocon
+source {
+  Airtable {
+    token = "patXXXXXXXX.XXXXXXXX"
+    base_id = "appXXXXXXXX"
+    table = "Shipments"
+    format = "text"
+    max_records = 10
+  }
+}
+```
+
+Read with schema and extract record fields:
+
+```hocon
+source {
+  Airtable {
+    token = "patXXXXXXXX.XXXXXXXX"
+    base_id = "appXXXXXXXX"
+    table = "Shipments"
+    content_field = "$.records[*].fields"
+    filter_by_formula = "{Status} = 'Shipped'"
+    schema = {
+      fields {
+        Name = string
+        Status = string
+        Weight = float
+      }
+    }
+  }
+}
+```
+
+## Changelog
+
+<ChangeLog />
diff --git a/docs/zh/connectors/changelog/connector-http-airtable.md 
b/docs/zh/connectors/changelog/connector-http-airtable.md
new file mode 100644
index 0000000000..1cf41e674b
--- /dev/null
+++ b/docs/zh/connectors/changelog/connector-http-airtable.md
@@ -0,0 +1,6 @@
+<details><summary> Change Log </summary>
+
+| Change | Commit | Version |
+| --- | --- | --- |
+
+</details>
diff --git a/docs/zh/connectors/sink/Airtable.md 
b/docs/zh/connectors/sink/Airtable.md
new file mode 100644
index 0000000000..cc8c1e49d1
--- /dev/null
+++ b/docs/zh/connectors/sink/Airtable.md
@@ -0,0 +1,88 @@
+import ChangeLog from '../changelog/connector-http-airtable.md';
+
+# Airtable
+
+> Airtable Sink 连接器
+
+## 描述
+
+用于将数据写入 Airtable。
+
+## 关键特性
+
+- [ ] [精确一次](../../introduction/concepts/connector-v2-features.md)
+- [ ] [cdc](../../introduction/concepts/connector-v2-features.md)
+- [ ] [支持多表写入](../../introduction/concepts/connector-v2-features.md)
+
+## 选项
+
+| 参数名 | 类型 | 必须 | 默认值 |
+|--------|------|------|--------|
+| token                       | String  | 是 | -             |
+| base_id                     | String  | 是 | -             |
+| table                       | String  | 是 | -             |
+| api_base_url                | String  | 否 | https://api.airtable.com |
+| typecast                    | boolean | 否 | false         |
+| batch_size                  | int     | 否 | 10            |
+| request_interval_ms         | int     | 否 | 220           |
+| rate_limit_backoff_ms       | int     | 否 | 30000         |
+| rate_limit_max_retries      | int     | 否 | 3             |
+| common-options              |         | 否 | -             |
+
+### token [String]
+
+Airtable 个人访问令牌。可在 https://airtable.com/create/tokens 创建。
+
+### base_id [String]
+
+Airtable Base ID(以 `app` 开头)。
+
+### table [String]
+
+要写入的表名或表 ID。
+
+### api_base_url [String]
+
+Airtable API 基础 URL,默认 `https://api.airtable.com`。
+
+### typecast [boolean]
+
+如果为 true,Airtable 会自动将值转换为匹配的字段类型。默认 false。
+
+### batch_size [int]
+
+每次 API 请求的记录数,受 Airtable API 限制最大为 10。默认 10。
+
+### request_interval_ms [int]
+
+API 请求之间的最小间隔(毫秒),默认 220ms。
+
+### rate_limit_backoff_ms [int]
+
+收到 429(限流)响应时的基础退避时间(毫秒),默认 30000ms。
+
+### rate_limit_max_retries [int]
+
+收到 429 响应后的最大重试次数,默认 3。
+
+### common options
+
+汇插件通用参数,请参考 [Sink Common Options](../common-options/sink-common-options.md)。
+
+## 示例
+
+```hocon
+sink {
+  Airtable {
+    token = "patXXXXXXXX.XXXXXXXX"
+    base_id = "appXXXXXXXX"
+    table = "Shipments"
+    typecast = true
+    batch_size = 10
+  }
+}
+```
+
+## 变更日志
+
+<ChangeLog />
diff --git a/docs/zh/connectors/source/Airtable.md 
b/docs/zh/connectors/source/Airtable.md
new file mode 100644
index 0000000000..19bbb70df1
--- /dev/null
+++ b/docs/zh/connectors/source/Airtable.md
@@ -0,0 +1,182 @@
+import ChangeLog from '../changelog/connector-http-airtable.md';
+
+# Airtable
+
+> Airtable 源连接器
+
+## 描述
+
+用于从 Airtable 读取数据。
+
+## 关键特性
+
+- [x] [批](../../introduction/concepts/connector-v2-features.md)
+- [ ] [流](../../introduction/concepts/connector-v2-features.md)
+- [ ] [精确一次](../../introduction/concepts/connector-v2-features.md)
+- [x] [列投影](../../introduction/concepts/connector-v2-features.md)
+- [ ] [并行性](../../introduction/concepts/connector-v2-features.md)
+- [ ] [支持用户自定义split](../../introduction/concepts/connector-v2-features.md)
+
+## 选项
+
+| 参数名 | 类型 | 必须 | 默认值 |
+|--------|------|------|--------|
+| token                       | String  | 是 | -             |
+| base_id                     | String  | 是 | -             |
+| table                       | String  | 是 | -             |
+| api_base_url                | String  | 否 | https://api.airtable.com |
+| view                        | String  | 否 | -             |
+| fields                      | List    | 否 | -             |
+| filter_by_formula           | String  | 否 | -             |
+| max_records                 | int     | 否 | -             |
+| page_size                   | int     | 否 | -             |
+| sort                        | String  | 否 | -             |
+| cell_format                 | String  | 否 | -             |
+| return_fields_by_field_id   | boolean | 否 | -             |
+| record_metadata             | List    | 否 | -             |
+| time_zone                   | String  | 否 | -             |
+| user_locale                 | String  | 否 | -             |
+| request_interval_ms         | int     | 否 | 220           |
+| rate_limit_backoff_ms       | int     | 否 | 30000         |
+| rate_limit_max_retries      | int     | 否 | 3             |
+| schema                      | Config  | 否 | -             |
+| schema.fields               | Config  | 否 | -             |
+| format                      | String  | 否 | text          |
+| content_field               | String  | 否 | -             |
+| json_field                  | Config  | 否 | -             |
+| common-options              | config  | 否 | -             |
+
+### token [String]
+
+Airtable 个人访问令牌。可在 https://airtable.com/create/tokens 创建。
+
+### base_id [String]
+
+Airtable Base ID(以 `app` 开头)。
+
+### table [String]
+
+要读取的表名或表 ID。
+
+### api_base_url [String]
+
+Airtable API 基础 URL,默认 `https://api.airtable.com`。
+
+### view [String]
+
+视图名称或 ID,仅返回该视图中可见的记录。
+
+### fields [List]
+
+要包含在响应中的字段名列表。
+
+### filter_by_formula [String]
+
+Airtable 公式表达式,用于过滤记录。参考 [Airtable 
公式文档](https://support.airtable.com/docs/formula-field-reference)。
+
+### max_records [int]
+
+返回的最大记录总数。
+
+### page_size [int]
+
+每页记录数(1-100)。
+
+### sort [String]
+
+排序定义 JSON 数组,例如 `[{"field":"Name","direction":"asc"}]`。
+
+### cell_format [String]
+
+单元格值格式,`json` 或 `string`。
+
+### return_fields_by_field_id [boolean]
+
+如果为 true,响应中的字段键将使用字段 ID 而非字段名。
+
+### record_metadata [List]
+
+要返回的额外记录元数据,例如 `["commentCount"]`。
+
+### time_zone [String]
+
+用于格式化日期/时间值的时区。
+
+### user_locale [String]
+
+用于格式化值的用户区域设置。
+
+### request_interval_ms [int]
+
+API 请求之间的最小间隔(毫秒),默认 220ms(以保持在 Airtable 每秒 5 次请求的限制内)。
+
+### rate_limit_backoff_ms [int]
+
+收到 429(限流)响应时的基础退避时间(毫秒),默认 30000ms。
+
+### rate_limit_max_retries [int]
+
+收到 429 响应后的最大重试次数,默认 3。
+
+### schema [Config]
+
+#### fields [Config]
+
+上游数据的模式字段。更多详情请参考 [Schema 特性](../../introduction/concepts/schema-feature.md)。
+
+### format [String]
+
+上游数据的格式,支持 `json` 和 `text`,默认 `text`。
+
+### content_field [String]
+
+用于从响应中提取数据的 JsonPath 表达式。对于 Airtable,通常使用 `$.records[*].fields` 来提取每条记录的字段。
+
+### json_field [Config]
+
+此参数帮助您配置模式,必须与 schema 一起使用。
+
+### common options
+
+源插件通用参数,请参考 [Source Common 
Options](../common-options/source-common-options.md)。
+
+## 示例
+
+读取 Airtable 表并输出原始文本:
+
+```hocon
+source {
+  Airtable {
+    token = "patXXXXXXXX.XXXXXXXX"
+    base_id = "appXXXXXXXX"
+    table = "Shipments"
+    format = "text"
+    max_records = 10
+  }
+}
+```
+
+指定 schema 并提取记录字段:
+
+```hocon
+source {
+  Airtable {
+    token = "patXXXXXXXX.XXXXXXXX"
+    base_id = "appXXXXXXXX"
+    table = "Shipments"
+    content_field = "$.records[*].fields"
+    filter_by_formula = "{Status} = 'Shipped'"
+    schema = {
+      fields {
+        Name = string
+        Status = string
+        Weight = float
+      }
+    }
+  }
+}
+```
+
+## 变更日志
+
+<ChangeLog />
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 7c0c652a52..e45b0082d1 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -98,6 +98,8 @@ seatunnel.source.Jira = connector-http-jira
 seatunnel.source.Gitlab = connector-http-gitlab
 seatunnel.source.Github = connector-http-github
 seatunnel.source.Notion = connector-http-notion
+seatunnel.source.Airtable = connector-http-airtable
+seatunnel.sink.Airtable = connector-http-airtable
 seatunnel.sink.RabbitMQ = connector-rabbitmq
 seatunnel.source.RabbitMQ = connector-rabbitmq
 seatunnel.source.OpenMldb = connector-openmldb
diff --git a/seatunnel-connectors-v2/connector-http/pom.xml 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/pom.xml
similarity index 61%
copy from seatunnel-connectors-v2/connector-http/pom.xml
copy to seatunnel-connectors-v2/connector-http/connector-http-airtable/pom.xml
index e32339a5f0..ec734bf3be 100644
--- a/seatunnel-connectors-v2/connector-http/pom.xml
+++ b/seatunnel-connectors-v2/connector-http/connector-http-airtable/pom.xml
@@ -22,26 +22,19 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-connectors-v2</artifactId>
+        <artifactId>connector-http</artifactId>
         <version>${revision}</version>
     </parent>
-    <artifactId>connector-http</artifactId>
-    <packaging>pom</packaging>
-    <name>SeaTunnel : Connectors V2 : Http :</name>
 
-    <modules>
-        <module>connector-http-base</module>
-        <module>connector-http-feishu</module>
-        <module>connector-http-wechat</module>
-        <module>connector-http-myhours</module>
-        <module>connector-http-lemlist</module>
-        <module>connector-http-klaviyo</module>
-        <module>connector-http-onesignal</module>
-        <module>connector-http-jira</module>
-        <module>connector-http-gitlab</module>
-        <module>connector-http-github</module>
-        <module>connector-http-notion</module>
-        <module>connector-http-persistiq</module>
-    </modules>
+    <artifactId>connector-http-airtable</artifactId>
+    <name>SeaTunnel : Connectors V2 : Http : Airtable</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-http-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
 
 </project>
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/config/AirtableConfig.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/config/AirtableConfig.java
new file mode 100644
index 0000000000..548ad29ae3
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/config/AirtableConfig.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpCommonOptions;
+
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Options and helper methods shared by the Airtable source and sink: connection options, rate
+ * limit tuning options, request URL construction and authentication header construction.
+ */
+public class AirtableConfig extends HttpCommonOptions {
+
+    public static final String AUTHORIZATION = "Authorization";
+    public static final String BEARER = "Bearer";
+    public static final String CONTENT_TYPE = "Content-Type";
+    public static final String APPLICATION_JSON = "application/json";
+
+    /** Base URL used by callers when {@link #API_BASE_URL} is not configured. */
+    public static final String DEFAULT_API_BASE_URL = "https://api.airtable.com";
+
+    /** Version path segment appended to the base URL when it is not already present. */
+    private static final String API_VERSION_PATH = "/v0";
+
+    // Declared without a default value; callers fall back to DEFAULT_API_BASE_URL themselves.
+    public static final Option<String> API_BASE_URL =
+            Options.key("api_base_url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Airtable API base URL, default is https://api.airtable.com");
+
+    // "api_key" is accepted as a fallback key for the token.
+    public static final Option<String> TOKEN =
+            Options.key("token")
+                    .stringType()
+                    .noDefaultValue()
+                    .withFallbackKeys("api_key")
+                    .withDescription("Airtable personal access token");
+
+    public static final Option<String> BASE_ID =
+            Options.key("base_id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Airtable base ID");
+
+    public static final Option<String> TABLE =
+            Options.key("table")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Airtable table name or table ID");
+
+    public static final Option<Integer> REQUEST_INTERVAL_MS =
+            Options.key("request_interval_ms")
+                    .intType()
+                    .defaultValue(220)
+                    .withDescription(
+                            "Minimum interval in milliseconds between Airtable API requests, must be >= 0.");
+
+    public static final Option<Integer> RATE_LIMIT_BACKOFF_MS =
+            Options.key("rate_limit_backoff_ms")
+                    .intType()
+                    .defaultValue(30000)
+                    .withDescription(
+                            "Base backoff time in milliseconds when Airtable returns 429, must be >= 0.");
+
+    public static final Option<Integer> RATE_LIMIT_MAX_RETRIES =
+            Options.key("rate_limit_max_retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "Maximum retries after receiving Airtable 429 responses, must be >= 0.");
+
+    /**
+     * Builds the request URL for a table: {@code <apiBaseUrl>/v0/<baseId>/<encoded table>}.
+     *
+     * <p>A single trailing slash on {@code apiBaseUrl} is removed, and the {@code /v0} segment is
+     * appended only when the base URL does not already end with it. Only {@code table} is
+     * percent-encoded; {@code baseId} is inserted as given.
+     *
+     * @param apiBaseUrl API base URL, with or without a trailing slash or {@code /v0} suffix
+     * @param baseId Airtable base ID
+     * @param table table name or table ID
+     * @return the table URL
+     */
+    public static String buildBaseUrl(String apiBaseUrl, String baseId, String table) {
+        String normalized =
+                apiBaseUrl.endsWith("/")
+                        ? apiBaseUrl.substring(0, apiBaseUrl.length() - 1)
+                        : apiBaseUrl;
+        if (!normalized.endsWith(API_VERSION_PATH)) {
+            normalized = normalized + API_VERSION_PATH;
+        }
+        return normalized + "/" + baseId + "/" + encodePathSegment(table);
+    }
+
+    /**
+     * Percent-encodes a value for use as a URL path segment.
+     *
+     * @param value raw segment value
+     * @return the UTF-8 encoded value with spaces written as {@code %20}
+     * @throws IllegalStateException if the UTF-8 charset is reported as unsupported
+     */
+    public static String encodePathSegment(String value) {
+        try {
+            String encoded = URLEncoder.encode(value, StandardCharsets.UTF_8.name());
+            // URLEncoder produces form encoding, where a space becomes '+'; a path segment
+            // needs %20 instead. A literal '+' in the input is already encoded as %2B.
+            return encoded.replace("+", "%20");
+        } catch (java.io.UnsupportedEncodingException e) {
+            throw new IllegalStateException("UTF-8 encoding is not supported", e);
+        }
+    }
+
+    /**
+     * Returns a new header map containing the bearer {@code Authorization} header and a JSON
+     * {@code Content-Type} header on top of any existing headers.
+     *
+     * @param token Airtable access token
+     * @param existingHeaders headers to copy first, may be {@code null}; never modified
+     * @return a mutable copy in which the two generated headers override same-named entries
+     */
+    public static Map<String, String> buildAuthHeaders(
+            String token, Map<String, String> existingHeaders) {
+        Map<String, String> headers =
+                Optional.ofNullable(existingHeaders).map(HashMap::new).orElseGet(HashMap::new);
+        headers.put(AUTHORIZATION, BEARER + " " + token);
+        headers.put(CONTENT_TYPE, APPLICATION_JSON);
+        return headers;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSink.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSink.java
new file mode 100644
index 0000000000..ae3bf36aa3
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSink.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.airtable.config.AirtableConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.airtable.sink.config.AirtableSinkOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Airtable sink definition. Resolves the request target and tuning values from the plugin
+ * configuration once, and hands them to every {@link AirtableSinkWriter} it creates.
+ */
+public class AirtableSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+        implements SupportMultiTableSink {
+
+    private final CatalogTable catalogTable;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final HttpParameter httpParameter;
+    private final int batchSize;
+    private final boolean typecast;
+    private final int requestIntervalMs;
+    private final int rateLimitBackoffMs;
+    private final int rateLimitMaxRetries;
+
+    /**
+     * @param pluginConfig sink options as declared in the job configuration
+     * @param catalogTable upstream table whose row type describes the rows to write
+     */
+    public AirtableSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
+        this.catalogTable = catalogTable;
+        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+        this.httpParameter = createHttpParameter(pluginConfig);
+
+        this.batchSize = pluginConfig.get(AirtableSinkOptions.BATCH_SIZE);
+        this.typecast = pluginConfig.get(AirtableSinkOptions.TYPECAST);
+        this.requestIntervalMs = pluginConfig.get(AirtableConfig.REQUEST_INTERVAL_MS);
+        this.rateLimitBackoffMs = pluginConfig.get(AirtableConfig.RATE_LIMIT_BACKOFF_MS);
+        this.rateLimitMaxRetries = pluginConfig.get(AirtableConfig.RATE_LIMIT_MAX_RETRIES);
+    }
+
+    /** Assembles the table URL and authentication headers used for every write request. */
+    private static HttpParameter createHttpParameter(ReadonlyConfig pluginConfig) {
+        String airtableBaseId = pluginConfig.get(AirtableConfig.BASE_ID);
+        String tableName = pluginConfig.get(AirtableConfig.TABLE);
+        String accessToken = pluginConfig.get(AirtableConfig.TOKEN);
+        // api_base_url has no declared default, so fall back to the public endpoint here.
+        String endpoint =
+                pluginConfig
+                        .getOptional(AirtableConfig.API_BASE_URL)
+                        .orElse(AirtableConfig.DEFAULT_API_BASE_URL);
+
+        HttpParameter parameter = new HttpParameter();
+        parameter.setUrl(AirtableConfig.buildBaseUrl(endpoint, airtableBaseId, tableName));
+        parameter.setHeaders(AirtableConfig.buildAuthHeaders(accessToken, null));
+        return parameter;
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Airtable";
+    }
+
+    /** Creates a writer configured with the values resolved in the constructor. */
+    @Override
+    public AirtableSinkWriter createWriter(SinkWriter.Context context) throws IOException {
+        AirtableSinkWriter writer =
+                new AirtableSinkWriter(
+                        seaTunnelRowType,
+                        httpParameter,
+                        batchSize,
+                        typecast,
+                        requestIntervalMs,
+                        rateLimitBackoffMs,
+                        rateLimitMaxRetries);
+        return writer;
+    }
+
+    /** Exposes the catalog table passed to the constructor, or empty when it was null. */
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSinkFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSinkFactory.java
new file mode 100644
index 0000000000..3273a6d128
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSinkFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.airtable.config.AirtableConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.airtable.sink.config.AirtableSinkOptions;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class AirtableSinkFactory implements TableSinkFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return "Airtable";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(AirtableConfig.TOKEN, AirtableConfig.BASE_ID, 
AirtableConfig.TABLE)
+                .optional(
+                        AirtableConfig.API_BASE_URL,
+                        AirtableSinkOptions.TYPECAST,
+                        AirtableSinkOptions.BATCH_SIZE,
+                        AirtableConfig.REQUEST_INTERVAL_MS,
+                        AirtableConfig.RATE_LIMIT_BACKOFF_MS,
+                        AirtableConfig.RATE_LIMIT_MAX_RETRIES,
+                        SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
+                .build();
+    }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () -> new AirtableSink(context.getOptions(), 
context.getCatalogTable());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSinkWriter.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSinkWriter.java
new file mode 100644
index 0000000000..97cd959893
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSinkWriter.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.sink;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Sink writer that buffers rows and posts them to the configured Airtable URL
+ * as JSON batches.
+ *
+ * <p>Rows are collected in memory until {@code batchSize} rows are buffered,
+ * or until {@link #prepareCommit()} / {@link #close()} is called. They are then
+ * serialized into a single body of the form
+ * {@code {"records": [{"fields": ...}, ...]}}. Requests are spaced by
+ * {@code requestIntervalMs} and retried with exponential backoff when the
+ * response code is 429.
+ */
+@Slf4j
+public class AirtableSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+        implements SupportMultiTableSinkWriter<Void> {
+
+    /** Response code that triggers the rate-limit retry path. */
+    private static final int STATUS_TOO_MANY_REQUESTS = 429;
+    /** Upper bound, in milliseconds, for a single backoff sleep. */
+    private static final long MAX_BACKOFF_MILLIS = 300000L;
+
+    private final HttpClientProvider httpClient;
+    private final String url;
+    private final Map<String, String> headers;
+    private final JsonSerializationSchema serializationSchema;
+    private final ObjectMapper objectMapper;
+    private final int batchSize;
+    private final boolean typecast;
+    private final int requestIntervalMs;
+    private final int rateLimitBackoffMs;
+    private final int rateLimitMaxRetries;
+    /** Rows accepted by {@link #write} that have not been sent yet. */
+    private final List<SeaTunnelRow> batchBuffer;
+    /** Value of {@code System.currentTimeMillis()} at the last granted slot. */
+    private long lastRequestTimeMillis;
+
+    /**
+     * Creates a writer bound to the URL and headers in {@code httpParameter}.
+     *
+     * @param seaTunnelRowType row type used to build the JSON serializer
+     * @param httpParameter source of the target URL, headers and client setup
+     * @param batchSize rows per request; clamped into the range [1, 10]
+     * @param typecast whether to add {@code "typecast": true} to each body
+     * @param requestIntervalMs minimum spacing between requests; negative
+     *     values are treated as 0 (no spacing)
+     * @param rateLimitBackoffMs base backoff for 429 responses; negative
+     *     values are treated as 0
+     * @param rateLimitMaxRetries retries allowed on 429; negative values are
+     *     treated as 0
+     */
+    public AirtableSinkWriter(
+            SeaTunnelRowType seaTunnelRowType,
+            HttpParameter httpParameter,
+            int batchSize,
+            boolean typecast,
+            int requestIntervalMs,
+            int rateLimitBackoffMs,
+            int rateLimitMaxRetries) {
+        this.url = httpParameter.getUrl();
+        this.headers = httpParameter.getHeaders();
+        this.httpClient = new HttpClientProvider(httpParameter);
+        this.serializationSchema =
+                new JsonSerializationSchema(seaTunnelRowType);
+        this.objectMapper = serializationSchema.getMapper();
+        this.batchSize = Math.min(Math.max(batchSize, 1), 10);
+        this.typecast = typecast;
+        this.requestIntervalMs = Math.max(0, requestIntervalMs);
+        this.rateLimitBackoffMs = Math.max(0, rateLimitBackoffMs);
+        this.rateLimitMaxRetries = Math.max(0, rateLimitMaxRetries);
+        this.batchBuffer = new ArrayList<>(this.batchSize);
+        this.lastRequestTimeMillis = 0L;
+    }
+
+    /**
+     * Buffers one row and sends the batch once it holds {@code batchSize} rows.
+     *
+     * @param element row to write
+     * @throws IOException if the triggered flush fails
+     */
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        batchBuffer.add(element);
+        if (batchBuffer.size() >= batchSize) {
+            flush();
+        }
+    }
+
+    /**
+     * Sends all buffered rows in one request; a no-op for an empty buffer.
+     *
+     * <p>The buffer is cleared only after the request succeeded, so rows stay
+     * buffered when sending throws.
+     */
+    private void flush() throws IOException {
+        if (batchBuffer.isEmpty()) {
+            return;
+        }
+
+        String body = buildRequestBody();
+        sendWithRateLimitRetry(body);
+        batchBuffer.clear();
+    }
+
+    /**
+     * Serializes the buffered rows into the JSON request body: each row
+     * becomes {@code {"fields": <row json>}} inside a {@code "records"} array,
+     * with {@code "typecast": true} added when typecasting is enabled.
+     */
+    private String buildRequestBody() throws IOException {
+        ObjectNode root = objectMapper.createObjectNode();
+        ArrayNode records = objectMapper.createArrayNode();
+
+        for (SeaTunnelRow row : batchBuffer) {
+            byte[] serialized = serializationSchema.serialize(row);
+            JsonNode fieldsNode = objectMapper.readTree(serialized);
+            ObjectNode record = objectMapper.createObjectNode();
+            record.set("fields", fieldsNode);
+            records.add(record);
+        }
+
+        root.set("records", records);
+        if (typecast) {
+            root.put("typecast", true);
+        }
+
+        return objectMapper.writeValueAsString(root);
+    }
+
+    /**
+     * Posts {@code body} and returns once the response code equals
+     * {@code HttpResponse.STATUS_OK}.
+     *
+     * <p>A 429 response is retried up to {@code rateLimitMaxRetries} times
+     * after a backoff sleep. Any other response, or a 429 after the retries
+     * are used up, raises an {@link IOException} carrying code and content.
+     * Non-IO exceptions thrown inside the request/backoff section (including
+     * the wrapped interruption of the backoff sleep) are rethrown as
+     * {@link IOException}.
+     */
+    private void sendWithRateLimitRetry(String body) throws IOException {
+        int retryCount = 0;
+        while (true) {
+            waitForRequestSlot();
+            try {
+                HttpResponse response = httpClient.doPost(url, headers, body);
+                if (HttpResponse.STATUS_OK == response.getCode()) {
+                    return;
+                }
+                if (response.getCode() == STATUS_TOO_MANY_REQUESTS
+                        && retryCount < rateLimitMaxRetries) {
+                    retryCount++;
+                    long backoffMillis = calculateBackoffMillis(retryCount);
+                    log.warn(
+                            "Airtable API rate limit reached, retry {}/{} 
after {} ms",
+                            retryCount,
+                            rateLimitMaxRetries,
+                            backoffMillis);
+                    try {
+                        Thread.sleep(backoffMillis);
+                    } catch (InterruptedException e) {
+                        // Restore the flag so callers can still observe it.
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException(e);
+                    }
+                    continue;
+                }
+                throw new IOException(
+                        String.format(
+                                "Airtable API request failed, status 
code:[%s], content:[%s]",
+                                response.getCode(), response.getContent()));
+            } catch (IOException e) {
+                throw e;
+            } catch (Exception e) {
+                throw new IOException(
+                        "Failed to send Airtable API request", e);
+            }
+        }
+    }
+
+    /**
+     * Blocks until at least {@code requestIntervalMs} has elapsed since the
+     * previously granted slot, then records the new slot time. Does nothing
+     * (and records nothing) when the interval is 0.
+     */
+    private void waitForRequestSlot() {
+        if (requestIntervalMs <= 0) {
+            return;
+        }
+        long now = System.currentTimeMillis();
+        long elapsed = now - lastRequestTimeMillis;
+        if (elapsed < requestIntervalMs) {
+            try {
+                Thread.sleep(requestIntervalMs - elapsed);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+        lastRequestTimeMillis = System.currentTimeMillis();
+    }
+
+    /**
+     * Computes the sleep before retry number {@code retryCount} (1-based):
+     * {@code rateLimitBackoffMs * 2^(retryCount - 1)}, with the exponent
+     * limited to 20 and the result capped at {@link #MAX_BACKOFF_MILLIS}.
+     * Returns 0 when no base backoff is configured.
+     */
+    private long calculateBackoffMillis(int retryCount) {
+        if (rateLimitBackoffMs <= 0) {
+            return 0L;
+        }
+        long exponential = 1L << Math.min(20, Math.max(0, retryCount - 1));
+        long waitMillis = rateLimitBackoffMs * exponential;
+        return Math.min(waitMillis, MAX_BACKOFF_MILLIS);
+    }
+
+    /**
+     * Flushes the buffered rows before a commit.
+     *
+     * @return always an empty optional
+     * @throws RuntimeException wrapping the {@link IOException} of a failed
+     *     flush
+     */
+    @Override
+    public Optional<Void> prepareCommit() {
+        try {
+            flush();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to flush data in 
prepareCommit", e);
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Flushes the remaining rows and closes the HTTP client.
+     *
+     * <p>The client is closed in a {@code finally} block so that it is
+     * released even when the final flush throws; previously a failed flush
+     * skipped the close call and leaked the client.
+     *
+     * @throws IOException if the final flush or closing the client fails
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            flush();
+        } finally {
+            if (Objects.nonNull(httpClient)) {
+                httpClient.close();
+            }
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/config/AirtableSinkOptions.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/config/AirtableSinkOptions.java
new file mode 100644
index 0000000000..fb1ab7ca5f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/config/AirtableSinkOptions.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.sink.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import 
org.apache.seatunnel.connectors.seatunnel.airtable.config.AirtableConfig;
+
+/**
+ * Option declarations specific to the Airtable sink, defined on top of the
+ * members inherited from {@link AirtableConfig}.
+ */
+public class AirtableSinkOptions extends AirtableConfig {
+
+    /** Boolean option with key {@code typecast}; defaults to {@code false}. */
+    public static final Option<Boolean> TYPECAST =
+            Options.key("typecast")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, Airtable will automatically typecast 
values to match the field type.");
+
+    /** Integer option with key {@code batch_size}; defaults to {@code 10}. */
+    public static final Option<Integer> BATCH_SIZE =
+            Options.key("batch_size")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            "Number of records per API request, maximum 10 per 
Airtable API limit.");
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSource.java
new file mode 100644
index 0000000000..085364b57a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSource.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.JobMode;
+import 
org.apache.seatunnel.connectors.seatunnel.airtable.source.config.AirtableSourceOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.airtable.source.config.AirtableSourceParameter;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import 
org.apache.seatunnel.connectors.seatunnel.http.config.HttpPaginationType;
+import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
+import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSource;
+
+/**
+ * Airtable source built on {@link HttpSource}. It prepares an
+ * {@link AirtableSourceParameter} from the plugin configuration, reads the
+ * request-spacing and rate-limit settings, and installs a cursor-style
+ * {@link PageInfo} when the parent class did not provide one.
+ */
+public class AirtableSource extends HttpSource {
+
+    public static final String PLUGIN_NAME = "Airtable";
+
+    private final AirtableSourceParameter airtableSourceParameter =
+            new AirtableSourceParameter();
+    private final int requestIntervalMs;
+    private final int rateLimitBackoffMs;
+    private final int rateLimitMaxRetries;
+
+    /**
+     * @param pluginConfig connector configuration; passed to the parent
+     *     constructor and used to build the Airtable request parameters
+     */
+    public AirtableSource(ReadonlyConfig pluginConfig) {
+        super(pluginConfig);
+        airtableSourceParameter.buildWithConfig(pluginConfig);
+        this.requestIntervalMs =
+                pluginConfig.get(AirtableSourceOptions.REQUEST_INTERVAL_MS);
+        this.rateLimitBackoffMs =
+                pluginConfig.get(AirtableSourceOptions.RATE_LIMIT_BACKOFF_MS);
+        this.rateLimitMaxRetries =
+                pluginConfig.get(AirtableSourceOptions.RATE_LIMIT_MAX_RETRIES);
+        if (this.pageInfo == null) {
+            this.pageInfo = defaultCursorPageInfo();
+        }
+    }
+
+    /**
+     * Creates the fallback pagination settings: cursor pagination that sends
+     * the cursor as {@code offset} and reads the next one from
+     * {@code $.offset}, without placeholder replacement.
+     */
+    private static PageInfo defaultCursorPageInfo() {
+        PageInfo cursor = new PageInfo();
+        cursor.setPageType(HttpPaginationType.CURSOR.getCode());
+        cursor.setPageCursorFieldName("offset");
+        cursor.setPageCursorResponseField("$.offset");
+        cursor.setUsePlaceholderReplacement(false);
+        // Per the original author's note: the page index is not used in
+        // cursor mode, but it is set so HttpSourceReader.updateRequestParam
+        // does not hit a null value.
+        cursor.setPageIndex(0L);
+        return cursor;
+    }
+
+    /** @return {@link #PLUGIN_NAME} */
+    @Override
+    public String getPluginName() {
+        return PLUGIN_NAME;
+    }
+
+    /**
+     * @return {@link Boundedness#BOUNDED} when the job runs in batch mode
+     * @throws UnsupportedOperationException for any other job mode
+     */
+    @Override
+    public Boundedness getBoundedness() {
+        if (!JobMode.BATCH.equals(jobContext.getJobMode())) {
+            throw new UnsupportedOperationException(
+                    "Airtable source connector not support unbounded operation");
+        }
+        return Boundedness.BOUNDED;
+    }
+
+    /**
+     * Creates the reader, handing it the Airtable request parameters, the
+     * deserialization settings held by the parent class and the rate-limit
+     * settings read in the constructor.
+     *
+     * @param readerContext reader context passed through to the reader
+     * @return a new {@link AirtableSourceReader}
+     */
+    @Override
+    public AbstractSingleSplitReader<SeaTunnelRow> createReader(
+            SingleSplitReaderContext readerContext) throws Exception {
+        final AirtableSourceReader reader =
+                new AirtableSourceReader(
+                        airtableSourceParameter,
+                        readerContext,
+                        deserializationSchema,
+                        jsonField,
+                        contentField,
+                        pageInfo,
+                        requestIntervalMs,
+                        rateLimitBackoffMs,
+                        rateLimitMaxRetries);
+        return reader;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSourceFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSourceFactory.java
new file mode 100644
index 0000000000..6dd25dca90
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSourceFactory.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.airtable.source.config.AirtableSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+/**
+ * Factory for the Airtable source: registers it under the identifier
+ * {@code "Airtable"} and declares the Airtable-specific options together with
+ * the base HTTP source options it accepts.
+ */
+@AutoService(Factory.class)
+public class AirtableSourceFactory extends HttpSourceFactory {
+
+    /** @return the identifier this factory is registered under */
+    @Override
+    public String factoryIdentifier() {
+        return "Airtable";
+    }
+
+    /**
+     * Returns a supplier that builds an {@link AirtableSource} on demand; the
+     * options are read from {@code context} only when the supplier is invoked.
+     *
+     * @param context factory context carrying the connector options
+     * @return a lazily evaluated {@link TableSource}
+     */
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> createSource(
+                    TableSourceFactoryContext context) {
+        return () -> {
+            AirtableSource source = new AirtableSource(context.getOptions());
+            return (SeaTunnelSource<T, SplitT, StateT>) source;
+        };
+    }
+
+    /**
+     * Declares the option contract: token, base id and table are required;
+     * the Airtable query options, the rate-limit options and the listed base
+     * HTTP options are optional; a schema is tied to the JSON response format
+     * through a conditional rule.
+     *
+     * @return the assembled option rule
+     */
+    @Override
+    public OptionRule optionRule() {
+        OptionRule.Builder rule = OptionRule.builder();
+        rule =
+                rule.required(
+                        AirtableSourceOptions.TOKEN,
+                        AirtableSourceOptions.BASE_ID,
+                        AirtableSourceOptions.TABLE);
+        rule =
+                rule.optional(
+                        AirtableSourceOptions.API_BASE_URL,
+                        AirtableSourceOptions.VIEW,
+                        AirtableSourceOptions.FIELDS,
+                        AirtableSourceOptions.FILTER_BY_FORMULA,
+                        AirtableSourceOptions.MAX_RECORDS,
+                        AirtableSourceOptions.PAGE_SIZE,
+                        AirtableSourceOptions.SORT,
+                        AirtableSourceOptions.CELL_FORMAT,
+                        AirtableSourceOptions.RETURN_FIELDS_BY_FIELD_ID,
+                        AirtableSourceOptions.RECORD_METADATA,
+                        AirtableSourceOptions.TIME_ZONE,
+                        AirtableSourceOptions.USER_LOCALE,
+                        AirtableSourceOptions.OFFSET,
+                        AirtableSourceOptions.REQUEST_INTERVAL_MS,
+                        AirtableSourceOptions.RATE_LIMIT_BACKOFF_MS,
+                        AirtableSourceOptions.RATE_LIMIT_MAX_RETRIES,
+                        // Base HTTP options, mirroring those of the plain
+                        // HTTP source factory.
+                        HttpSourceOptions.HEADERS,
+                        HttpSourceOptions.BODY,
+                        HttpSourceOptions.FORMAT,
+                        HttpSourceOptions.PAGEING,
+                        HttpSourceOptions.JSON_FIELD,
+                        HttpSourceOptions.CONTENT_FIELD,
+                        HttpSourceOptions.POLL_INTERVAL_MILLS,
+                        HttpSourceOptions.RETRY,
+                        HttpSourceOptions.RETRY_BACKOFF_MULTIPLIER_MS,
+                        HttpSourceOptions.RETRY_BACKOFF_MAX_MS,
+                        HttpSourceOptions.JSON_FILED_MISSED_RETURN_NULL);
+        rule =
+                rule.conditional(
+                        HttpSourceOptions.FORMAT,
+                        HttpConfig.ResponseFormat.JSON,
+                        ConnectorCommonOptions.SCHEMA);
+        return rule.build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSourceReader.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSourceReader.java
new file mode 100644
index 0000000000..8f22b7a50b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSourceReader.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
+import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
+import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * HTTP source reader that spaces out requests and retries responses with
+ * status 429 using exponential backoff before handing the response back to
+ * the parent reader logic.
+ */
+@Slf4j
+public class AirtableSourceReader extends HttpSourceReader {
+
+    /** Response code that triggers the rate-limit retry path. */
+    private static final int STATUS_TOO_MANY_REQUESTS = 429;
+    /** Upper bound, in milliseconds, for a single backoff sleep. */
+    private static final long MAX_BACKOFF_MILLIS = 300000L;
+
+    private final int requestIntervalMs;
+    private final int rateLimitBackoffMs;
+    private final int rateLimitMaxRetries;
+    /** Value of {@code System.currentTimeMillis()} at the last granted slot. */
+    private long lastRequestTimeMillis = 0L;
+
+    /**
+     * Forwards the first six arguments to the parent reader and stores the
+     * three rate-limit settings, each with negative values raised to 0.
+     *
+     * @param requestIntervalMs minimum spacing between requests
+     * @param rateLimitBackoffMs base backoff applied after a 429 response
+     * @param rateLimitMaxRetries number of retries allowed on 429
+     */
+    public AirtableSourceReader(
+            HttpParameter httpParameter,
+            SingleSplitReaderContext context,
+            DeserializationSchema<SeaTunnelRow> deserializationSchema,
+            JsonField jsonField,
+            String contentJson,
+            PageInfo pageInfo,
+            int requestIntervalMs,
+            int rateLimitBackoffMs,
+            int rateLimitMaxRetries) {
+        super(
+                httpParameter,
+                context,
+                deserializationSchema,
+                jsonField,
+                contentJson,
+                pageInfo);
+        this.requestIntervalMs = Math.max(0, requestIntervalMs);
+        this.rateLimitBackoffMs = Math.max(0, rateLimitBackoffMs);
+        this.rateLimitMaxRetries = Math.max(0, rateLimitMaxRetries);
+    }
+
+    /**
+     * Executes the request, repeating it after a backoff sleep while the
+     * response code is 429 and retries remain.
+     *
+     * @return the first response that is not a retried 429; this may itself
+     *     be a 429 once the retries are used up
+     */
+    @Override
+    protected HttpResponse executeRequest() throws Exception {
+        int attempts = 0;
+        for (; ; ) {
+            waitForRequestSlot();
+            HttpResponse response = doExecuteRequest();
+            boolean rateLimited =
+                    response.getCode() == STATUS_TOO_MANY_REQUESTS;
+            if (!rateLimited || attempts >= rateLimitMaxRetries) {
+                return response;
+            }
+            attempts++;
+            long backoffMillis = calculateBackoffMillis(attempts);
+            log.warn(
+                    "Airtable API rate limit reached, retry {}/{} after {} 
ms",
+                    attempts,
+                    rateLimitMaxRetries,
+                    backoffMillis);
+            pause(backoffMillis);
+        }
+    }
+
+    /** Issues one request using the URL, method, headers, params and body. */
+    private HttpResponse doExecuteRequest() throws Exception {
+        HttpParameter request = this.httpParameter;
+        return httpClient.execute(
+                request.getUrl(),
+                request.getMethod().getMethod(),
+                request.getHeaders(),
+                request.getParams(),
+                request.getBody(),
+                request.isKeepParamsAsForm());
+    }
+
+    /**
+     * Blocks until at least {@code requestIntervalMs} has elapsed since the
+     * previously granted slot, then records the new slot time. Does nothing
+     * (and records nothing) when the interval is 0.
+     */
+    private void waitForRequestSlot() {
+        if (requestIntervalMs > 0) {
+            long sinceLast = System.currentTimeMillis() - lastRequestTimeMillis;
+            long remaining = requestIntervalMs - sinceLast;
+            if (remaining > 0) {
+                pause(remaining);
+            }
+            lastRequestTimeMillis = System.currentTimeMillis();
+        }
+    }
+
+    /**
+     * Computes the sleep before retry number {@code retryCount} (1-based):
+     * the base backoff doubled {@code retryCount - 1} times, with at most 20
+     * doublings and the result capped at {@link #MAX_BACKOFF_MILLIS}. Returns
+     * 0 when no base backoff is configured.
+     */
+    private long calculateBackoffMillis(int retryCount) {
+        if (rateLimitBackoffMs <= 0) {
+            return 0L;
+        }
+        int doublings = Math.min(20, Math.max(0, retryCount - 1));
+        long waitMillis = ((long) rateLimitBackoffMs) << doublings;
+        return Math.min(waitMillis, MAX_BACKOFF_MILLIS);
+    }
+
+    /**
+     * Sleeps for {@code millis}; on interruption the interrupt flag is
+     * restored and the exception is rethrown wrapped in a
+     * {@link RuntimeException}.
+     */
+    private static void pause(long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/config/AirtableSourceOptions.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/config/AirtableSourceOptions.java
new file mode 100644
index 0000000000..0fff930355
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/config/AirtableSourceOptions.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.source.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import 
org.apache.seatunnel.connectors.seatunnel.airtable.config.AirtableConfig;
+
+import java.util.List;
+
+/**
+ * Option declarations specific to the Airtable source, defined on top of the
+ * members inherited from {@link AirtableConfig}. None of the options declared
+ * here has a default value.
+ *
+ * <p>NOTE(review): these presumably map onto the query parameters of the
+ * Airtable list-records call; the mapping is not visible in this class.
+ */
+public class AirtableSourceOptions extends AirtableConfig {
+
+    /** String option with key {@code view}. */
+    public static final Option<String> VIEW =
+            Options.key("view")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The name or ID of a view");
+
+    /** List option with key {@code fields}. */
+    public static final Option<List<String>> FIELDS =
+            Options.key("fields")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription("The list of field names to include");
+
+    /** String option with key {@code filter_by_formula}. */
+    public static final Option<String> FILTER_BY_FORMULA =
+            Options.key("filter_by_formula")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Airtable filterByFormula expression");
+
+    /** Integer option with key {@code max_records}. */
+    public static final Option<Integer> MAX_RECORDS =
+            Options.key("max_records")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("Maximum number of records to return, 
must be greater than 0");
+
+    /** Integer option with key {@code page_size}. */
+    public static final Option<Integer> PAGE_SIZE =
+            Options.key("page_size")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("Number of records per page, must be in 
range [1, 100]");
+
+    /** String option with key {@code sort}; the value is a JSON array. */
+    public static final Option<String> SORT =
+            Options.key("sort")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Sort definition JSON array, e.g. 
[{\"field\":\"Name\",\"direction\":\"asc\"}]");
+
+    /** String option with key {@code cell_format}. */
+    public static final Option<String> CELL_FORMAT =
+            Options.key("cell_format")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("cellFormat value, e.g. json or string");
+
+    /** Boolean option with key {@code return_fields_by_field_id}. */
+    public static final Option<Boolean> RETURN_FIELDS_BY_FIELD_ID =
+            Options.key("return_fields_by_field_id")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription("Return fields by field ID instead of 
field name");
+
+    /** List option with key {@code record_metadata}. */
+    public static final Option<List<String>> RECORD_METADATA =
+            Options.key("record_metadata")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription("Record metadata to return, e.g. 
[\"commentCount\"]");
+
+    /** String option with key {@code time_zone}. */
+    public static final Option<String> TIME_ZONE =
+            Options.key("time_zone")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The time zone for cell values");
+
+    /** String option with key {@code user_locale}. */
+    public static final Option<String> USER_LOCALE =
+            Options.key("user_locale")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The user locale for cell values");
+
+    /** String option with key {@code offset}. */
+    public static final Option<String> OFFSET =
+            Options.key("offset")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Pagination offset returned by Airtable");
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/config/AirtableSourceParameter.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/config/AirtableSourceParameter.java
new file mode 100644
index 0000000000..811550dfca
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/config/AirtableSourceParameter.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.source.config;
+
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.seatunnel.shade.com.google.common.base.Strings;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.airtable.config.AirtableConfig;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpRequestMethod;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link HttpParameter} specialization for the Airtable source.
+ *
+ * <p>On top of the generic HTTP settings it points the request at the table's {@code
+ * /listRecords} endpoint, forces the POST method, installs the headers produced by {@link
+ * AirtableConfig#buildAuthHeaders} and assembles a JSON request body from the dedicated Airtable
+ * options merged with any user-supplied {@code body}.
+ */
+public class AirtableSourceParameter extends HttpParameter {
+    /** Path suffix appended to the table URL to reach the list-records endpoint. */
+    private static final String LIST_RECORDS_SUFFIX = "/listRecords";
+
+    /**
+     * Applies the generic HTTP configuration first and then overrides URL, method, headers and
+     * body with the Airtable-specific values derived from {@code pluginConfig}.
+     *
+     * @param pluginConfig connector configuration; {@code api_base_url} falls back to {@link
+     *     AirtableConfig#DEFAULT_API_BASE_URL} when it is not set
+     * @throws IllegalArgumentException if a dedicated option and the raw body define the same
+     *     key, or if the sort option is not valid JSON
+     */
+    @Override
+    public void buildWithConfig(ReadonlyConfig pluginConfig) {
+        super.buildWithConfig(pluginConfig);
+        String baseId = pluginConfig.get(AirtableSourceOptions.BASE_ID);
+        String table = pluginConfig.get(AirtableSourceOptions.TABLE);
+        String apiBaseUrl =
+                pluginConfig
+                        .getOptional(AirtableSourceOptions.API_BASE_URL)
+                        .orElse(AirtableConfig.DEFAULT_API_BASE_URL);
+
+        this.setUrl(AirtableConfig.buildBaseUrl(apiBaseUrl, baseId, table) + LIST_RECORDS_SUFFIX);
+        this.setMethod(HttpRequestMethod.POST);
+
+        String token = pluginConfig.get(AirtableSourceOptions.TOKEN);
+        // The headers already set by the parent are handed to the helper together with the token.
+        this.setHeaders(AirtableConfig.buildAuthHeaders(token, getHeaders()));
+
+        this.setBody(buildRequestBody(pluginConfig, this.getBody()));
+    }
+
+    /**
+     * Builds the JSON request body sent to the list-records endpoint.
+     *
+     * <p>A non-empty {@code existingBody} is parsed as a JSON object and used as the starting
+     * point; if parsing fails the body is silently discarded. Every dedicated option that is
+     * present is then written under its Airtable key, after verifying that it does not collide
+     * with a key already supplied through the raw body.
+     *
+     * @param pluginConfig connector configuration to read the dedicated options from
+     * @param existingBody raw body configured by the user, may be {@code null} or empty
+     * @return the serialized JSON body
+     * @throws IllegalArgumentException on option/body key conflicts or invalid sort JSON
+     */
+    private String buildRequestBody(ReadonlyConfig pluginConfig, String existingBody) {
+        Map<String, Object> body = new HashMap<>();
+        if (!Strings.isNullOrEmpty(existingBody)) {
+            try {
+                Map<String, Object> parsed =
+                        JsonUtils.parseObject(
+                                existingBody, new TypeReference<Map<String, Object>>() {});
+                if (parsed != null) {
+                    body.putAll(parsed);
+                }
+            } catch (Exception ignored) {
+                // Ignore non-JSON body and build Airtable request body from options.
+            }
+        }
+
+        // Must run before the options are copied in, otherwise every option would "conflict".
+        checkBodyConflicts(pluginConfig, body);
+
+        pluginConfig
+                .getOptional(AirtableSourceOptions.FIELDS)
+                .ifPresent(value -> body.put("fields", value));
+        pluginConfig
+                .getOptional(AirtableSourceOptions.FILTER_BY_FORMULA)
+                .ifPresent(value -> body.put("filterByFormula", value));
+        pluginConfig
+                .getOptional(AirtableSourceOptions.MAX_RECORDS)
+                .ifPresent(value -> body.put("maxRecords", value));
+        pluginConfig
+                .getOptional(AirtableSourceOptions.PAGE_SIZE)
+                .ifPresent(value -> body.put("pageSize", value));
+        pluginConfig
+                .getOptional(AirtableSourceOptions.SORT)
+                .ifPresent(value -> body.put("sort", parseSort(value)));
+        pluginConfig
+                .getOptional(AirtableSourceOptions.VIEW)
+                .ifPresent(value -> body.put("view", value));
+        pluginConfig
+                .getOptional(AirtableSourceOptions.CELL_FORMAT)
+                .ifPresent(value -> body.put("cellFormat", value));
+        pluginConfig
+                .getOptional(AirtableSourceOptions.RETURN_FIELDS_BY_FIELD_ID)
+                .ifPresent(value -> body.put("returnFieldsByFieldId", value));
+        pluginConfig
+                .getOptional(AirtableSourceOptions.RECORD_METADATA)
+                .ifPresent(value -> body.put("recordMetadata", value));
+        pluginConfig
+                .getOptional(AirtableSourceOptions.TIME_ZONE)
+                .ifPresent(value -> body.put("timeZone", value));
+        pluginConfig
+                .getOptional(AirtableSourceOptions.USER_LOCALE)
+                .ifPresent(value -> body.put("userLocale", value));
+
+        // Keep offset key for key-based cursor replacement in HttpSourceReader.
+        // Dedicated option wins; otherwise preserve body offset if present.
+        if (pluginConfig.getOptional(AirtableSourceOptions.OFFSET).isPresent()) {
+            body.put("offset", pluginConfig.get(AirtableSourceOptions.OFFSET));
+        } else {
+            // Guarantees the "offset" key exists in the map even when no value is known yet.
+            body.putIfAbsent("offset", null);
+        }
+
+        return JsonUtils.toJsonString(body);
+    }
+
+    /**
+     * Rejects configurations that define the same Airtable key both through a dedicated option
+     * and through the raw {@code body}.
+     *
+     * @param pluginConfig connector configuration holding the dedicated options
+     * @param body keys parsed from the user-supplied raw body; an empty map short-circuits
+     * @throws IllegalArgumentException listing every conflicting key
+     */
+    private void checkBodyConflicts(ReadonlyConfig pluginConfig, Map<String, Object> body) {
+        if (body.isEmpty()) {
+            return;
+        }
+        // Collect all conflicts first so the user sees the complete list in one error.
+        List<String> conflicts = new ArrayList<>();
+        checkConflict(pluginConfig, body, AirtableSourceOptions.FIELDS, "fields", conflicts);
+        checkConflict(
+                pluginConfig,
+                body,
+                AirtableSourceOptions.FILTER_BY_FORMULA,
+                "filterByFormula",
+                conflicts);
+        checkConflict(
+                pluginConfig, body, AirtableSourceOptions.MAX_RECORDS, "maxRecords", conflicts);
+        checkConflict(pluginConfig, body, AirtableSourceOptions.PAGE_SIZE, "pageSize", conflicts);
+        checkConflict(pluginConfig, body, AirtableSourceOptions.SORT, "sort", conflicts);
+        checkConflict(pluginConfig, body, AirtableSourceOptions.VIEW, "view", conflicts);
+        checkConflict(
+                pluginConfig, body, AirtableSourceOptions.CELL_FORMAT, "cellFormat", conflicts);
+        checkConflict(
+                pluginConfig,
+                body,
+                AirtableSourceOptions.RETURN_FIELDS_BY_FIELD_ID,
+                "returnFieldsByFieldId",
+                conflicts);
+        checkConflict(
+                pluginConfig,
+                body,
+                AirtableSourceOptions.RECORD_METADATA,
+                "recordMetadata",
+                conflicts);
+        checkConflict(pluginConfig, body, AirtableSourceOptions.TIME_ZONE, "timeZone", conflicts);
+        checkConflict(
+                pluginConfig, body, AirtableSourceOptions.USER_LOCALE, "userLocale", conflicts);
+        checkConflict(pluginConfig, body, AirtableSourceOptions.OFFSET, "offset", conflicts);
+        if (!conflicts.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "Conflict between 'body' and dedicated Airtable options for keys: "
+                            + String.join(", ", conflicts)
+                            + ". Please use either the dedicated option or 'body', not both.");
+        }
+    }
+
+    /**
+     * Records a conflict when {@code option} is set in the configuration and {@code bodyKey} is
+     * also present in the raw body.
+     *
+     * @param pluginConfig connector configuration
+     * @param body keys parsed from the raw body
+     * @param option dedicated option to test for presence
+     * @param bodyKey Airtable request key the option maps to
+     * @param conflicts accumulator that receives a human-readable entry per conflict
+     */
+    private void checkConflict(
+            ReadonlyConfig pluginConfig,
+            Map<String, Object> body,
+            Option<?> option,
+            String bodyKey,
+            List<String> conflicts) {
+        if (pluginConfig.getOptional(option).isPresent() && body.containsKey(bodyKey)) {
+            conflicts.add(bodyKey + " (option: " + option.key() + ")");
+        }
+    }
+
+    /**
+     * Parses the sort option, which is supplied as a JSON string, into a list of maps so that it
+     * is embedded in the request body as structured JSON rather than as a string.
+     *
+     * @param sortJson JSON text expected to describe a list of objects
+     * @return the parsed list of maps
+     * @throws IllegalArgumentException if parsing raises a runtime exception
+     */
+    private Object parseSort(String sortJson) {
+        try {
+            return JsonUtils.parseObject(
+                    sortJson, new TypeReference<List<Map<String, Object>>>() {});
+        } catch (RuntimeException e) {
+            throw new IllegalArgumentException("Invalid sort JSON: " + sortJson, e);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/test/java/org/apache/seatunnel/connectors/seatunnel/airtable/AirtableFactoryTest.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/test/java/org/apache/seatunnel/connectors/seatunnel/airtable/AirtableFactoryTest.java
new file mode 100644
index 0000000000..eebe34ff6d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/test/java/org/apache/seatunnel/connectors/seatunnel/airtable/AirtableFactoryTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable;
+
+import 
org.apache.seatunnel.connectors.seatunnel.airtable.sink.AirtableSinkFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.airtable.source.AirtableSourceFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Smoke test for the Airtable source and sink factories. */
+public class AirtableFactoryTest {
+
+    /** Both factories must return a non-null option rule. */
+    @Test
+    public void optionRule() {
+        AirtableSourceFactory sourceFactory = new AirtableSourceFactory();
+        Assertions.assertNotNull(sourceFactory.optionRule());
+
+        AirtableSinkFactory sinkFactory = new AirtableSinkFactory();
+        Assertions.assertNotNull(sinkFactory.optionRule());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/test/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSinkWriterTest.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/test/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSinkWriterTest.java
new file mode 100644
index 0000000000..236c0821f8
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/test/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSinkWriterTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.sink;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link AirtableSinkWriter} that replace its HTTP client with a Mockito mock, so
+ * no network traffic is produced.
+ */
+public class AirtableSinkWriterTest {
+
+    @Mock private HttpClientProvider httpClient;
+
+    private SeaTunnelRowType rowType;
+
+    /** Initializes the mocks and a two-column (Name: string, Age: int) row type. */
+    @BeforeEach
+    public void setUp() {
+        MockitoAnnotations.openMocks(this);
+        rowType =
+                new SeaTunnelRowType(
+                        new String[] {"Name", "Age"},
+                        new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.INT_TYPE});
+    }
+
+    /**
+     * Creates a writer with the given batch size and typecast flag and injects the mocked HTTP
+     * client into its private {@code httpClient} field via reflection.
+     *
+     * <p>NOTE(review): the trailing constructor arguments {@code 0, 0, 3} are presumably the
+     * request interval, the rate-limit backoff and the maximum retry count; confirm against the
+     * {@code AirtableSinkWriter} constructor.
+     */
+    private AirtableSinkWriter createWriter(int batchSize, boolean typecast) throws Exception {
+        HttpParameter param = new HttpParameter();
+        param.setUrl("https://api.airtable.com/v0/appXXX/tblYYY");
+        Map<String, String> headers = new HashMap<>();
+        headers.put("Authorization", "Bearer test_token");
+        headers.put("Content-Type", "application/json");
+        param.setHeaders(headers);
+
+        AirtableSinkWriter writer =
+                new AirtableSinkWriter(rowType, param, batchSize, typecast, 0, 0, 3);
+
+        Field field = AirtableSinkWriter.class.getDeclaredField("httpClient");
+        field.setAccessible(true);
+        field.set(writer, httpClient);
+        return writer;
+    }
+
+    /**
+     * With a batch size of 2, writing two rows must trigger exactly one POST whose body has a
+     * {@code records} array of two entries and, with typecast disabled, no {@code typecast} key.
+     */
+    @Test
+    public void testBatchWriteBodyFormat() throws Exception {
+        when(httpClient.doPost(anyString(), any(), anyString()))
+                .thenReturn(new HttpResponse(200, "{}"));
+
+        AirtableSinkWriter writer = createWriter(2, false);
+        writer.write(new SeaTunnelRow(new Object[] {"Alice", 30}));
+        writer.write(new SeaTunnelRow(new Object[] {"Bob", 25}));
+
+        ArgumentCaptor<String> bodyCaptor = ArgumentCaptor.forClass(String.class);
+        verify(httpClient, times(1)).doPost(anyString(), any(), bodyCaptor.capture());
+
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode root = mapper.readTree(bodyCaptor.getValue());
+        Assertions.assertTrue(root.has("records"));
+        Assertions.assertFalse(root.has("typecast"));
+
+        JsonNode records = root.get("records");
+        Assertions.assertEquals(2, records.size());
+        Assertions.assertTrue(records.get(0).has("fields"));
+        Assertions.assertEquals("Alice", records.get(0).get("fields").get("Name").asText());
+    }
+
+    /** A permanent 429 response must end in an {@link IOException} after four POST attempts. */
+    @Test
+    public void testThrowsAfterMaxRetries() throws Exception {
+        when(httpClient.doPost(anyString(), any(), anyString()))
+                .thenReturn(new HttpResponse(429, "{\"error\":{\"type\":\"RATE_LIMIT\"}}"));
+
+        // Batch size 1 makes the single write flush immediately.
+        AirtableSinkWriter writer = createWriter(1, false);
+
+        Assertions.assertThrows(
+                IOException.class,
+                () -> writer.write(new SeaTunnelRow(new Object[] {"Alice", 30})));
+        // 1 initial + 3 retries = 4 calls
+        verify(httpClient, times(4)).doPost(anyString(), any(), anyString());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/test/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSourceReaderTest.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/test/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSourceReaderTest.java
new file mode 100644
index 0000000000..a26be6763f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/test/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSourceReaderTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.source;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import 
org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpRequestMethod;
+import 
org.apache.seatunnel.connectors.seatunnel.http.source.SimpleTextDeserializationSchema;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the rate-limit retry behavior of {@link AirtableSourceReader#executeRequest},
+ * using a mocked HTTP client.
+ */
+public class AirtableSourceReaderTest {
+
+    @Mock private SingleSplitReaderContext context;
+    @Mock private HttpClientProvider httpClient;
+
+    private HttpParameter parameter;
+    private SimpleTextDeserializationSchema schema;
+
+    /** Initializes the mocks, a POST request parameter and a single-column text schema. */
+    @BeforeEach
+    public void setUp() {
+        MockitoAnnotations.openMocks(this);
+        parameter = new HttpParameter();
+        parameter.setUrl("https://api.airtable.com/v0/appBase/table/listRecords");
+        parameter.setMethod(HttpRequestMethod.POST);
+
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"content"}, new SeaTunnelDataType[] {BasicType.STRING_TYPE});
+        schema = new SimpleTextDeserializationSchema(rowType);
+    }
+
+    /**
+     * Creates a reader that uses the mocked HTTP client.
+     *
+     * <p>NOTE(review): the two {@code 0} arguments before {@code rateLimitMaxRetries} are
+     * presumably the request interval and backoff delay; confirm against the {@code
+     * AirtableSourceReader} constructor.
+     */
+    private AirtableSourceReader createReader(int rateLimitMaxRetries) {
+        AirtableSourceReader reader =
+                new AirtableSourceReader(
+                        parameter, context, schema, null, null, null, 0, 0, rateLimitMaxRetries);
+        reader.setHttpClient(httpClient);
+        return reader;
+    }
+
+    /** A 429 followed by a 200 must yield the 200 response after exactly two HTTP calls. */
+    @Test
+    public void testRetryOn429ThenSuccess() throws Exception {
+        when(httpClient.execute(anyString(), anyString(), any(), any(), any(), anyBoolean()))
+                .thenReturn(new HttpResponse(429, "{\"error\":{\"type\":\"RATE_LIMIT\"}}"))
+                .thenReturn(
+                        new HttpResponse(
+                                200,
+                                "{\"records\":[{\"id\":\"rec1\",\"fields\":{\"Name\":\"Alice\"}}]}"));
+
+        AirtableSourceReader reader = createReader(2);
+        HttpResponse response = reader.executeRequest();
+
+        Assertions.assertEquals(200, response.getCode());
+        verify(httpClient, times(2))
+                .execute(anyString(), anyString(), any(), any(), any(), anyBoolean());
+    }
+
+    /** With one retry allowed, a permanent 429 must be returned to the caller after two calls. */
+    @Test
+    public void testStopRetryAfterMaxRetries() throws Exception {
+        when(httpClient.execute(anyString(), anyString(), any(), any(), any(), anyBoolean()))
+                .thenReturn(new HttpResponse(429, "{\"error\":{\"type\":\"RATE_LIMIT\"}}"));
+
+        AirtableSourceReader reader = createReader(1);
+        HttpResponse response = reader.executeRequest();
+
+        Assertions.assertEquals(429, response.getCode());
+        // 1 initial + 1 retry = 2 calls
+        verify(httpClient, times(2))
+                .execute(anyString(), anyString(), any(), any(), any(), anyBoolean());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index 42486924e5..c7ab3c9c00 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -124,14 +124,7 @@ public class HttpSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
     }
 
     public void pollAndCollectData(Collector<SeaTunnelRow> output) throws 
Exception {
-        HttpResponse response =
-                httpClient.execute(
-                        this.httpParameter.getUrl(),
-                        this.httpParameter.getMethod().getMethod(),
-                        this.httpParameter.getHeaders(),
-                        this.httpParameter.getParams(),
-                        this.httpParameter.getBody(),
-                        this.httpParameter.isKeepParamsAsForm());
+        HttpResponse response = executeRequest();
         if (response.getCode() >= 200 && response.getCode() <= 207) {
             String content = response.getContent();
             if (!Strings.isNullOrEmpty(content)) {
@@ -160,6 +153,16 @@ public class HttpSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
         }
     }
 
+    /**
+     * Sends the request described by the current {@code httpParameter} (URL, method, headers,
+     * params, body and the keep-params-as-form flag) through {@code httpClient} and returns the
+     * response as-is. This call was previously inlined in {@code pollAndCollectData}.
+     *
+     * <p>NOTE(review): presumably made {@code protected} so that subclasses can wrap the call,
+     * e.g. with retry handling; confirm against the subclasses.
+     */
+    protected HttpResponse executeRequest() throws Exception {
+        return httpClient.execute(
+                this.httpParameter.getUrl(),
+                this.httpParameter.getMethod().getMethod(),
+                this.httpParameter.getHeaders(),
+                this.httpParameter.getParams(),
+                this.httpParameter.getBody(),
+                this.httpParameter.isKeepParamsAsForm());
+    }
+
     @VisibleForTesting
     public void updateRequestParam(PageInfo pageInfo, boolean 
usePlaceholderReplacement) {
         // 1. keep page param as http param
diff --git a/seatunnel-connectors-v2/connector-http/pom.xml 
b/seatunnel-connectors-v2/connector-http/pom.xml
index e32339a5f0..6842f7c413 100644
--- a/seatunnel-connectors-v2/connector-http/pom.xml
+++ b/seatunnel-connectors-v2/connector-http/pom.xml
@@ -42,6 +42,7 @@
         <module>connector-http-github</module>
         <module>connector-http-notion</module>
         <module>connector-http-persistiq</module>
+        <module>connector-http-airtable</module>
     </modules>
 
 </project>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 88d7e2c911..4559cd6663 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -490,6 +490,12 @@
                     <version>${project.version}</version>
                     <scope>provided</scope>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-http-airtable</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
                 <dependency>
                     <groupId>org.apache.seatunnel</groupId>
                     <artifactId>connector-rabbitmq</artifactId>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml
index 97e435e092..67b8c17a1a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml
@@ -117,6 +117,12 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-http-airtable</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.mock-server</groupId>
             <artifactId>mockserver-netty-no-dependencies</artifactId>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index 1d5c0f0afc..30eca3211b 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -361,6 +361,18 @@ public class HttpIT extends TestSuiteBase implements 
TestResource {
         Container.ExecResult execResult21 =
                 container.executeJob("/http_page_cursor_num_assert.conf");
         Assertions.assertEquals(0, execResult21.getExitCode());
+
+        // http airtable source
+        Container.ExecResult execResult22 = 
container.executeJob("/airtable_json_to_assert.conf");
+        Assertions.assertEquals(0, execResult22.getExitCode());
+    }
+
+    /**
+     * Runs the FakeSource-to-Airtable job and checks that it exits successfully and that the mock
+     * server received a POST on the sink table path.
+     */
+    @TestTemplate
+    public void testFakeToAirtableSink(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/fake_to_airtable.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        mockServerClient.verify(request().withPath("/v0/appTEST123/SinkTable").withMethod("POST"));
     }
 
     @TestTemplate
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/airtable_json_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/airtable_json_to_assert.conf
new file mode 100644
index 0000000000..d4cb4b46cd
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/airtable_json_to_assert.conf
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Run the job once in batch mode with a single parallel task.
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+# Airtable source pointed at the e2e mock server through api_base_url.
+source {
+  Airtable {
+    plugin_output = "http"
+    api_base_url = "http://mockserver:1080"
+    token = "test_token"
+    base_id = "appTEST123"
+    table = "TestTable"
+    format = "json"
+    # JSONPath selecting the "fields" object of every returned record.
+    content_field = "$.records[*].fields"
+    # Must stay in sync with the "pageSize" value the mock server expectations match on.
+    page_size = 2
+    request_interval_ms = 0
+    schema = {
+      fields {
+        Name = string
+        Age = int
+        Status = string
+      }
+    }
+  }
+}
+
+# Assert sink validating the rows read from the Airtable source.
+sink {
+  Assert {
+    plugin_input = "http"
+    rules {
+      # MIN_ROW and MAX_ROW are both 5, so exactly five rows must arrive.
+      row_rules = [
+        {
+          rule_type = MIN_ROW
+          rule_value = 5
+        },
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      # Every schema column (Name, Age, Status) must be non-null in each row.
+      field_rules = [
+        {
+          field_name = Name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = Age
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = Status
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/fake_to_airtable.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/fake_to_airtable.conf
new file mode 100644
index 0000000000..98350dd5cc
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/fake_to_airtable.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Run the job once in batch mode with a single parallel task.
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+# FakeSource emitting two fixed INSERT rows that match the (Name, Age) schema.
+source {
+  FakeSource {
+    schema = {
+      fields {
+        Name = string
+        Age = int
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = ["Alice", 30]
+      },
+      {
+        kind = INSERT
+        fields = ["Bob", 25]
+      }
+    ]
+  }
+}
+
+# Airtable sink pointed at the e2e mock server through api_base_url.
+sink {
+  Airtable {
+    api_base_url = "http://mockserver:1080"
+    token = "test_token"
+    base_id = "appTEST123"
+    table = "SinkTable"
+    batch_size = 10
+    request_interval_ms = 0
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
index 662fd71c28..f948a23d2d 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
@@ -4828,5 +4828,120 @@
         ]
       }
     }
+  },
+  {
+    "httpRequest": {
+      "method": "POST",
+      "path": "/v0/appTEST123/TestTable/listRecords",
+      "body": {
+        "type": "JSON",
+        "json": "{\"pageSize\":2}",
+        "matchType": "STRICT"
+      }
+    },
+    "httpResponse": {
+      "body": {
+        "records": [
+          {
+            "id": "rec001",
+            "createdTime": "2024-01-01T00:00:00.000Z",
+            "fields": {
+              "Name": "Name001",
+              "Age": 21,
+              "Status": "Active"
+            }
+          },
+          {
+            "id": "rec002",
+            "createdTime": "2024-01-02T00:00:00.000Z",
+            "fields": {
+              "Name": "Name002",
+              "Age": 22,
+              "Status": "Inactive"
+            }
+          }
+        ],
+        "offset": "itrPAGE2/recPAGE2"
+      }
+    }
+  },
+  {
+    "httpRequest": {
+      "method": "POST",
+      "path": "/v0/appTEST123/TestTable/listRecords",
+      "body": {
+        "type": "JSON",
+        "json": "{\"offset\":\"itrPAGE2/recPAGE2\",\"pageSize\":2}",
+        "matchType": "STRICT"
+      }
+    },
+    "httpResponse": {
+      "body": {
+        "records": [
+          {
+            "id": "rec003",
+            "createdTime": "2024-01-03T00:00:00.000Z",
+            "fields": {
+              "Name": "Name003",
+              "Age": 23,
+              "Status": "Active"
+            }
+          },
+          {
+            "id": "rec004",
+            "createdTime": "2024-01-04T00:00:00.000Z",
+            "fields": {
+              "Name": "Name004",
+              "Age": 24,
+              "Status": "Inactive"
+            }
+          }
+        ],
+        "offset": "itrPAGE3/recPAGE3"
+      }
+    }
+  },
+  {
+    "httpRequest": {
+      "method": "POST",
+      "path": "/v0/appTEST123/TestTable/listRecords",
+      "body": {
+        "type": "JSON",
+        "json": "{\"offset\":\"itrPAGE3/recPAGE3\",\"pageSize\":2}",
+        "matchType": "STRICT"
+      }
+    },
+    "httpResponse": {
+      "body": {
+        "records": [
+          {
+            "id": "rec005",
+            "createdTime": "2024-01-05T00:00:00.000Z",
+            "fields": {
+              "Name": "Name005",
+              "Age": 25,
+              "Status": "Active"
+            }
+          }
+        ]
+      }
+    }
+  },
+  {
+    "httpRequest": {
+      "method": "POST",
+      "path": "/v0/appTEST123/SinkTable"
+    },
+    "httpResponse": {
+      "statusCode": 200,
+      "body": {
+        "records": [
+          {
+            "id": "recNew001",
+            "fields": {}
+          }
+        ]
+      }
+    }
   }
 ]
\ No newline at end of file

Reply via email to