This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c15032c035 [Feature][Connector-V2] Add Airtable source and sink
(#10469)
c15032c035 is described below
commit c15032c035a3b95c4bb6b260ef5a4921decbbfaa
Author: xiaosiyuan <[email protected]>
AuthorDate: Tue Mar 3 16:24:37 2026 +0800
[Feature][Connector-V2] Add Airtable source and sink (#10469)
---
config/plugin_config | 1 +
.../changelog/connector-http-airtable.md | 6 +
docs/en/connectors/sink/Airtable.md | 88 +++++++++
docs/en/connectors/source/Airtable.md | 182 ++++++++++++++++++
.../changelog/connector-http-airtable.md | 6 +
docs/zh/connectors/sink/Airtable.md | 88 +++++++++
docs/zh/connectors/source/Airtable.md | 182 ++++++++++++++++++
plugin-mapping.properties | 2 +
.../{ => connector-http-airtable}/pom.xml | 29 ++-
.../seatunnel/airtable/config/AirtableConfig.java | 115 ++++++++++++
.../seatunnel/airtable/sink/AirtableSink.java | 90 +++++++++
.../airtable/sink/AirtableSinkFactory.java | 58 ++++++
.../airtable/sink/AirtableSinkWriter.java | 204 +++++++++++++++++++++
.../airtable/sink/config/AirtableSinkOptions.java | 39 ++++
.../seatunnel/airtable/source/AirtableSource.java | 88 +++++++++
.../airtable/source/AirtableSourceFactory.java | 92 ++++++++++
.../airtable/source/AirtableSourceReader.java | 120 ++++++++++++
.../source/config/AirtableSourceOptions.java | 100 ++++++++++
.../source/config/AirtableSourceParameter.java | 181 ++++++++++++++++++
.../seatunnel/airtable/AirtableFactoryTest.java | 33 ++++
.../airtable/sink/AirtableSinkWriterTest.java | 117 ++++++++++++
.../airtable/source/AirtableSourceReaderTest.java | 102 +++++++++++
.../seatunnel/http/source/HttpSourceReader.java | 19 +-
seatunnel-connectors-v2/connector-http/pom.xml | 1 +
seatunnel-dist/pom.xml | 6 +
.../connector-http-e2e/pom.xml | 6 +
.../seatunnel/e2e/connector/http/HttpIT.java | 12 ++
.../test/resources/airtable_json_to_assert.conf | 89 +++++++++
.../src/test/resources/fake_to_airtable.conf | 53 ++++++
.../src/test/resources/mockserver-config.json | 115 ++++++++++++
30 files changed, 2198 insertions(+), 26 deletions(-)
diff --git a/config/plugin_config b/config/plugin_config
index bae0026bd9..1a79f35c39 100644
--- a/config/plugin_config
+++ b/config/plugin_config
@@ -59,6 +59,7 @@ connector-http-myhours
connector-http-notion
connector-http-onesignal
connector-http-wechat
+connector-http-airtable
connector-hudi
connector-iceberg
connector-influxdb
diff --git a/docs/en/connectors/changelog/connector-http-airtable.md
b/docs/en/connectors/changelog/connector-http-airtable.md
new file mode 100644
index 0000000000..1cf41e674b
--- /dev/null
+++ b/docs/en/connectors/changelog/connector-http-airtable.md
@@ -0,0 +1,6 @@
+<details><summary> Change Log </summary>
+
+| Change | Commit | Version |
+| --- | --- | --- |
+
+</details>
diff --git a/docs/en/connectors/sink/Airtable.md
b/docs/en/connectors/sink/Airtable.md
new file mode 100644
index 0000000000..5c50ae016f
--- /dev/null
+++ b/docs/en/connectors/sink/Airtable.md
@@ -0,0 +1,88 @@
+import ChangeLog from '../changelog/connector-http-airtable.md';
+
+# Airtable
+
+> Airtable sink connector
+
+## Description
+
+Used to write data to Airtable.
+
+## Key Features
+
+- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
+- [ ] [cdc](../../introduction/concepts/connector-v2-features.md)
+- [ ] [support multiple table
write](../../introduction/concepts/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|-----------------------------|---------|----------|---------------|
+| token | String | Yes | - |
+| base_id | String | Yes | - |
+| table | String | Yes | - |
+| api_base_url | String | No | https://api.airtable.com |
+| typecast | boolean | No | false |
+| batch_size | int | No | 10 |
+| request_interval_ms | int | No | 220 |
+| rate_limit_backoff_ms | int | No | 30000 |
+| rate_limit_max_retries | int | No | 3 |
+| common-options | | No | - |
+
+### token [String]
+
+Airtable personal access token. You can create one at
https://airtable.com/create/tokens.
+
+### base_id [String]
+
+The ID of the Airtable base (starts with `app`).
+
+### table [String]
+
+The table name or table ID to write to.
+
+### api_base_url [String]
+
+Airtable API base URL. Default is `https://api.airtable.com`.
+
+### typecast [boolean]
+
+If true, Airtable will automatically convert values to match the field type.
Default false.
+
+### batch_size [int]
+
+Number of records per API request. Maximum 10 per Airtable API limit. Default
10.
+
+### request_interval_ms [int]
+
+Minimum interval in milliseconds between API requests. Default 220ms.
+
+### rate_limit_backoff_ms [int]
+
+Base backoff time in milliseconds when receiving a 429 (rate limit) response.
Default 30000ms.
+
+### rate_limit_max_retries [int]
+
+Maximum number of retries after receiving a 429 response. Default 3.
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common
Options](../common-options/sink-common-options.md) for details.
+
+## Example
+
+```hocon
+sink {
+ Airtable {
+ token = "patXXXXXXXX.XXXXXXXX"
+ base_id = "appXXXXXXXX"
+ table = "Shipments"
+ typecast = true
+ batch_size = 10
+ }
+}
+```
+
+## Changelog
+
+<ChangeLog />
diff --git a/docs/en/connectors/source/Airtable.md
b/docs/en/connectors/source/Airtable.md
new file mode 100644
index 0000000000..61acae4983
--- /dev/null
+++ b/docs/en/connectors/source/Airtable.md
@@ -0,0 +1,182 @@
+import ChangeLog from '../changelog/connector-http-airtable.md';
+
+# Airtable
+
+> Airtable source connector
+
+## Description
+
+Used to read data from Airtable.
+
+## Key features
+
+- [x] [batch](../../introduction/concepts/connector-v2-features.md)
+- [ ] [stream](../../introduction/concepts/connector-v2-features.md)
+- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
+- [x] [column projection](../../introduction/concepts/connector-v2-features.md)
+- [ ] [parallelism](../../introduction/concepts/connector-v2-features.md)
+- [ ] [support user-defined
split](../../introduction/concepts/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|-----------------------------|---------|----------|---------------|
+| token | String | Yes | - |
+| base_id | String | Yes | - |
+| table | String | Yes | - |
+| api_base_url | String | No | https://api.airtable.com |
+| view | String | No | - |
+| fields | List | No | - |
+| filter_by_formula | String | No | - |
+| max_records | int | No | - |
+| page_size | int | No | - |
+| sort | String | No | - |
+| cell_format | String | No | - |
+| return_fields_by_field_id | boolean | No | - |
+| record_metadata | List | No | - |
+| time_zone | String | No | - |
+| user_locale | String | No | - |
+| request_interval_ms | int | No | 220 |
+| rate_limit_backoff_ms | int | No | 30000 |
+| rate_limit_max_retries | int | No | 3 |
+| schema | Config | No | - |
+| schema.fields | Config | No | - |
+| format | String | No | text |
+| content_field | String | No | - |
+| json_field | Config | No | - |
+| common-options | config | No | - |
+
+### token [String]
+
+Airtable personal access token. You can create one at
https://airtable.com/create/tokens.
+
+### base_id [String]
+
+The ID of the Airtable base (starts with `app`).
+
+### table [String]
+
+The table name or table ID to read from.
+
+### api_base_url [String]
+
+Airtable API base URL. Default is `https://api.airtable.com`.
+
+### view [String]
+
+The name or ID of a view in the table. Only records visible in this view will
be returned.
+
+### fields [List]
+
+A list of field names to include in the response.
+
+### filter_by_formula [String]
+
+An Airtable formula to filter records. See [Airtable formula
reference](https://support.airtable.com/docs/formula-field-reference).
+
+### max_records [int]
+
+Maximum total number of records to return.
+
+### page_size [int]
+
+Number of records per page (1-100).
+
+### sort [String]
+
+Sort definition as a JSON array, e.g. `[{"field":"Name","direction":"asc"}]`.
+
+### cell_format [String]
+
+The format for cell values, either `json` or `string`.
+
+### return_fields_by_field_id [boolean]
+
+If true, field keys in the response will be field IDs instead of field names.
+
+### record_metadata [List]
+
+Additional record metadata to return, e.g. `["commentCount"]`.
+
+### time_zone [String]
+
+The time zone for formatting date/time values.
+
+### user_locale [String]
+
+The user locale for formatting values.
+
+### request_interval_ms [int]
+
+Minimum interval in milliseconds between API requests. Default 220ms (to stay
within Airtable's 5 requests/second limit).
+
+### rate_limit_backoff_ms [int]
+
+Base backoff time in milliseconds when receiving a 429 (rate limit) response.
Default 30000ms.
+
+### rate_limit_max_retries [int]
+
+Maximum number of retries after receiving a 429 response. Default 3.
+
+### schema [Config]
+
+#### fields [Config]
+
+The schema fields of upstream data. For more details, please refer to [Schema
Feature](../../introduction/concepts/schema-feature.md).
+
+### format [String]
+
+The format of upstream data, supports `json` and `text`, default `text`.
+
+### content_field [String]
+
+JsonPath expression to extract data from the response. For Airtable, you
typically use `$.records[*].fields` to extract the fields from each record.
+
+### json_field [Config]
+
+This parameter helps you configure the schema and must be used with schema.
+
+### common options
+
+Source plugin common parameters, please refer to [Source Common
Options](../common-options/source-common-options.md) for details.
+
+## Example
+
+Read from an Airtable table and output raw text:
+
+```hocon
+source {
+ Airtable {
+ token = "patXXXXXXXX.XXXXXXXX"
+ base_id = "appXXXXXXXX"
+ table = "Shipments"
+ format = "text"
+ max_records = 10
+ }
+}
+```
+
+Read with schema and extract record fields:
+
+```hocon
+source {
+ Airtable {
+ token = "patXXXXXXXX.XXXXXXXX"
+ base_id = "appXXXXXXXX"
+ table = "Shipments"
+ content_field = "$.records[*].fields"
+ filter_by_formula = "{Status} = 'Shipped'"
+ schema = {
+ fields {
+ Name = string
+ Status = string
+ Weight = float
+ }
+ }
+ }
+}
+```
+
+## Changelog
+
+<ChangeLog />
diff --git a/docs/zh/connectors/changelog/connector-http-airtable.md
b/docs/zh/connectors/changelog/connector-http-airtable.md
new file mode 100644
index 0000000000..1cf41e674b
--- /dev/null
+++ b/docs/zh/connectors/changelog/connector-http-airtable.md
@@ -0,0 +1,6 @@
+<details><summary> Change Log </summary>
+
+| Change | Commit | Version |
+| --- | --- | --- |
+
+</details>
diff --git a/docs/zh/connectors/sink/Airtable.md
b/docs/zh/connectors/sink/Airtable.md
new file mode 100644
index 0000000000..cc8c1e49d1
--- /dev/null
+++ b/docs/zh/connectors/sink/Airtable.md
@@ -0,0 +1,88 @@
+import ChangeLog from '../changelog/connector-http-airtable.md';
+
+# Airtable
+
+> Airtable Sink 连接器
+
+## 描述
+
+用于将数据写入 Airtable。
+
+## 关键特性
+
+- [ ] [精确一次](../../introduction/concepts/connector-v2-features.md)
+- [ ] [cdc](../../introduction/concepts/connector-v2-features.md)
+- [ ] [支持多表写入](../../introduction/concepts/connector-v2-features.md)
+
+## 选项
+
+| 参数名 | 类型 | 必须 | 默认值 |
+|--------|------|------|--------|
+| token | String | 是 | - |
+| base_id | String | 是 | - |
+| table | String | 是 | - |
+| api_base_url | String | 否 | https://api.airtable.com |
+| typecast | boolean | 否 | false |
+| batch_size | int | 否 | 10 |
+| request_interval_ms | int | 否 | 220 |
+| rate_limit_backoff_ms | int | 否 | 30000 |
+| rate_limit_max_retries | int | 否 | 3 |
+| common-options | | 否 | - |
+
+### token [String]
+
+Airtable 个人访问令牌。可在 https://airtable.com/create/tokens 创建。
+
+### base_id [String]
+
+Airtable Base ID(以 `app` 开头)。
+
+### table [String]
+
+要写入的表名或表 ID。
+
+### api_base_url [String]
+
+Airtable API 基础 URL,默认 `https://api.airtable.com`。
+
+### typecast [boolean]
+
+如果为 true,Airtable 会自动将值转换为匹配的字段类型。默认 false。
+
+### batch_size [int]
+
+每次 API 请求的记录数,受 Airtable API 限制最大为 10。默认 10。
+
+### request_interval_ms [int]
+
+API 请求之间的最小间隔(毫秒),默认 220ms。
+
+### rate_limit_backoff_ms [int]
+
+收到 429(限流)响应时的基础退避时间(毫秒),默认 30000ms。
+
+### rate_limit_max_retries [int]
+
+收到 429 响应后的最大重试次数,默认 3。
+
+### common options
+
+汇插件通用参数,请参考 [Sink Common Options](../common-options/sink-common-options.md)。
+
+## 示例
+
+```hocon
+sink {
+ Airtable {
+ token = "patXXXXXXXX.XXXXXXXX"
+ base_id = "appXXXXXXXX"
+ table = "Shipments"
+ typecast = true
+ batch_size = 10
+ }
+}
+```
+
+## 变更日志
+
+<ChangeLog />
diff --git a/docs/zh/connectors/source/Airtable.md
b/docs/zh/connectors/source/Airtable.md
new file mode 100644
index 0000000000..19bbb70df1
--- /dev/null
+++ b/docs/zh/connectors/source/Airtable.md
@@ -0,0 +1,182 @@
+import ChangeLog from '../changelog/connector-http-airtable.md';
+
+# Airtable
+
+> Airtable 源连接器
+
+## 描述
+
+用于从 Airtable 读取数据。
+
+## 关键特性
+
+- [x] [批](../../introduction/concepts/connector-v2-features.md)
+- [ ] [流](../../introduction/concepts/connector-v2-features.md)
+- [ ] [精确一次](../../introduction/concepts/connector-v2-features.md)
+- [x] [列投影](../../introduction/concepts/connector-v2-features.md)
+- [ ] [并行性](../../introduction/concepts/connector-v2-features.md)
+- [ ] [支持用户自定义split](../../introduction/concepts/connector-v2-features.md)
+
+## 选项
+
+| 参数名 | 类型 | 必须 | 默认值 |
+|--------|------|------|--------|
+| token | String | 是 | - |
+| base_id | String | 是 | - |
+| table | String | 是 | - |
+| api_base_url | String | 否 | https://api.airtable.com |
+| view | String | 否 | - |
+| fields | List | 否 | - |
+| filter_by_formula | String | 否 | - |
+| max_records | int | 否 | - |
+| page_size | int | 否 | - |
+| sort | String | 否 | - |
+| cell_format | String | 否 | - |
+| return_fields_by_field_id | boolean | 否 | - |
+| record_metadata | List | 否 | - |
+| time_zone | String | 否 | - |
+| user_locale | String | 否 | - |
+| request_interval_ms | int | 否 | 220 |
+| rate_limit_backoff_ms | int | 否 | 30000 |
+| rate_limit_max_retries | int | 否 | 3 |
+| schema | Config | 否 | - |
+| schema.fields | Config | 否 | - |
+| format | String | 否 | text |
+| content_field | String | 否 | - |
+| json_field | Config | 否 | - |
+| common-options | config | 否 | - |
+
+### token [String]
+
+Airtable 个人访问令牌。可在 https://airtable.com/create/tokens 创建。
+
+### base_id [String]
+
+Airtable Base ID(以 `app` 开头)。
+
+### table [String]
+
+要读取的表名或表 ID。
+
+### api_base_url [String]
+
+Airtable API 基础 URL,默认 `https://api.airtable.com`。
+
+### view [String]
+
+视图名称或 ID,仅返回该视图中可见的记录。
+
+### fields [List]
+
+要包含在响应中的字段名列表。
+
+### filter_by_formula [String]
+
+Airtable 公式表达式,用于过滤记录。参考 [Airtable
公式文档](https://support.airtable.com/docs/formula-field-reference)。
+
+### max_records [int]
+
+返回的最大记录总数。
+
+### page_size [int]
+
+每页记录数(1-100)。
+
+### sort [String]
+
+排序定义 JSON 数组,例如 `[{"field":"Name","direction":"asc"}]`。
+
+### cell_format [String]
+
+单元格值格式,`json` 或 `string`。
+
+### return_fields_by_field_id [boolean]
+
+如果为 true,响应中的字段键将使用字段 ID 而非字段名。
+
+### record_metadata [List]
+
+要返回的额外记录元数据,例如 `["commentCount"]`。
+
+### time_zone [String]
+
+用于格式化日期/时间值的时区。
+
+### user_locale [String]
+
+用于格式化值的用户区域设置。
+
+### request_interval_ms [int]
+
+API 请求之间的最小间隔(毫秒),默认 220ms(以保持在 Airtable 每秒 5 次请求的限制内)。
+
+### rate_limit_backoff_ms [int]
+
+收到 429(限流)响应时的基础退避时间(毫秒),默认 30000ms。
+
+### rate_limit_max_retries [int]
+
+收到 429 响应后的最大重试次数,默认 3。
+
+### schema [Config]
+
+#### fields [Config]
+
+上游数据的模式字段。更多详情请参考 [Schema 特性](../../introduction/concepts/schema-feature.md)。
+
+### format [String]
+
+上游数据的格式,支持 `json` 和 `text`,默认 `text`。
+
+### content_field [String]
+
+用于从响应中提取数据的 JsonPath 表达式。对于 Airtable,通常使用 `$.records[*].fields` 来提取每条记录的字段。
+
+### json_field [Config]
+
+此参数帮助您配置模式,必须与 schema 一起使用。
+
+### common options
+
+源插件通用参数,请参考 [Source Common
Options](../common-options/source-common-options.md)。
+
+## 示例
+
+读取 Airtable 表并输出原始文本:
+
+```hocon
+source {
+ Airtable {
+ token = "patXXXXXXXX.XXXXXXXX"
+ base_id = "appXXXXXXXX"
+ table = "Shipments"
+ format = "text"
+ max_records = 10
+ }
+}
+```
+
+指定 schema 并提取记录字段:
+
+```hocon
+source {
+ Airtable {
+ token = "patXXXXXXXX.XXXXXXXX"
+ base_id = "appXXXXXXXX"
+ table = "Shipments"
+ content_field = "$.records[*].fields"
+ filter_by_formula = "{Status} = 'Shipped'"
+ schema = {
+ fields {
+ Name = string
+ Status = string
+ Weight = float
+ }
+ }
+ }
+}
+```
+
+## 变更日志
+
+<ChangeLog />
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 7c0c652a52..e45b0082d1 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -98,6 +98,8 @@ seatunnel.source.Jira = connector-http-jira
seatunnel.source.Gitlab = connector-http-gitlab
seatunnel.source.Github = connector-http-github
seatunnel.source.Notion = connector-http-notion
+seatunnel.source.Airtable = connector-http-airtable
+seatunnel.sink.Airtable = connector-http-airtable
seatunnel.sink.RabbitMQ = connector-rabbitmq
seatunnel.source.RabbitMQ = connector-rabbitmq
seatunnel.source.OpenMldb = connector-openmldb
diff --git a/seatunnel-connectors-v2/connector-http/pom.xml
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/pom.xml
similarity index 61%
copy from seatunnel-connectors-v2/connector-http/pom.xml
copy to seatunnel-connectors-v2/connector-http/connector-http-airtable/pom.xml
index e32339a5f0..ec734bf3be 100644
--- a/seatunnel-connectors-v2/connector-http/pom.xml
+++ b/seatunnel-connectors-v2/connector-http/connector-http-airtable/pom.xml
@@ -22,26 +22,19 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-connectors-v2</artifactId>
+ <artifactId>connector-http</artifactId>
<version>${revision}</version>
</parent>
- <artifactId>connector-http</artifactId>
- <packaging>pom</packaging>
- <name>SeaTunnel : Connectors V2 : Http :</name>
- <modules>
- <module>connector-http-base</module>
- <module>connector-http-feishu</module>
- <module>connector-http-wechat</module>
- <module>connector-http-myhours</module>
- <module>connector-http-lemlist</module>
- <module>connector-http-klaviyo</module>
- <module>connector-http-onesignal</module>
- <module>connector-http-jira</module>
- <module>connector-http-gitlab</module>
- <module>connector-http-github</module>
- <module>connector-http-notion</module>
- <module>connector-http-persistiq</module>
- </modules>
+ <artifactId>connector-http-airtable</artifactId>
+ <name>SeaTunnel : Connectors V2 : Http : Airtable</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-http-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
</project>
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/config/AirtableConfig.java
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/config/AirtableConfig.java
new file mode 100644
index 0000000000..548ad29ae3
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/config/AirtableConfig.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpCommonOptions;
+
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public class AirtableConfig extends HttpCommonOptions {
+
+ public static final String AUTHORIZATION = "Authorization";
+ public static final String BEARER = "Bearer";
+ public static final String CONTENT_TYPE = "Content-Type";
+ public static final String APPLICATION_JSON = "application/json";
+
+ public static final String DEFAULT_API_BASE_URL =
"https://api.airtable.com";
+
+ private static final String API_VERSION_PATH = "/v0";
+
+ public static final Option<String> API_BASE_URL =
+ Options.key("api_base_url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Airtable API base URL, default is
https://api.airtable.com");
+
+ public static final Option<String> TOKEN =
+ Options.key("token")
+ .stringType()
+ .noDefaultValue()
+ .withFallbackKeys("api_key")
+ .withDescription("Airtable personal access token");
+
+ public static final Option<String> BASE_ID =
+ Options.key("base_id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Airtable base ID");
+
+ public static final Option<String> TABLE =
+ Options.key("table")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Airtable table name or table ID");
+
+ public static final Option<Integer> REQUEST_INTERVAL_MS =
+ Options.key("request_interval_ms")
+ .intType()
+ .defaultValue(220)
+ .withDescription(
+ "Minimum interval in milliseconds between Airtable
API requests, must be >= 0.");
+
+ public static final Option<Integer> RATE_LIMIT_BACKOFF_MS =
+ Options.key("rate_limit_backoff_ms")
+ .intType()
+ .defaultValue(30000)
+ .withDescription(
+ "Base backoff time in milliseconds when Airtable
returns 429, must be >= 0.");
+
+ public static final Option<Integer> RATE_LIMIT_MAX_RETRIES =
+ Options.key("rate_limit_max_retries")
+ .intType()
+ .defaultValue(3)
+ .withDescription(
+ "Maximum retries after receiving Airtable 429
responses, must be >= 0.");
+
+ public static String buildBaseUrl(String apiBaseUrl, String baseId, String
table) {
+ String normalized =
+ apiBaseUrl.endsWith("/")
+ ? apiBaseUrl.substring(0, apiBaseUrl.length() - 1)
+ : apiBaseUrl;
+ if (!normalized.endsWith(API_VERSION_PATH)) {
+ normalized = normalized + API_VERSION_PATH;
+ }
+ return normalized + "/" + baseId + "/" + encodePathSegment(table);
+ }
+
+ public static String encodePathSegment(String value) {
+ try {
+ String encoded = URLEncoder.encode(value,
StandardCharsets.UTF_8.name());
+ return encoded.replace("+", "%20");
+ } catch (java.io.UnsupportedEncodingException e) {
+ throw new IllegalStateException("UTF-8 encoding is not supported",
e);
+ }
+ }
+
+ public static Map<String, String> buildAuthHeaders(
+ String token, Map<String, String> existingHeaders) {
+ Map<String, String> headers =
+
Optional.ofNullable(existingHeaders).map(HashMap::new).orElse(new HashMap<>());
+ headers.put(AUTHORIZATION, BEARER + " " + token);
+ headers.put(CONTENT_TYPE, APPLICATION_JSON);
+ return headers;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSink.java
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSink.java
new file mode 100644
index 0000000000..ae3bf36aa3
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSink.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.airtable.config.AirtableConfig;
+import
org.apache.seatunnel.connectors.seatunnel.airtable.sink.config.AirtableSinkOptions;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+
+import java.io.IOException;
+import java.util.Optional;
+
+public class AirtableSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+ implements SupportMultiTableSink {
+
+ private final CatalogTable catalogTable;
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final HttpParameter httpParameter;
+ private final int batchSize;
+ private final boolean typecast;
+ private final int requestIntervalMs;
+ private final int rateLimitBackoffMs;
+ private final int rateLimitMaxRetries;
+
+ public AirtableSink(ReadonlyConfig pluginConfig, CatalogTable
catalogTable) {
+ this.catalogTable = catalogTable;
+ this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+
+ String baseId = pluginConfig.get(AirtableConfig.BASE_ID);
+ String table = pluginConfig.get(AirtableConfig.TABLE);
+ String token = pluginConfig.get(AirtableConfig.TOKEN);
+ String apiBaseUrl =
+ pluginConfig
+ .getOptional(AirtableConfig.API_BASE_URL)
+ .orElse(AirtableConfig.DEFAULT_API_BASE_URL);
+
+ this.httpParameter = new HttpParameter();
+ this.httpParameter.setUrl(AirtableConfig.buildBaseUrl(apiBaseUrl,
baseId, table));
+ this.httpParameter.setHeaders(AirtableConfig.buildAuthHeaders(token,
null));
+
+ this.batchSize = pluginConfig.get(AirtableSinkOptions.BATCH_SIZE);
+ this.typecast = pluginConfig.get(AirtableSinkOptions.TYPECAST);
+ this.requestIntervalMs =
pluginConfig.get(AirtableConfig.REQUEST_INTERVAL_MS);
+ this.rateLimitBackoffMs =
pluginConfig.get(AirtableConfig.RATE_LIMIT_BACKOFF_MS);
+ this.rateLimitMaxRetries =
pluginConfig.get(AirtableConfig.RATE_LIMIT_MAX_RETRIES);
+ }
+
+ @Override
+ public String getPluginName() {
+ return "Airtable";
+ }
+
+ @Override
+ public AirtableSinkWriter createWriter(SinkWriter.Context context) throws
IOException {
+ return new AirtableSinkWriter(
+ seaTunnelRowType,
+ httpParameter,
+ batchSize,
+ typecast,
+ requestIntervalMs,
+ rateLimitBackoffMs,
+ rateLimitMaxRetries);
+ }
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSinkFactory.java
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSinkFactory.java
new file mode 100644
index 0000000000..3273a6d128
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSinkFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.airtable.config.AirtableConfig;
+import
org.apache.seatunnel.connectors.seatunnel.airtable.sink.config.AirtableSinkOptions;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class AirtableSinkFactory implements TableSinkFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return "Airtable";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(AirtableConfig.TOKEN, AirtableConfig.BASE_ID,
AirtableConfig.TABLE)
+ .optional(
+ AirtableConfig.API_BASE_URL,
+ AirtableSinkOptions.TYPECAST,
+ AirtableSinkOptions.BATCH_SIZE,
+ AirtableConfig.REQUEST_INTERVAL_MS,
+ AirtableConfig.RATE_LIMIT_BACKOFF_MS,
+ AirtableConfig.RATE_LIMIT_MAX_RETRIES,
+ SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
+ .build();
+ }
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () -> new AirtableSink(context.getOptions(),
context.getCatalogTable());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSinkWriter.java
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSinkWriter.java
new file mode 100644
index 0000000000..97cd959893
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSinkWriter.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.sink;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import
org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+@Slf4j
+public class AirtableSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+ implements SupportMultiTableSinkWriter<Void> {
+
+ private static final int STATUS_TOO_MANY_REQUESTS = 429;
+ private static final long MAX_BACKOFF_MILLIS = 300000L;
+
+ private final HttpClientProvider httpClient;
+ private final String url;
+ private final Map<String, String> headers;
+ private final JsonSerializationSchema serializationSchema;
+ private final ObjectMapper objectMapper;
+ private final int batchSize;
+ private final boolean typecast;
+ private final int requestIntervalMs;
+ private final int rateLimitBackoffMs;
+ private final int rateLimitMaxRetries;
+ private final List<SeaTunnelRow> batchBuffer;
+ private long lastRequestTimeMillis;
+
+ public AirtableSinkWriter(
+ SeaTunnelRowType seaTunnelRowType,
+ HttpParameter httpParameter,
+ int batchSize,
+ boolean typecast,
+ int requestIntervalMs,
+ int rateLimitBackoffMs,
+ int rateLimitMaxRetries) {
+ this.url = httpParameter.getUrl();
+ this.headers = httpParameter.getHeaders();
+ this.httpClient = new HttpClientProvider(httpParameter);
+ this.serializationSchema = new
JsonSerializationSchema(seaTunnelRowType);
+ this.objectMapper = serializationSchema.getMapper();
+ this.batchSize = Math.min(Math.max(batchSize, 1), 10);
+ this.typecast = typecast;
+ this.requestIntervalMs = Math.max(0, requestIntervalMs);
+ this.rateLimitBackoffMs = Math.max(0, rateLimitBackoffMs);
+ this.rateLimitMaxRetries = Math.max(0, rateLimitMaxRetries);
+ this.batchBuffer = new ArrayList<>(this.batchSize);
+ this.lastRequestTimeMillis = 0L;
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+ batchBuffer.add(element);
+ if (batchBuffer.size() >= batchSize) {
+ flush();
+ }
+ }
+
+ private void flush() throws IOException {
+ if (batchBuffer.isEmpty()) {
+ return;
+ }
+
+ String body = buildRequestBody();
+ sendWithRateLimitRetry(body);
+ batchBuffer.clear();
+ }
+
+ private String buildRequestBody() throws IOException {
+ ObjectNode root = objectMapper.createObjectNode();
+ ArrayNode records = objectMapper.createArrayNode();
+
+ for (SeaTunnelRow row : batchBuffer) {
+ byte[] serialized = serializationSchema.serialize(row);
+ JsonNode fieldsNode = objectMapper.readTree(serialized);
+ ObjectNode record = objectMapper.createObjectNode();
+ record.set("fields", fieldsNode);
+ records.add(record);
+ }
+
+ root.set("records", records);
+ if (typecast) {
+ root.put("typecast", true);
+ }
+
+ return objectMapper.writeValueAsString(root);
+ }
+
+ private void sendWithRateLimitRetry(String body) throws IOException {
+ int retryCount = 0;
+ while (true) {
+ waitForRequestSlot();
+ try {
+ HttpResponse response = httpClient.doPost(url, headers, body);
+ if (HttpResponse.STATUS_OK == response.getCode()) {
+ return;
+ }
+ if (response.getCode() == STATUS_TOO_MANY_REQUESTS
+ && retryCount < rateLimitMaxRetries) {
+ retryCount++;
+ long backoffMillis = calculateBackoffMillis(retryCount);
+ log.warn(
+ "Airtable API rate limit reached, retry {}/{}
after {} ms",
+ retryCount,
+ rateLimitMaxRetries,
+ backoffMillis);
+ try {
+ Thread.sleep(backoffMillis);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ continue;
+ }
+ throw new IOException(
+ String.format(
+ "Airtable API request failed, status
code:[%s], content:[%s]",
+ response.getCode(), response.getContent()));
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IOException("Failed to send Airtable API request",
e);
+ }
+ }
+ }
+
+ private void waitForRequestSlot() {
+ if (requestIntervalMs <= 0) {
+ return;
+ }
+ long now = System.currentTimeMillis();
+ long elapsed = now - lastRequestTimeMillis;
+ if (elapsed < requestIntervalMs) {
+ try {
+ Thread.sleep(requestIntervalMs - elapsed);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ lastRequestTimeMillis = System.currentTimeMillis();
+ }
+
+ private long calculateBackoffMillis(int retryCount) {
+ if (rateLimitBackoffMs <= 0) {
+ return 0L;
+ }
+ long exponential = 1L << Math.min(20, Math.max(0, retryCount - 1));
+ long waitMillis = rateLimitBackoffMs * exponential;
+ return Math.min(waitMillis, MAX_BACKOFF_MILLIS);
+ }
+
+ @Override
+ public Optional<Void> prepareCommit() {
+ try {
+ flush();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to flush data in
prepareCommit", e);
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public void close() throws IOException {
+ flush();
+ if (Objects.nonNull(httpClient)) {
+ httpClient.close();
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/config/AirtableSinkOptions.java
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/config/AirtableSinkOptions.java
new file mode 100644
index 0000000000..fb1ab7ca5f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/config/AirtableSinkOptions.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.sink.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import
org.apache.seatunnel.connectors.seatunnel.airtable.config.AirtableConfig;
+
+public class AirtableSinkOptions extends AirtableConfig {
+
+ public static final Option<Boolean> TYPECAST =
+ Options.key("typecast")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If true, Airtable will automatically typecast
values to match the field type.");
+
+ public static final Option<Integer> BATCH_SIZE =
+ Options.key("batch_size")
+ .intType()
+ .defaultValue(10)
+ .withDescription(
+ "Number of records per API request, maximum 10 per
Airtable API limit.");
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSource.java
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSource.java
new file mode 100644
index 0000000000..085364b57a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSource.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.JobMode;
+import
org.apache.seatunnel.connectors.seatunnel.airtable.source.config.AirtableSourceOptions;
+import
org.apache.seatunnel.connectors.seatunnel.airtable.source.config.AirtableSourceParameter;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import
org.apache.seatunnel.connectors.seatunnel.http.config.HttpPaginationType;
+import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
+import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSource;
+
+public class AirtableSource extends HttpSource {
+
+ public static final String PLUGIN_NAME = "Airtable";
+
+ private final AirtableSourceParameter airtableSourceParameter = new
AirtableSourceParameter();
+ private final int requestIntervalMs;
+ private final int rateLimitBackoffMs;
+ private final int rateLimitMaxRetries;
+
+ public AirtableSource(ReadonlyConfig pluginConfig) {
+ super(pluginConfig);
+ airtableSourceParameter.buildWithConfig(pluginConfig);
+ this.requestIntervalMs =
pluginConfig.get(AirtableSourceOptions.REQUEST_INTERVAL_MS);
+ this.rateLimitBackoffMs =
pluginConfig.get(AirtableSourceOptions.RATE_LIMIT_BACKOFF_MS);
+ this.rateLimitMaxRetries =
pluginConfig.get(AirtableSourceOptions.RATE_LIMIT_MAX_RETRIES);
+ if (this.pageInfo == null) {
+ PageInfo info = new PageInfo();
+ info.setPageType(HttpPaginationType.CURSOR.getCode());
+ info.setPageCursorFieldName("offset");
+ info.setPageCursorResponseField("$.offset");
+ info.setUsePlaceholderReplacement(false);
+ // Avoid NPE in HttpSourceReader.updateRequestParam for cursor
pagination
+ // (pageIndex is unused for cursor mode but referenced
defensively).
+ info.setPageIndex(0L);
+ this.pageInfo = info;
+ }
+ }
+
+ @Override
+ public String getPluginName() {
+ return PLUGIN_NAME;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ if (JobMode.BATCH.equals(jobContext.getJobMode())) {
+ return Boundedness.BOUNDED;
+ }
+ throw new UnsupportedOperationException(
+ "Airtable source connector not support unbounded operation");
+ }
+
+ @Override
+ public AbstractSingleSplitReader<SeaTunnelRow> createReader(
+ SingleSplitReaderContext readerContext) throws Exception {
+ return new AirtableSourceReader(
+ airtableSourceParameter,
+ readerContext,
+ deserializationSchema,
+ jsonField,
+ contentField,
+ pageInfo,
+ requestIntervalMs,
+ rateLimitBackoffMs,
+ rateLimitMaxRetries);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSourceFactory.java
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSourceFactory.java
new file mode 100644
index 0000000000..6dd25dca90
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSourceFactory.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.airtable.source.config.AirtableSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+@AutoService(Factory.class)
+public class AirtableSourceFactory extends HttpSourceFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return "Airtable";
+ }
+
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ return () -> (SeaTunnelSource<T, SplitT, StateT>) new
AirtableSource(context.getOptions());
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(
+ AirtableSourceOptions.TOKEN,
+ AirtableSourceOptions.BASE_ID,
+ AirtableSourceOptions.TABLE)
+ .optional(
+ AirtableSourceOptions.API_BASE_URL,
+ AirtableSourceOptions.VIEW,
+ AirtableSourceOptions.FIELDS,
+ AirtableSourceOptions.FILTER_BY_FORMULA,
+ AirtableSourceOptions.MAX_RECORDS,
+ AirtableSourceOptions.PAGE_SIZE,
+ AirtableSourceOptions.SORT,
+ AirtableSourceOptions.CELL_FORMAT,
+ AirtableSourceOptions.RETURN_FIELDS_BY_FIELD_ID,
+ AirtableSourceOptions.RECORD_METADATA,
+ AirtableSourceOptions.TIME_ZONE,
+ AirtableSourceOptions.USER_LOCALE,
+ AirtableSourceOptions.OFFSET,
+ AirtableSourceOptions.REQUEST_INTERVAL_MS,
+ AirtableSourceOptions.RATE_LIMIT_BACKOFF_MS,
+ AirtableSourceOptions.RATE_LIMIT_MAX_RETRIES,
+ // Base HTTP options (aligned with
HttpSourceFactory.getHttpBuilder)
+ HttpSourceOptions.HEADERS,
+ HttpSourceOptions.BODY,
+ HttpSourceOptions.FORMAT,
+ HttpSourceOptions.PAGEING,
+ HttpSourceOptions.JSON_FIELD,
+ HttpSourceOptions.CONTENT_FIELD,
+ HttpSourceOptions.POLL_INTERVAL_MILLS,
+ HttpSourceOptions.RETRY,
+ HttpSourceOptions.RETRY_BACKOFF_MULTIPLIER_MS,
+ HttpSourceOptions.RETRY_BACKOFF_MAX_MS,
+ HttpSourceOptions.JSON_FILED_MISSED_RETURN_NULL)
+ .conditional(
+ HttpSourceOptions.FORMAT,
+ HttpConfig.ResponseFormat.JSON,
+ ConnectorCommonOptions.SCHEMA)
+ .build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSourceReader.java
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSourceReader.java
new file mode 100644
index 0000000000..8f22b7a50b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSourceReader.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
+import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
+import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class AirtableSourceReader extends HttpSourceReader {
+
+ private static final int STATUS_TOO_MANY_REQUESTS = 429;
+ private static final long MAX_BACKOFF_MILLIS = 300000L;
+
+ private final int requestIntervalMs;
+ private final int rateLimitBackoffMs;
+ private final int rateLimitMaxRetries;
+ private long lastRequestTimeMillis = 0L;
+
+ public AirtableSourceReader(
+ HttpParameter httpParameter,
+ SingleSplitReaderContext context,
+ DeserializationSchema<SeaTunnelRow> deserializationSchema,
+ JsonField jsonField,
+ String contentJson,
+ PageInfo pageInfo,
+ int requestIntervalMs,
+ int rateLimitBackoffMs,
+ int rateLimitMaxRetries) {
+ super(httpParameter, context, deserializationSchema, jsonField,
contentJson, pageInfo);
+ this.requestIntervalMs = Math.max(0, requestIntervalMs);
+ this.rateLimitBackoffMs = Math.max(0, rateLimitBackoffMs);
+ this.rateLimitMaxRetries = Math.max(0, rateLimitMaxRetries);
+ }
+
+ @Override
+ protected HttpResponse executeRequest() throws Exception {
+ int retryCount = 0;
+ while (true) {
+ waitForRequestSlot();
+ HttpResponse response = doExecuteRequest();
+ if (response.getCode() == STATUS_TOO_MANY_REQUESTS
+ && retryCount < rateLimitMaxRetries) {
+ retryCount += 1;
+ long backoffMillis = calculateBackoffMillis(retryCount);
+ log.warn(
+ "Airtable API rate limit reached, retry {}/{} after {}
ms",
+ retryCount,
+ rateLimitMaxRetries,
+ backoffMillis);
+ try {
+ Thread.sleep(backoffMillis);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ continue;
+ }
+ return response;
+ }
+ }
+
+ private HttpResponse doExecuteRequest() throws Exception {
+ return httpClient.execute(
+ this.httpParameter.getUrl(),
+ this.httpParameter.getMethod().getMethod(),
+ this.httpParameter.getHeaders(),
+ this.httpParameter.getParams(),
+ this.httpParameter.getBody(),
+ this.httpParameter.isKeepParamsAsForm());
+ }
+
+ private void waitForRequestSlot() {
+ if (requestIntervalMs <= 0) {
+ return;
+ }
+ long now = System.currentTimeMillis();
+ long elapsed = now - lastRequestTimeMillis;
+ if (elapsed < requestIntervalMs) {
+ try {
+ Thread.sleep(requestIntervalMs - elapsed);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ lastRequestTimeMillis = System.currentTimeMillis();
+ }
+
+ private long calculateBackoffMillis(int retryCount) {
+ if (rateLimitBackoffMs <= 0) {
+ return 0L;
+ }
+ long exponential = 1L << Math.min(20, Math.max(0, retryCount - 1));
+ long waitMillis = rateLimitBackoffMs * exponential;
+ return Math.min(waitMillis, MAX_BACKOFF_MILLIS);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/config/AirtableSourceOptions.java
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/config/AirtableSourceOptions.java
new file mode 100644
index 0000000000..0fff930355
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/config/AirtableSourceOptions.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.source.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import
org.apache.seatunnel.connectors.seatunnel.airtable.config.AirtableConfig;
+
+import java.util.List;
+
+public class AirtableSourceOptions extends AirtableConfig {
+
+ public static final Option<String> VIEW =
+ Options.key("view")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The name or ID of a view");
+
+ public static final Option<List<String>> FIELDS =
+ Options.key("fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The list of field names to include");
+
+ public static final Option<String> FILTER_BY_FORMULA =
+ Options.key("filter_by_formula")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Airtable filterByFormula expression");
+
+ public static final Option<Integer> MAX_RECORDS =
+ Options.key("max_records")
+ .intType()
+ .noDefaultValue()
+ .withDescription("Maximum number of records to return,
must be greater than 0");
+
+ public static final Option<Integer> PAGE_SIZE =
+ Options.key("page_size")
+ .intType()
+ .noDefaultValue()
+ .withDescription("Number of records per page, must be in
range [1, 100]");
+
+ public static final Option<String> SORT =
+ Options.key("sort")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Sort definition JSON array, e.g.
[{\"field\":\"Name\",\"direction\":\"asc\"}]");
+
+ public static final Option<String> CELL_FORMAT =
+ Options.key("cell_format")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("cellFormat value, e.g. json or string");
+
+ public static final Option<Boolean> RETURN_FIELDS_BY_FIELD_ID =
+ Options.key("return_fields_by_field_id")
+ .booleanType()
+ .noDefaultValue()
+ .withDescription("Return fields by field ID instead of
field name");
+
+ public static final Option<List<String>> RECORD_METADATA =
+ Options.key("record_metadata")
+ .listType()
+ .noDefaultValue()
+ .withDescription("Record metadata to return, e.g.
[\"commentCount\"]");
+
+ public static final Option<String> TIME_ZONE =
+ Options.key("time_zone")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The time zone for cell values");
+
+ public static final Option<String> USER_LOCALE =
+ Options.key("user_locale")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The user locale for cell values");
+
+ public static final Option<String> OFFSET =
+ Options.key("offset")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Pagination offset returned by Airtable");
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/config/AirtableSourceParameter.java
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/config/AirtableSourceParameter.java
new file mode 100644
index 0000000000..811550dfca
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/main/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/config/AirtableSourceParameter.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.source.config;
+
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.seatunnel.shade.com.google.common.base.Strings;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import
org.apache.seatunnel.connectors.seatunnel.airtable.config.AirtableConfig;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpRequestMethod;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AirtableSourceParameter extends HttpParameter {
+ private static final String LIST_RECORDS_SUFFIX = "/listRecords";
+
+ @Override
+ public void buildWithConfig(ReadonlyConfig pluginConfig) {
+ super.buildWithConfig(pluginConfig);
+ String baseId = pluginConfig.get(AirtableSourceOptions.BASE_ID);
+ String table = pluginConfig.get(AirtableSourceOptions.TABLE);
+ String apiBaseUrl =
+ pluginConfig
+ .getOptional(AirtableSourceOptions.API_BASE_URL)
+ .orElse(AirtableConfig.DEFAULT_API_BASE_URL);
+
+ this.setUrl(AirtableConfig.buildBaseUrl(apiBaseUrl, baseId, table) +
LIST_RECORDS_SUFFIX);
+ this.setMethod(HttpRequestMethod.POST);
+
+ String token = pluginConfig.get(AirtableSourceOptions.TOKEN);
+ this.setHeaders(AirtableConfig.buildAuthHeaders(token, getHeaders()));
+
+ this.setBody(buildRequestBody(pluginConfig, this.getBody()));
+ }
+
+ private String buildRequestBody(ReadonlyConfig pluginConfig, String
existingBody) {
+ Map<String, Object> body = new HashMap<>();
+ if (!Strings.isNullOrEmpty(existingBody)) {
+ try {
+ Map<String, Object> parsed =
+ JsonUtils.parseObject(
+ existingBody, new TypeReference<Map<String,
Object>>() {});
+ if (parsed != null) {
+ body.putAll(parsed);
+ }
+ } catch (Exception ignored) {
+ // Ignore non-JSON body and build Airtable request body from
options.
+ }
+ }
+
+ checkBodyConflicts(pluginConfig, body);
+
+ pluginConfig
+ .getOptional(AirtableSourceOptions.FIELDS)
+ .ifPresent(value -> body.put("fields", value));
+ pluginConfig
+ .getOptional(AirtableSourceOptions.FILTER_BY_FORMULA)
+ .ifPresent(value -> body.put("filterByFormula", value));
+ pluginConfig
+ .getOptional(AirtableSourceOptions.MAX_RECORDS)
+ .ifPresent(value -> body.put("maxRecords", value));
+ pluginConfig
+ .getOptional(AirtableSourceOptions.PAGE_SIZE)
+ .ifPresent(value -> body.put("pageSize", value));
+ pluginConfig
+ .getOptional(AirtableSourceOptions.SORT)
+ .ifPresent(value -> body.put("sort", parseSort(value)));
+ pluginConfig
+ .getOptional(AirtableSourceOptions.VIEW)
+ .ifPresent(value -> body.put("view", value));
+ pluginConfig
+ .getOptional(AirtableSourceOptions.CELL_FORMAT)
+ .ifPresent(value -> body.put("cellFormat", value));
+ pluginConfig
+ .getOptional(AirtableSourceOptions.RETURN_FIELDS_BY_FIELD_ID)
+ .ifPresent(value -> body.put("returnFieldsByFieldId", value));
+ pluginConfig
+ .getOptional(AirtableSourceOptions.RECORD_METADATA)
+ .ifPresent(value -> body.put("recordMetadata", value));
+ pluginConfig
+ .getOptional(AirtableSourceOptions.TIME_ZONE)
+ .ifPresent(value -> body.put("timeZone", value));
+ pluginConfig
+ .getOptional(AirtableSourceOptions.USER_LOCALE)
+ .ifPresent(value -> body.put("userLocale", value));
+
+ // Keep offset key for key-based cursor replacement in
HttpSourceReader.
+ // Dedicated option wins; otherwise preserve body offset if present.
+ if
(pluginConfig.getOptional(AirtableSourceOptions.OFFSET).isPresent()) {
+ body.put("offset", pluginConfig.get(AirtableSourceOptions.OFFSET));
+ } else {
+ body.putIfAbsent("offset", null);
+ }
+
+ return JsonUtils.toJsonString(body);
+ }
+
+ private void checkBodyConflicts(ReadonlyConfig pluginConfig, Map<String,
Object> body) {
+ if (body.isEmpty()) {
+ return;
+ }
+ List<String> conflicts = new ArrayList<>();
+ checkConflict(pluginConfig, body, AirtableSourceOptions.FIELDS,
"fields", conflicts);
+ checkConflict(
+ pluginConfig,
+ body,
+ AirtableSourceOptions.FILTER_BY_FORMULA,
+ "filterByFormula",
+ conflicts);
+ checkConflict(
+ pluginConfig, body, AirtableSourceOptions.MAX_RECORDS,
"maxRecords", conflicts);
+ checkConflict(pluginConfig, body, AirtableSourceOptions.PAGE_SIZE,
"pageSize", conflicts);
+ checkConflict(pluginConfig, body, AirtableSourceOptions.SORT, "sort",
conflicts);
+ checkConflict(pluginConfig, body, AirtableSourceOptions.VIEW, "view",
conflicts);
+ checkConflict(
+ pluginConfig, body, AirtableSourceOptions.CELL_FORMAT,
"cellFormat", conflicts);
+ checkConflict(
+ pluginConfig,
+ body,
+ AirtableSourceOptions.RETURN_FIELDS_BY_FIELD_ID,
+ "returnFieldsByFieldId",
+ conflicts);
+ checkConflict(
+ pluginConfig,
+ body,
+ AirtableSourceOptions.RECORD_METADATA,
+ "recordMetadata",
+ conflicts);
+ checkConflict(pluginConfig, body, AirtableSourceOptions.TIME_ZONE,
"timeZone", conflicts);
+ checkConflict(
+ pluginConfig, body, AirtableSourceOptions.USER_LOCALE,
"userLocale", conflicts);
+ checkConflict(pluginConfig, body, AirtableSourceOptions.OFFSET,
"offset", conflicts);
+ if (!conflicts.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Conflict between 'body' and dedicated Airtable options
for keys: "
+ + String.join(", ", conflicts)
+ + ". Please use either the dedicated option or
'body', not both.");
+ }
+ }
+
+ private void checkConflict(
+ ReadonlyConfig pluginConfig,
+ Map<String, Object> body,
+ Option<?> option,
+ String bodyKey,
+ List<String> conflicts) {
+ if (pluginConfig.getOptional(option).isPresent() &&
body.containsKey(bodyKey)) {
+ conflicts.add(bodyKey + " (option: " + option.key() + ")");
+ }
+ }
+
+ private Object parseSort(String sortJson) {
+ try {
+ return JsonUtils.parseObject(
+ sortJson, new TypeReference<List<Map<String, Object>>>()
{});
+ } catch (RuntimeException e) {
+ throw new IllegalArgumentException("Invalid sort JSON: " +
sortJson, e);
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/test/java/org/apache/seatunnel/connectors/seatunnel/airtable/AirtableFactoryTest.java
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/test/java/org/apache/seatunnel/connectors/seatunnel/airtable/AirtableFactoryTest.java
new file mode 100644
index 0000000000..eebe34ff6d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/test/java/org/apache/seatunnel/connectors/seatunnel/airtable/AirtableFactoryTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable;
+
+import
org.apache.seatunnel.connectors.seatunnel.airtable.sink.AirtableSinkFactory;
+import
org.apache.seatunnel.connectors.seatunnel.airtable.source.AirtableSourceFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class AirtableFactoryTest {
+
+ @Test
+ public void optionRule() {
+ Assertions.assertNotNull((new AirtableSourceFactory()).optionRule());
+ Assertions.assertNotNull((new AirtableSinkFactory()).optionRule());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/test/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSinkWriterTest.java
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/test/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSinkWriterTest.java
new file mode 100644
index 0000000000..236c0821f8
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/test/java/org/apache/seatunnel/connectors/seatunnel/airtable/sink/AirtableSinkWriterTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.sink;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class AirtableSinkWriterTest {
+
+ @Mock private HttpClientProvider httpClient;
+
+ private SeaTunnelRowType rowType;
+
+ @BeforeEach
+ public void setUp() {
+ MockitoAnnotations.openMocks(this);
+ rowType =
+ new SeaTunnelRowType(
+ new String[] {"Name", "Age"},
+ new SeaTunnelDataType[] {BasicType.STRING_TYPE,
BasicType.INT_TYPE});
+ }
+
+ private AirtableSinkWriter createWriter(int batchSize, boolean typecast)
throws Exception {
+ HttpParameter param = new HttpParameter();
+ param.setUrl("https://api.airtable.com/v0/appXXX/tblYYY");
+ Map<String, String> headers = new HashMap<>();
+ headers.put("Authorization", "Bearer test_token");
+ headers.put("Content-Type", "application/json");
+ param.setHeaders(headers);
+
+ AirtableSinkWriter writer =
+ new AirtableSinkWriter(rowType, param, batchSize, typecast, 0,
0, 3);
+
+ Field field = AirtableSinkWriter.class.getDeclaredField("httpClient");
+ field.setAccessible(true);
+ field.set(writer, httpClient);
+ return writer;
+ }
+
+ @Test
+ public void testBatchWriteBodyFormat() throws Exception {
+ when(httpClient.doPost(anyString(), any(), anyString()))
+ .thenReturn(new HttpResponse(200, "{}"));
+
+ AirtableSinkWriter writer = createWriter(2, false);
+ writer.write(new SeaTunnelRow(new Object[] {"Alice", 30}));
+ writer.write(new SeaTunnelRow(new Object[] {"Bob", 25}));
+
+ ArgumentCaptor<String> bodyCaptor =
ArgumentCaptor.forClass(String.class);
+ verify(httpClient, times(1)).doPost(anyString(), any(),
bodyCaptor.capture());
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode root = mapper.readTree(bodyCaptor.getValue());
+ Assertions.assertTrue(root.has("records"));
+ Assertions.assertFalse(root.has("typecast"));
+
+ JsonNode records = root.get("records");
+ Assertions.assertEquals(2, records.size());
+ Assertions.assertTrue(records.get(0).has("fields"));
+ Assertions.assertEquals("Alice",
records.get(0).get("fields").get("Name").asText());
+ }
+
+ @Test
+ public void testThrowsAfterMaxRetries() throws Exception {
+ when(httpClient.doPost(anyString(), any(), anyString()))
+ .thenReturn(new HttpResponse(429,
"{\"error\":{\"type\":\"RATE_LIMIT\"}}"));
+
+ AirtableSinkWriter writer = createWriter(1, false);
+
+ Assertions.assertThrows(
+ IOException.class,
+ () -> writer.write(new SeaTunnelRow(new Object[] {"Alice",
30})));
+ // 1 initial + 3 retries = 4 calls
+ verify(httpClient, times(4)).doPost(anyString(), any(), anyString());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/test/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSourceReaderTest.java
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/test/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSourceReaderTest.java
new file mode 100644
index 0000000000..a26be6763f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-airtable/src/test/java/org/apache/seatunnel/connectors/seatunnel/airtable/source/AirtableSourceReaderTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.airtable.source;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import
org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpRequestMethod;
+import
org.apache.seatunnel.connectors.seatunnel.http.source.SimpleTextDeserializationSchema;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class AirtableSourceReaderTest {
+
+ @Mock private SingleSplitReaderContext context;
+ @Mock private HttpClientProvider httpClient;
+
+ private HttpParameter parameter;
+ private SimpleTextDeserializationSchema schema;
+
+ @BeforeEach
+ public void setUp() {
+ MockitoAnnotations.openMocks(this);
+ parameter = new HttpParameter();
+
parameter.setUrl("https://api.airtable.com/v0/appBase/table/listRecords");
+ parameter.setMethod(HttpRequestMethod.POST);
+
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"content"}, new SeaTunnelDataType[]
{BasicType.STRING_TYPE});
+ schema = new SimpleTextDeserializationSchema(rowType);
+ }
+
+ private AirtableSourceReader createReader(int rateLimitMaxRetries) {
+ AirtableSourceReader reader =
+ new AirtableSourceReader(
+ parameter, context, schema, null, null, null, 0, 0,
rateLimitMaxRetries);
+ reader.setHttpClient(httpClient);
+ return reader;
+ }
+
+ @Test
+ public void testRetryOn429ThenSuccess() throws Exception {
+ when(httpClient.execute(anyString(), anyString(), any(), any(), any(),
anyBoolean()))
+ .thenReturn(new HttpResponse(429,
"{\"error\":{\"type\":\"RATE_LIMIT\"}}"))
+ .thenReturn(
+ new HttpResponse(
+ 200,
+
"{\"records\":[{\"id\":\"rec1\",\"fields\":{\"Name\":\"Alice\"}}]}"));
+
+ AirtableSourceReader reader = createReader(2);
+ HttpResponse response = reader.executeRequest();
+
+ Assertions.assertEquals(200, response.getCode());
+ verify(httpClient, times(2))
+ .execute(anyString(), anyString(), any(), any(), any(),
anyBoolean());
+ }
+
+ @Test
+ public void testStopRetryAfterMaxRetries() throws Exception {
+ when(httpClient.execute(anyString(), anyString(), any(), any(), any(),
anyBoolean()))
+ .thenReturn(new HttpResponse(429,
"{\"error\":{\"type\":\"RATE_LIMIT\"}}"));
+
+ AirtableSourceReader reader = createReader(1);
+ HttpResponse response = reader.executeRequest();
+
+ Assertions.assertEquals(429, response.getCode());
+ // 1 initial + 1 retry = 2 calls
+ verify(httpClient, times(2))
+ .execute(anyString(), anyString(), any(), any(), any(),
anyBoolean());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index 42486924e5..c7ab3c9c00 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -124,14 +124,7 @@ public class HttpSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
}
public void pollAndCollectData(Collector<SeaTunnelRow> output) throws
Exception {
- HttpResponse response =
- httpClient.execute(
- this.httpParameter.getUrl(),
- this.httpParameter.getMethod().getMethod(),
- this.httpParameter.getHeaders(),
- this.httpParameter.getParams(),
- this.httpParameter.getBody(),
- this.httpParameter.isKeepParamsAsForm());
+ HttpResponse response = executeRequest();
if (response.getCode() >= 200 && response.getCode() <= 207) {
String content = response.getContent();
if (!Strings.isNullOrEmpty(content)) {
@@ -160,6 +153,16 @@ public class HttpSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
}
}
+ protected HttpResponse executeRequest() throws Exception {
+ return httpClient.execute(
+ this.httpParameter.getUrl(),
+ this.httpParameter.getMethod().getMethod(),
+ this.httpParameter.getHeaders(),
+ this.httpParameter.getParams(),
+ this.httpParameter.getBody(),
+ this.httpParameter.isKeepParamsAsForm());
+ }
+
@VisibleForTesting
public void updateRequestParam(PageInfo pageInfo, boolean
usePlaceholderReplacement) {
// 1. keep page param as http param
diff --git a/seatunnel-connectors-v2/connector-http/pom.xml
b/seatunnel-connectors-v2/connector-http/pom.xml
index e32339a5f0..6842f7c413 100644
--- a/seatunnel-connectors-v2/connector-http/pom.xml
+++ b/seatunnel-connectors-v2/connector-http/pom.xml
@@ -42,6 +42,7 @@
<module>connector-http-github</module>
<module>connector-http-notion</module>
<module>connector-http-persistiq</module>
+ <module>connector-http-airtable</module>
</modules>
</project>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 88d7e2c911..4559cd6663 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -490,6 +490,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-http-airtable</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-rabbitmq</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml
index 97e435e092..67b8c17a1a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml
@@ -117,6 +117,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-http-airtable</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-netty-no-dependencies</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index 1d5c0f0afc..30eca3211b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -361,6 +361,18 @@ public class HttpIT extends TestSuiteBase implements
TestResource {
Container.ExecResult execResult21 =
container.executeJob("/http_page_cursor_num_assert.conf");
Assertions.assertEquals(0, execResult21.getExitCode());
+
+ // http airtable source
+ Container.ExecResult execResult22 =
container.executeJob("/airtable_json_to_assert.conf");
+ Assertions.assertEquals(0, execResult22.getExitCode());
+ }
+
+ @TestTemplate
+ public void testFakeToAirtableSink(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/fake_to_airtable.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
mockServerClient.verify(request().withPath("/v0/appTEST123/SinkTable").withMethod("POST"));
}
@TestTemplate
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/airtable_json_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/airtable_json_to_assert.conf
new file mode 100644
index 0000000000..d4cb4b46cd
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/airtable_json_to_assert.conf
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Airtable {
+ plugin_output = "http"
+ api_base_url = "http://mockserver:1080"
+ token = "test_token"
+ base_id = "appTEST123"
+ table = "TestTable"
+ format = "json"
+ content_field = "$.records[*].fields"
+ page_size = 2
+ request_interval_ms = 0
+ schema = {
+ fields {
+ Name = string
+ Age = int
+ Status = string
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = "http"
+ rules {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 5
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = Name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = Age
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = Status
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/fake_to_airtable.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/fake_to_airtable.conf
new file mode 100644
index 0000000000..98350dd5cc
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/fake_to_airtable.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ Name = string
+ Age = int
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["Alice", 30]
+ },
+ {
+ kind = INSERT
+ fields = ["Bob", 25]
+ }
+ ]
+ }
+}
+
+sink {
+ Airtable {
+ api_base_url = "http://mockserver:1080"
+ token = "test_token"
+ base_id = "appTEST123"
+ table = "SinkTable"
+ batch_size = 10
+ request_interval_ms = 0
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
index 662fd71c28..f948a23d2d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
@@ -4828,5 +4828,120 @@
]
}
}
+ },
+ {
+ "httpRequest": {
+ "method": "POST",
+ "path": "/v0/appTEST123/TestTable/listRecords",
+ "body": {
+ "type": "JSON",
+ "json": "{\"pageSize\":2}",
+ "matchType": "STRICT"
+ }
+ },
+ "httpResponse": {
+ "body": {
+ "records": [
+ {
+ "id": "rec001",
+ "createdTime": "2024-01-01T00:00:00.000Z",
+ "fields": {
+ "Name": "Name001",
+ "Age": 21,
+ "Status": "Active"
+ }
+ },
+ {
+ "id": "rec002",
+ "createdTime": "2024-01-02T00:00:00.000Z",
+ "fields": {
+ "Name": "Name002",
+ "Age": 22,
+ "Status": "Inactive"
+ }
+ }
+ ],
+ "offset": "itrPAGE2/recPAGE2"
+ }
+ }
+ },
+ {
+ "httpRequest": {
+ "method": "POST",
+ "path": "/v0/appTEST123/TestTable/listRecords",
+ "body": {
+ "type": "JSON",
+ "json": "{\"offset\":\"itrPAGE2/recPAGE2\",\"pageSize\":2}",
+ "matchType": "STRICT"
+ }
+ },
+ "httpResponse": {
+ "body": {
+ "records": [
+ {
+ "id": "rec003",
+ "createdTime": "2024-01-03T00:00:00.000Z",
+ "fields": {
+ "Name": "Name003",
+ "Age": 23,
+ "Status": "Active"
+ }
+ },
+ {
+ "id": "rec004",
+ "createdTime": "2024-01-04T00:00:00.000Z",
+ "fields": {
+ "Name": "Name004",
+ "Age": 24,
+ "Status": "Inactive"
+ }
+ }
+ ],
+ "offset": "itrPAGE3/recPAGE3"
+ }
+ }
+ },
+ {
+ "httpRequest": {
+ "method": "POST",
+ "path": "/v0/appTEST123/TestTable/listRecords",
+ "body": {
+ "type": "JSON",
+ "json": "{\"offset\":\"itrPAGE3/recPAGE3\",\"pageSize\":2}",
+ "matchType": "STRICT"
+ }
+ },
+ "httpResponse": {
+ "body": {
+ "records": [
+ {
+ "id": "rec005",
+ "createdTime": "2024-01-05T00:00:00.000Z",
+ "fields": {
+ "Name": "Name005",
+ "Age": 25,
+ "Status": "Active"
+ }
+ }
+ ]
+ }
+ }
+ },
+ {
+ "httpRequest": {
+ "method": "POST",
+ "path": "/v0/appTEST123/SinkTable"
+ },
+ "httpResponse": {
+ "statusCode": 200,
+ "body": {
+ "records": [
+ {
+ "id": "recNew001",
+ "fields": {}
+ }
+ ]
+ }
+ }
}
]
\ No newline at end of file