This is an automated email from the ASF dual-hosted git repository.
davidzollo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 93039e90fc [Feature][Connector] Add Salesforce Source Connector
(#10938)
93039e90fc is described below
commit 93039e90fc2a46250edf0f5f690e84e7651738ba
Author: NixonWahome <[email protected]>
AuthorDate: Tue Jun 9 12:10:40 2026 +0300
[Feature][Connector] Add Salesforce Source Connector (#10938)
---
.github/workflows/labeler/label-scope-conf.yml | 5 +
config/plugin_config | 1 +
docs/en/connector-v2/source/Salesforce.md | 116 +++++++
docs/zh/connector-v2/source/Salesforce.md | 115 +++++++
plugin-mapping.properties | 1 +
.../connector-salesforce/pom.xml | 59 ++++
.../salesforce/client/SalesforceClient.java | 342 +++++++++++++++++++++
.../salesforce/config/SalesforceParameters.java | 52 ++++
.../salesforce/config/SalesforceSourceOptions.java | 113 +++++++
.../salesforce/config/SalesforceTableConfig.java | 42 +++
.../exception/SalesforceConnectorErrorCode.java | 48 +++
.../exception/SalesforceConnectorException.java | 37 +++
.../salesforce/source/SalesforceSource.java | 162 ++++++++++
.../salesforce/source/SalesforceSourceFactory.java | 78 +++++
.../salesforce/source/SalesforceSourceReader.java | 144 +++++++++
.../salesforce/SalesforceSourceFactoryTest.java | 167 ++++++++++
.../salesforce/client/SalesforceClientTest.java | 245 +++++++++++++++
seatunnel-connectors-v2/pom.xml | 1 +
seatunnel-dist/pom.xml | 6 +
19 files changed, 1734 insertions(+)
diff --git a/.github/workflows/labeler/label-scope-conf.yml
b/.github/workflows/labeler/label-scope-conf.yml
index bd0e87633a..d8e386907b 100644
--- a/.github/workflows/labeler/label-scope-conf.yml
+++ b/.github/workflows/labeler/label-scope-conf.yml
@@ -83,6 +83,11 @@ cassandra:
- changed-files:
- any-glob-to-any-file:
seatunnel-connectors-v2/connector-cassandra/**
- all-globs-to-all-files:
'!seatunnel-connectors-v2/connector-!(cassandra)/**'
+salesforce:
+ - all:
+ - changed-files:
+ - any-glob-to-any-file:
seatunnel-connectors-v2/connector-salesforce/**
+ - all-globs-to-all-files:
'!seatunnel-connectors-v2/connector-!(salesforce)/**'
cdc:
- all:
- changed-files:
diff --git a/config/plugin_config b/config/plugin_config
index 88402cbf45..fc6bf5c7c2 100644
--- a/config/plugin_config
+++ b/config/plugin_config
@@ -23,6 +23,7 @@
connector-amazondynamodb
connector-assert
connector-cassandra
+connector-salesforce
connector-cdc-mysql
connector-cdc-mongodb
connector-cdc-sqlserver
diff --git a/docs/en/connector-v2/source/Salesforce.md
b/docs/en/connector-v2/source/Salesforce.md
new file mode 100644
index 0000000000..ab87e5abcb
--- /dev/null
+++ b/docs/en/connector-v2/source/Salesforce.md
@@ -0,0 +1,116 @@
+# Salesforce
+
+> Salesforce source connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Key Features
+
+- [x] [batch](../../introduction/concepts/connector-v2-features.md)
+- [ ] [stream](../../introduction/concepts/connector-v2-features.md)
+- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
+- [x] [column projection](../../introduction/concepts/connector-v2-features.md)
+- [ ] [parallelism](../../introduction/concepts/connector-v2-features.md)
+- [x] [support multiple table read](../../introduction/concepts/connector-v2-features.md)
+
+## Description
+
+Reads data from Salesforce objects using the Salesforce Bulk API 2.0. Supports
+single-object and multi-object (tables_configs) batch ingestion.
+
+Authentication uses the OAuth 2.0 Username-Password flow.
+
+## Supported DataSource Info
+
+| Datasource | Supported Versions |
+|------------|-------------------|
+| Salesforce | REST API v50.0+ |
+
+## Source Options
+
+| Name                      | Type    | Required | Default | Description |
+|---------------------------|---------|----------|---------|-------------|
+| client_id                 | String  | Yes      | -       | Salesforce Connected App client ID (consumer key). |
+| client_secret             | String  | Yes      | -       | Salesforce Connected App client secret (consumer secret). |
+| username                  | String  | Yes      | -       | Salesforce username. |
+| password                  | String  | Yes      | -       | Salesforce password. |
+| security_token            | String  | No       | ""      | Salesforce security token, appended to the password when requesting the OAuth token. Leave empty if the client IP is trusted by your org. |
+| instance_url              | String  | Yes      | -       | Salesforce instance URL used for the OAuth token request, e.g. `https://yourorg.salesforce.com`. Subsequent API calls go to the `instance_url` returned in the token response. |
+| api_version               | String  | No       | v59.0   | Salesforce REST API version. |
+| object_name               | String  | No*      | -       | Salesforce object to read in single-object mode, e.g. `Account`. Mutually exclusive with `tables_configs`. |
+| tables_configs            | List    | No*      | -       | Multi-object configuration list. Each entry requires `table_path`. Mutually exclusive with `object_name`. |
+| filter                    | String  | No       | -       | SOQL condition (without the `WHERE` keyword) appended as `WHERE <filter>` to the auto-built `SELECT FIELDS(ALL) FROM <object>` query. |
+| request_timeout_ms        | Integer | No       | 60000   | HTTP connect and socket timeout in milliseconds. |
+| poll_interval_ms          | Long    | No       | 5000    | Interval between Bulk API job status polls (ms). |
+| job_completion_timeout_ms | Long    | No       | 3600000 | Maximum time (ms) to wait for a Bulk API job to reach a terminal state. Defaults to 60 minutes. |
+
+\* Exactly one of `object_name` or `tables_configs` must be provided.
+
+The connector always issues `SELECT FIELDS(ALL) FROM <object> [WHERE <filter>]`
+so the emitted rows stay positionally aligned with the schema produced from
+`/describe`. Custom projection-style queries are intentionally out of scope for
+this first version.
+
+### tables_configs entry options
+
+| Name       | Type   | Required | Description |
+|------------|--------|----------|-------------|
+| table_path | String | Yes      | Format: `database.ObjectName`, e.g. `salesforce.Account`. |
+| filter     | String | No       | SOQL condition for this object (without the `WHERE` keyword). |
+
+## Example
+
+### Single-object
+
+```hocon
+source {
+ Salesforce {
+ client_id = "your_client_id"
+ client_secret = "your_client_secret"
+ username = "[email protected]"
+ password = "yourpassword"
+ instance_url = "https://yourorg.salesforce.com"
+
+ object_name = "Account"
+ filter = "AnnualRevenue > 1000000"
+ }
+}
+```
+
+### Multi-object
+
+```hocon
+source {
+ Salesforce {
+ client_id = "your_client_id"
+ client_secret = "your_client_secret"
+ username = "[email protected]"
+ password = "yourpassword"
+ instance_url = "https://yourorg.salesforce.com"
+
+ tables_configs = [
+ {
+ table_path = "salesforce.Account"
+ filter = "AnnualRevenue > 1000000"
+ },
+ {
+ table_path = "salesforce.Contact"
+ filter = "IsDeleted = false"
+ },
+ {
+ table_path = "salesforce.Opportunity"
+ }
+ ]
+ }
+}
+```
+
+## Changelog
+
+### next version
+
+- Add Salesforce source connector with Bulk API 2.0 and multi-object support
diff --git a/docs/zh/connector-v2/source/Salesforce.md
b/docs/zh/connector-v2/source/Salesforce.md
new file mode 100644
index 0000000000..ac6160dce3
--- /dev/null
+++ b/docs/zh/connector-v2/source/Salesforce.md
@@ -0,0 +1,115 @@
+# Salesforce
+
+> Salesforce 源连接器
+
+## 支持的引擎
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## 关键特性
+
+- [x] [批处理](../../introduction/concepts/connector-v2-features.md)
+- [ ] [流处理](../../introduction/concepts/connector-v2-features.md)
+- [ ] [精确一次](../../introduction/concepts/connector-v2-features.md)
+- [x] [列投影](../../introduction/concepts/connector-v2-features.md)
+- [ ] [并行度](../../introduction/concepts/connector-v2-features.md)
+- [x] [支持多表读取](../../introduction/concepts/connector-v2-features.md)
+
+## 描述
+
+使用 Salesforce Bulk API 2.0 从 Salesforce 对象读取数据。支持单对象模式以及通过
+`tables_configs` 进行多对象批量读取。
+
+身份验证使用 OAuth 2.0 用户名-密码模式。
+
+## 支持的数据源信息
+
+| 数据源 | 支持的版本 |
+|------------|------------------|
+| Salesforce | REST API v50.0+ |
+
+## 源选项
+
+| 名称                      | 类型    | 必填  | 默认值  | 描述 |
+|---------------------------|---------|-------|---------|------|
+| client_id                 | String  | 是    | -       | Salesforce Connected App 的客户端 ID (consumer key)。 |
+| client_secret             | String  | 是    | -       | Salesforce Connected App 的客户端密钥 (consumer secret)。 |
+| username                  | String  | 是    | -       | Salesforce 用户名。 |
+| password                  | String  | 是    | -       | Salesforce 密码。 |
+| security_token            | String  | 否    | ""      | 附加到密码后用于身份验证的安全令牌。如果组织 IP 受信任则可留空。 |
+| instance_url              | String  | 是    | -       | 用于获取 OAuth 令牌的 Salesforce 实例 URL,例如 `https://yourorg.salesforce.com`。后续 API 调用使用令牌响应中返回的 `instance_url`。 |
+| api_version               | String  | 否    | v59.0   | Salesforce REST API 版本。 |
+| object_name               | String  | 否\*  | -       | 单对象模式下要读取的 Salesforce 对象,例如 `Account`。与 `tables_configs` 互斥。 |
+| tables_configs            | List    | 否\*  | -       | 多对象配置列表。每一项必须提供 `table_path`。与 `object_name` 互斥。 |
+| filter                    | String  | 否    | -       | 以 `WHERE <filter>` 形式附加到自动构造的 `SELECT FIELDS(ALL) FROM <object>` 查询上的 SOQL 条件(不含 `WHERE` 关键字)。 |
+| request_timeout_ms        | Integer | 否    | 60000   | HTTP 连接与读取超时时间(毫秒)。 |
+| poll_interval_ms          | Long    | 否    | 5000    | Bulk API 作业状态轮询间隔(毫秒)。 |
+| job_completion_timeout_ms | Long    | 否    | 3600000 | 等待 Bulk API 作业达到终止状态的最长时间(毫秒)。默认 60 分钟。 |
+
+\* `object_name` 与 `tables_configs` 必须且只能提供其中一个。
+
+连接器始终发起 `SELECT FIELDS(ALL) FROM <object> [WHERE <filter>]` 查询,以保证
+发出的行与基于 `/describe` 构造的 schema 在位置上保持一致。自定义投影式查询暂不
+属于此版本的支持范围。
+
+### tables_configs 项选项
+
+| 名称       | 类型   | 必填 | 描述 |
+|------------|--------|------|------|
+| table_path | String | 是   | 格式为 `database.ObjectName`,例如 `salesforce.Account`。 |
+| filter     | String | 否   | 针对该对象的 SOQL 条件(不含 `WHERE` 关键字)。 |
+
+## 示例
+
+### 单对象
+
+```hocon
+source {
+ Salesforce {
+ client_id = "your_client_id"
+ client_secret = "your_client_secret"
+ username = "[email protected]"
+ password = "yourpassword"
+ instance_url = "https://yourorg.salesforce.com"
+
+ object_name = "Account"
+ filter = "AnnualRevenue > 1000000"
+ }
+}
+```
+
+### 多对象
+
+```hocon
+source {
+ Salesforce {
+ client_id = "your_client_id"
+ client_secret = "your_client_secret"
+ username = "[email protected]"
+ password = "yourpassword"
+ instance_url = "https://yourorg.salesforce.com"
+
+ tables_configs = [
+ {
+ table_path = "salesforce.Account"
+ filter = "AnnualRevenue > 1000000"
+ },
+ {
+ table_path = "salesforce.Contact"
+ filter = "IsDeleted = false"
+ },
+ {
+ table_path = "salesforce.Opportunity"
+ }
+ ]
+ }
+}
+```
+
+## 变更日志
+
+### 下一个版本
+
+- 新增 Salesforce 源连接器,基于 Bulk API 2.0,支持多对象读取
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index fe181d223c..800f50b7be 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -84,6 +84,7 @@ seatunnel.source.AmazonDynamodb = connector-amazondynamodb
seatunnel.sink.AmazonDynamodb = connector-amazondynamodb
seatunnel.source.Cassandra = connector-cassandra
seatunnel.sink.Cassandra = connector-cassandra
+seatunnel.source.Salesforce = connector-salesforce
seatunnel.sink.StarRocks = connector-starrocks
seatunnel.source.MyHours = connector-http-myhours
seatunnel.sink.InfluxDB = connector-influxdb
diff --git a/seatunnel-connectors-v2/connector-salesforce/pom.xml
b/seatunnel-connectors-v2/connector-salesforce/pom.xml
new file mode 100644
index 0000000000..854e733ca0
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-salesforce/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-salesforce</artifactId>
+ <name>SeaTunnel : Connectors V2 : Salesforce</name>
+
+ <properties>
+ <httpclient.version>4.5.13</httpclient.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpclient.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-csv</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/client/SalesforceClient.java
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/client/SalesforceClient.java
new file mode 100644
index 0000000000..03aa7c5509
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/client/SalesforceClient.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.client;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import
org.apache.seatunnel.connectors.seatunnel.salesforce.config.SalesforceParameters;
+import
org.apache.seatunnel.connectors.seatunnel.salesforce.exception.SalesforceConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.salesforce.exception.SalesforceConnectorException;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.http.Header;
+import org.apache.http.HttpHeaders;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+@Slf4j
+public class SalesforceClient implements Closeable {
+
+ private static final String TOKEN_PATH = "/services/oauth2/token";
+ private static final String JOBS_QUERY_PATH =
"/services/data/%s/jobs/query";
+ private static final String JOB_PATH = "/services/data/%s/jobs/query/%s";
+ private static final String JOB_RESULTS_PATH =
"/services/data/%s/jobs/query/%s/results";
+ private static final String DESCRIBE_PATH =
"/services/data/%s/sobjects/%s/describe";
+ private static final String PLUGIN_NAME = "Salesforce";
+
+ private final SalesforceParameters params;
+ private final CloseableHttpClient httpClient;
+ private final ObjectMapper objectMapper = new ObjectMapper();
+ private String accessToken;
+ private String authorizedInstanceUrl;
+
+ public SalesforceClient(SalesforceParameters params) {
+ this.params = params;
+ RequestConfig requestConfig =
+ RequestConfig.custom()
+ .setConnectTimeout(params.getRequestTimeoutMs())
+ .setSocketTimeout(params.getRequestTimeoutMs())
+ .build();
+ this.httpClient =
HttpClients.custom().setDefaultRequestConfig(requestConfig).build();
+ }
+
+ public void authenticate() {
+ String tokenUrl = params.getInstanceUrl() + TOKEN_PATH;
+ HttpPost post = new HttpPost(tokenUrl);
+
+ List<NameValuePair> form = new ArrayList<>();
+ form.add(new BasicNameValuePair("grant_type", "password"));
+ form.add(new BasicNameValuePair("client_id", params.getClientId()));
+ form.add(new BasicNameValuePair("client_secret",
params.getClientSecret()));
+ form.add(new BasicNameValuePair("username", params.getUsername()));
+ form.add(
+ new BasicNameValuePair(
+ "password", params.getPassword() +
params.getSecurityToken()));
+
+ try {
+ post.setEntity(new UrlEncodedFormEntity(form,
StandardCharsets.UTF_8));
+ try (CloseableHttpResponse response = httpClient.execute(post)) {
+ int status = response.getStatusLine().getStatusCode();
+ String body = EntityUtils.toString(response.getEntity(),
StandardCharsets.UTF_8);
+ if (status != 200) {
+ throw new SalesforceConnectorException(
+ SalesforceConnectorErrorCode.AUTH_FAILED,
+ "HTTP " + status + ": " + body);
+ }
+ JsonNode json = objectMapper.readTree(body);
+ this.accessToken = json.get("access_token").asText();
+ this.authorizedInstanceUrl = json.get("instance_url").asText();
+ log.info("Authenticated with Salesforce instance {}",
authorizedInstanceUrl);
+ }
+ } catch (SalesforceConnectorException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new
SalesforceConnectorException(SalesforceConnectorErrorCode.AUTH_FAILED, e);
+ }
+ }
+
+ public CatalogTable describeObject(String database, String objectName) {
+ String url =
+ authorizedInstanceUrl
+ + String.format(DESCRIBE_PATH, params.getApiVersion(),
objectName);
+ HttpGet get = new HttpGet(url);
+ get.setHeader(HttpHeaders.AUTHORIZATION, "Bearer " + accessToken);
+
+ try (CloseableHttpResponse response = httpClient.execute(get)) {
+ int status = response.getStatusLine().getStatusCode();
+ String body = EntityUtils.toString(response.getEntity(),
StandardCharsets.UTF_8);
+ if (status != 200) {
+ throw new SalesforceConnectorException(
+ SalesforceConnectorErrorCode.DESCRIBE_OBJECT_FAILED,
+ "HTTP " + status + " describing " + objectName + ": "
+ body);
+ }
+ return buildCatalogTable(database, objectName,
objectMapper.readTree(body));
+ } catch (SalesforceConnectorException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SalesforceConnectorException(
+ SalesforceConnectorErrorCode.DESCRIBE_OBJECT_FAILED, e);
+ }
+ }
+
+ private CatalogTable buildCatalogTable(String database, String objectName,
JsonNode describe) {
+ TableSchema.Builder schemaBuilder = TableSchema.builder();
+ for (JsonNode field : describe.get("fields")) {
+ String name = field.get("name").asText();
+ String sfType = field.get("type").asText();
+ SeaTunnelDataType<?> seaType = mapSalesforceType(sfType);
+ schemaBuilder.column(PhysicalColumn.of(name, seaType, null, null,
true, null, null));
+ }
+ return CatalogTable.of(
+ TableIdentifier.of(PLUGIN_NAME, database, objectName),
+ schemaBuilder.build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "");
+ }
+
+ private SeaTunnelDataType<?> mapSalesforceType(String sfType) {
+ switch (sfType) {
+ case "boolean":
+ return BasicType.BOOLEAN_TYPE;
+ case "int":
+ return BasicType.INT_TYPE;
+ case "double":
+ case "currency":
+ case "percent":
+ return BasicType.DOUBLE_TYPE;
+ case "date":
+ return LocalTimeType.LOCAL_DATE_TYPE;
+ case "datetime":
+ return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+ case "time":
+ return LocalTimeType.LOCAL_TIME_TYPE;
+ case "base64":
+ return PrimitiveByteArrayType.INSTANCE;
+ default:
+ return BasicType.STRING_TYPE;
+ }
+ }
+
+ /**
+ * Runs one Bulk API query end-to-end: creates the job, polls until it
reaches a terminal state,
+ * then streams the result pages to rowConsumer. Rows are emitted as they
are parsed rather than
+ * buffered, so callers can forward each row to a downstream collector
without holding the full
+ * result set in memory.
+ */
+ public void executeBulkQuery(String soql, int columnCount,
Consumer<Object[]> rowConsumer) {
+ String jobId = createBulkQueryJob(soql);
+ waitForJobCompletion(jobId);
+ downloadResults(jobId, columnCount, rowConsumer);
+ }
+
+ private String createBulkQueryJob(String soql) {
+ String url = authorizedInstanceUrl + String.format(JOBS_QUERY_PATH,
params.getApiVersion());
+ HttpPost post = new HttpPost(url);
+ post.setHeader(HttpHeaders.AUTHORIZATION, "Bearer " + accessToken);
+ post.setHeader(HttpHeaders.CONTENT_TYPE,
ContentType.APPLICATION_JSON.getMimeType());
+ post.setHeader(HttpHeaders.ACCEPT,
ContentType.APPLICATION_JSON.getMimeType());
+
+ try {
+ ObjectNode body = objectMapper.createObjectNode();
+ body.put("operation", "query");
+ body.put("query", soql);
+ post.setEntity(
+ new StringEntity(
+ objectMapper.writeValueAsString(body),
ContentType.APPLICATION_JSON));
+
+ try (CloseableHttpResponse response = httpClient.execute(post)) {
+ int status = response.getStatusLine().getStatusCode();
+ String responseBody =
+ EntityUtils.toString(response.getEntity(),
StandardCharsets.UTF_8);
+ if (status != 200) {
+ throw new SalesforceConnectorException(
+
SalesforceConnectorErrorCode.BULK_JOB_CREATE_FAILED,
+ "HTTP " + status + ": " + responseBody);
+ }
+ String jobId =
objectMapper.readTree(responseBody).get("id").asText();
+ log.info("Created Bulk API query job {} for SOQL: {}", jobId,
soql);
+ return jobId;
+ }
+ } catch (SalesforceConnectorException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SalesforceConnectorException(
+ SalesforceConnectorErrorCode.BULK_JOB_CREATE_FAILED, e);
+ }
+ }
+
+ private void waitForJobCompletion(String jobId) {
+ String url = authorizedInstanceUrl + String.format(JOB_PATH,
params.getApiVersion(), jobId);
+ long deadline = System.currentTimeMillis() +
params.getJobCompletionTimeoutMs();
+ while (true) {
+ try {
+ Thread.sleep(params.getPollIntervalMs());
+ HttpGet get = new HttpGet(url);
+ get.setHeader(HttpHeaders.AUTHORIZATION, "Bearer " +
accessToken);
+ try (CloseableHttpResponse response = httpClient.execute(get))
{
+ String responseBody =
+ EntityUtils.toString(response.getEntity(),
StandardCharsets.UTF_8);
+ String state =
objectMapper.readTree(responseBody).get("state").asText();
+ log.debug("Bulk API job {} state: {}", jobId, state);
+ if ("JobComplete".equals(state)) {
+ return;
+ }
+ if ("Failed".equals(state) || "Aborted".equals(state)) {
+ throw new SalesforceConnectorException(
+ SalesforceConnectorErrorCode.BULK_JOB_FAILED,
+ "Job " + jobId + " ended with state: " +
state);
+ }
+ }
+ if (System.currentTimeMillis() > deadline) {
+ throw new SalesforceConnectorException(
+ SalesforceConnectorErrorCode.BULK_JOB_FAILED,
+ "Job "
+ + jobId
+ + " did not complete within "
+ + params.getJobCompletionTimeoutMs()
+ + "ms");
+ }
+ } catch (SalesforceConnectorException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SalesforceConnectorException(
+ SalesforceConnectorErrorCode.BULK_JOB_FAILED, e);
+ }
+ }
+ }
+
+ /**
+ * Pulls every result page for a completed Bulk API query job, walking
forward via the
+ * Sforce-Locator header. Each parsed row is pushed to rowConsumer
immediately; no whole-job
+ * buffering happens at this layer, only one page's CSV body is held at a
time.
+ */
+ private void downloadResults(String jobId, int columnCount,
Consumer<Object[]> rowConsumer) {
+ String url =
+ authorizedInstanceUrl
+ + String.format(JOB_RESULTS_PATH,
params.getApiVersion(), jobId);
+ String locator = null;
+
+ do {
+ String requestUrl = locator == null ? url : url + "?locator=" +
locator;
+ HttpGet get = new HttpGet(requestUrl);
+ get.setHeader(HttpHeaders.AUTHORIZATION, "Bearer " + accessToken);
+ get.setHeader(HttpHeaders.ACCEPT, "text/csv");
+
+ try (CloseableHttpResponse response = httpClient.execute(get)) {
+ int status = response.getStatusLine().getStatusCode();
+ String body = EntityUtils.toString(response.getEntity(),
StandardCharsets.UTF_8);
+ if (status != 200) {
+ throw new SalesforceConnectorException(
+ SalesforceConnectorErrorCode.BULK_RESULTS_FAILED,
+ "HTTP " + status + ": " + body);
+ }
+ Header locatorHeader =
response.getFirstHeader("Sforce-Locator");
+ locator =
+ (locatorHeader != null &&
!"null".equals(locatorHeader.getValue()))
+ ? locatorHeader.getValue()
+ : null;
+ parseCsvInto(body, columnCount, rowConsumer);
+ } catch (SalesforceConnectorException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SalesforceConnectorException(
+ SalesforceConnectorErrorCode.BULK_RESULTS_FAILED, e);
+ }
+ } while (locator != null);
+ }
+
+ /**
+ * Parses one Bulk API CSV result page (header row plus data rows) and
emits each data row to
+ * rowConsumer as a fixed-width Object[]. Missing trailing columns are
filled with null and
+ * empty cells also become null, so the type-converting reader can
distinguish them from real
+ * string values.
+ */
+ private void parseCsvInto(String csv, int columnCount, Consumer<Object[]>
rowConsumer)
+ throws IOException {
+ try (CSVParser parser =
+ CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(new
StringReader(csv))) {
+ for (CSVRecord record : parser) {
+ Object[] row = new Object[columnCount];
+ for (int i = 0; i < columnCount; i++) {
+ String val = i < record.size() ? record.get(i) : null;
+ row[i] = (val == null || val.isEmpty()) ? null : val;
+ }
+ rowConsumer.accept(row);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ httpClient.close();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/config/SalesforceParameters.java
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/config/SalesforceParameters.java
new file mode 100644
index 0000000000..c452936823
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/config/SalesforceParameters.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+
+@Getter
+public class SalesforceParameters implements Serializable {
+
+ private String clientId;
+ private String clientSecret;
+ private String username;
+ private String password;
+ private String securityToken;
+ private String instanceUrl;
+ private String apiVersion;
+ private int requestTimeoutMs;
+ private long pollIntervalMs;
+ private long jobCompletionTimeoutMs;
+
+ public void buildWithConfig(ReadonlyConfig config) {
+ this.clientId = config.get(SalesforceSourceOptions.CLIENT_ID);
+ this.clientSecret = config.get(SalesforceSourceOptions.CLIENT_SECRET);
+ this.username = config.get(SalesforceSourceOptions.USERNAME);
+ this.password = config.get(SalesforceSourceOptions.PASSWORD);
+ this.securityToken =
config.get(SalesforceSourceOptions.SECURITY_TOKEN);
+ this.instanceUrl = config.get(SalesforceSourceOptions.INSTANCE_URL);
+ this.apiVersion = config.get(SalesforceSourceOptions.API_VERSION);
+ this.requestTimeoutMs =
config.get(SalesforceSourceOptions.REQUEST_TIMEOUT_MS);
+ this.pollIntervalMs =
config.get(SalesforceSourceOptions.POLL_INTERVAL_MS);
+ this.jobCompletionTimeoutMs =
config.get(SalesforceSourceOptions.JOB_COMPLETION_TIMEOUT_MS);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/config/SalesforceSourceOptions.java
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/config/SalesforceSourceOptions.java
new file mode 100644
index 0000000000..614de8bd50
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/config/SalesforceSourceOptions.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class SalesforceSourceOptions {
+
+ public static final Option<String> CLIENT_ID =
+ Options.key("client_id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Salesforce Connected App client ID
(consumer key).");
+
+ public static final Option<String> CLIENT_SECRET =
+ Options.key("client_secret")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Salesforce Connected App client secret
(consumer secret).");
+
+ public static final Option<String> USERNAME =
+ Options.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Salesforce username.");
+
+ public static final Option<String> PASSWORD =
+ Options.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Salesforce password.");
+
+ public static final Option<String> SECURITY_TOKEN =
+ Options.key("security_token")
+ .stringType()
+ .defaultValue("")
+ .withDescription(
+ "Salesforce security token appended to the
password during "
+ + "authentication. Leave empty if your org
IP is trusted.");
+
+ public static final Option<String> INSTANCE_URL =
+ Options.key("instance_url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Salesforce instance URL, e.g.
https://yourorg.salesforce.com.");
+
+ public static final Option<String> API_VERSION =
+ Options.key("api_version")
+ .stringType()
+ .defaultValue("v59.0")
+ .withDescription("Salesforce REST API version, e.g.
v59.0.");
+
+ public static final Option<String> OBJECT_NAME =
+ Options.key("object_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Salesforce object name for single-object mode,
e.g. Account. "
+ + "Mutually exclusive with
tables_configs.");
+
+ public static final Option<String> FILTER =
+ Options.key("filter")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "SOQL WHERE clause appended to the auto-built "
+ + "SELECT FIELDS(ALL) FROM <object>
query.");
+
+ public static final Option<Integer> REQUEST_TIMEOUT_MS =
+ Options.key("request_timeout_ms")
+ .intType()
+ .defaultValue(60000)
+ .withDescription("HTTP request timeout in milliseconds.");
+
+ public static final Option<Long> POLL_INTERVAL_MS =
+ Options.key("poll_interval_ms")
+ .longType()
+ .defaultValue(5000L)
+ .withDescription("Interval in milliseconds between Bulk
API job status polls.");
+
+ public static final Option<Long> JOB_COMPLETION_TIMEOUT_MS =
+ Options.key("job_completion_timeout_ms")
+ .longType()
+ .defaultValue(3_600_000L)
+ .withDescription(
+ "Maximum time in milliseconds to wait for a Bulk
API job to reach a "
+ + "terminal state. Default 3600000 (60
minutes).");
+
+ public static final Option<String> TABLE_PATH =
+ Options.key("table_path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Salesforce object path in 'database.ObjectName'
format, "
+ + "e.g. salesforce.Account. Used in
tables_configs entries.");
+}
diff --git
a/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/config/SalesforceTableConfig.java
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/config/SalesforceTableConfig.java
new file mode 100644
index 0000000000..f088014821
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/config/SalesforceTableConfig.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.config;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+
+@Getter
+public class SalesforceTableConfig implements Serializable {
+
+ private final String soql;
+ private final String objectName;
+ private final CatalogTable catalogTable;
+
+ public SalesforceTableConfig(String soql, String objectName, CatalogTable
catalogTable) {
+ this.soql = soql;
+ this.objectName = objectName;
+ this.catalogTable = catalogTable;
+ }
+
+ public String getTableId() {
+ return catalogTable.getTableId().toTablePath().toString();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/exception/SalesforceConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/exception/SalesforceConnectorErrorCode.java
new file mode 100644
index 0000000000..0ee618b4c0
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/exception/SalesforceConnectorErrorCode.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum SalesforceConnectorErrorCode implements SeaTunnelErrorCode {
+ AUTH_FAILED("SALESFORCE-01", "Failed to authenticate with Salesforce"),
+ DESCRIBE_OBJECT_FAILED("SALESFORCE-02", "Failed to describe Salesforce
object schema"),
+ BULK_JOB_CREATE_FAILED("SALESFORCE-03", "Failed to create Salesforce Bulk
API query job"),
+ BULK_JOB_FAILED("SALESFORCE-04", "Salesforce Bulk API query job failed or
was aborted"),
+ BULK_RESULTS_FAILED("SALESFORCE-05", "Failed to retrieve Salesforce Bulk
API job results"),
+ INVALID_TABLE_PATH("SALESFORCE-06", "Invalid table_path; expected format:
database.ObjectName"),
+ DUPLICATE_OBJECT("SALESFORCE-07", "Duplicate object found in
tables_configs");
+
+ private final String code;
+ private final String description;
+
+ SalesforceConnectorErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return code;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/exception/SalesforceConnectorException.java
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/exception/SalesforceConnectorException.java
new file mode 100644
index 0000000000..edc0e349e6
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/exception/SalesforceConnectorException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class SalesforceConnectorException extends SeaTunnelRuntimeException {
+ public SalesforceConnectorException(
+ SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ }
+
+ public SalesforceConnectorException(
+ SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage,
Throwable cause) {
+ super(seaTunnelErrorCode, errorMessage, cause);
+ }
+
+ public SalesforceConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
Throwable cause) {
+ super(seaTunnelErrorCode, cause);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/source/SalesforceSource.java
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/source/SalesforceSource.java
new file mode 100644
index 0000000000..890c711aa1
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/source/SalesforceSource.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.source;
+
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import
org.apache.seatunnel.connectors.seatunnel.salesforce.client.SalesforceClient;
+import
org.apache.seatunnel.connectors.seatunnel.salesforce.config.SalesforceParameters;
+import
org.apache.seatunnel.connectors.seatunnel.salesforce.config.SalesforceSourceOptions;
+import
org.apache.seatunnel.connectors.seatunnel.salesforce.config.SalesforceTableConfig;
+import
org.apache.seatunnel.connectors.seatunnel.salesforce.exception.SalesforceConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.salesforce.exception.SalesforceConnectorException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class SalesforceSource extends AbstractSingleSplitSource<SeaTunnelRow>
+ implements SupportColumnProjection {
+
+ private static final String PLUGIN_NAME = "Salesforce";
+
+ private final SalesforceParameters params;
+ private final List<SalesforceTableConfig> tableConfigs;
+
+ public SalesforceSource(SalesforceParameters params, ReadonlyConfig
config) {
+ this.params = params;
+ this.tableConfigs = buildTableConfigs(params, config);
+ }
+
+ /**
+ * Resolves the user-facing object_name / tables_configs options into one
or more
+ * SalesforceTableConfig instances, calling /describe per object to derive
the CatalogTable
+ * schema. Runs once during factory createSource and uses a one-shot
SalesforceClient whose
+ * lifetime is scoped to this call (try-with-resources).
+ */
+ private List<SalesforceTableConfig> buildTableConfigs(
+ SalesforceParameters params, ReadonlyConfig config) {
+ try (SalesforceClient client = new SalesforceClient(params)) {
+ client.authenticate();
+
+ if
(config.getOptional(ConnectorCommonOptions.TABLE_CONFIGS).isPresent()) {
+ List<Map<String, Object>> tableConfigMaps =
+ config.get(ConnectorCommonOptions.TABLE_CONFIGS);
+ List<SalesforceTableConfig> configs = new ArrayList<>();
+ for (Map<String, Object> map : tableConfigMaps) {
+ ReadonlyConfig tableConfig = ReadonlyConfig.fromMap(map);
+ SalesforceTableConfig built =
buildOneTableConfig(tableConfig, client);
+ String tableId = built.getTableId();
+ boolean duplicate =
+ configs.stream().anyMatch(c ->
c.getTableId().equals(tableId));
+ if (duplicate) {
+ throw new SalesforceConnectorException(
+ SalesforceConnectorErrorCode.DUPLICATE_OBJECT,
+ "Duplicate object in tables_configs: " +
tableId);
+ }
+ configs.add(built);
+ }
+ return configs;
+ } else {
+ String objectName =
config.get(SalesforceSourceOptions.OBJECT_NAME);
+ String soql = buildSoql(config, objectName);
+ CatalogTable table = client.describeObject(PLUGIN_NAME,
objectName);
+ return Collections.singletonList(
+ new SalesforceTableConfig(soql, objectName, table));
+ }
+ } catch (SalesforceConnectorException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SalesforceConnectorException(
+ SalesforceConnectorErrorCode.DESCRIBE_OBJECT_FAILED,
+ "Failed to build Salesforce table configs",
+ e);
+ }
+ }
+
+ private SalesforceTableConfig buildOneTableConfig(
+ ReadonlyConfig tableConfig, SalesforceClient client) {
+ String tablePath =
+ tableConfig
+ .getOptional(SalesforceSourceOptions.TABLE_PATH)
+ .orElseThrow(
+ () ->
+ new SalesforceConnectorException(
+
SalesforceConnectorErrorCode.INVALID_TABLE_PATH,
+ "table_path is required in
tables_configs"));
+ String[] parts = tablePath.split("\\.", 2);
+ if (parts.length != 2 || StringUtils.isBlank(parts[1])) {
+ throw new SalesforceConnectorException(
+ SalesforceConnectorErrorCode.INVALID_TABLE_PATH,
+ "table_path must be 'database.ObjectName', got: " +
tablePath);
+ }
+ String database = parts[0];
+ String objectName = parts[1];
+ String soql = buildSoql(tableConfig, objectName);
+ CatalogTable table = client.describeObject(database, objectName);
+ return new SalesforceTableConfig(soql, objectName, table);
+ }
+
+ /**
+ * Builds the SOQL the Bulk API job will run for one object. Always
selects every
+ * describe-discovered field via FIELDS(ALL) so the emitted rows stay
positionally aligned with
+ * the CatalogTable schema. An optional filter is appended verbatim as the
WHERE clause.
+ */
+ private String buildSoql(ReadonlyConfig config, String objectName) {
+ StringBuilder soql = new StringBuilder("SELECT FIELDS(ALL) FROM
").append(objectName);
+ String filter =
config.getOptional(SalesforceSourceOptions.FILTER).orElse(null);
+ if (filter != null) {
+ soql.append(" WHERE ").append(filter);
+ }
+ return soql.toString();
+ }
+
+ @Override
+ public String getPluginName() {
+ return PLUGIN_NAME;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public List<CatalogTable> getProducedCatalogTables() {
+ return tableConfigs.stream()
+ .map(SalesforceTableConfig::getCatalogTable)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public AbstractSingleSplitReader<SeaTunnelRow> createReader(
+ SingleSplitReaderContext readerContext) throws Exception {
+ return new SalesforceSourceReader(params, tableConfigs, readerContext);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/source/SalesforceSourceFactory.java
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/source/SalesforceSourceFactory.java
new file mode 100644
index 0000000000..d58422ec1f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/source/SalesforceSourceFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.salesforce.config.SalesforceParameters;
+import
org.apache.seatunnel.connectors.seatunnel.salesforce.config.SalesforceSourceOptions;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+@AutoService(Factory.class)
+public class SalesforceSourceFactory implements TableSourceFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return "Salesforce";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(
+ SalesforceSourceOptions.CLIENT_ID,
+ SalesforceSourceOptions.CLIENT_SECRET,
+ SalesforceSourceOptions.USERNAME,
+ SalesforceSourceOptions.PASSWORD,
+ SalesforceSourceOptions.INSTANCE_URL)
+ .exclusive(
+ SalesforceSourceOptions.OBJECT_NAME,
ConnectorCommonOptions.TABLE_CONFIGS)
+ .optional(
+ SalesforceSourceOptions.SECURITY_TOKEN,
+ SalesforceSourceOptions.API_VERSION,
+ SalesforceSourceOptions.FILTER,
+ SalesforceSourceOptions.REQUEST_TIMEOUT_MS,
+ SalesforceSourceOptions.POLL_INTERVAL_MS,
+ SalesforceSourceOptions.JOB_COMPLETION_TIMEOUT_MS)
+ .build();
+ }
+
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ SalesforceParameters params = new SalesforceParameters();
+ params.buildWithConfig(context.getOptions());
+ return () ->
+ (SeaTunnelSource<T, SplitT, StateT>)
+ new SalesforceSource(params, context.getOptions());
+ }
+
+ @Override
+ public Class<? extends SeaTunnelSource> getSourceClass() {
+ return SalesforceSource.class;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/source/SalesforceSourceReader.java
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/source/SalesforceSourceReader.java
new file mode 100644
index 0000000000..439b3d62c3
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/source/SalesforceSourceReader.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.source;
+
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.salesforce.client.SalesforceClient;
import org.apache.seatunnel.connectors.seatunnel.salesforce.config.SalesforceParameters;
import org.apache.seatunnel.connectors.seatunnel.salesforce.config.SalesforceTableConfig;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.TemporalAccessor;
import java.util.Base64;
import java.util.List;
+
+@Slf4j
+public class SalesforceSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
+
+ private static final DateTimeFormatter DATE_FMT =
DateTimeFormatter.ofPattern("yyyy-MM-dd");
+ private static final DateTimeFormatter DATETIME_FMT =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+ private static final DateTimeFormatter TIME_FMT =
+ DateTimeFormatter.ofPattern("HH:mm:ss.SSS'Z'");
+
+ private final SalesforceParameters params;
+ private final List<SalesforceTableConfig> tableConfigs;
+ private final SingleSplitReaderContext readerContext;
+ private SalesforceClient client;
+
+ SalesforceSourceReader(
+ SalesforceParameters params,
+ List<SalesforceTableConfig> tableConfigs,
+ SingleSplitReaderContext readerContext) {
+ this.params = params;
+ this.tableConfigs = tableConfigs;
+ this.readerContext = readerContext;
+ }
+
+ @Override
+ public void open() throws Exception {
+ client = new SalesforceClient(params);
+ client.authenticate();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ /**
+ * Single-pass bounded read for the assigned split. For each configured
table, submits one Bulk
+ * API query job and hands the client a per-row consumer that converts the
raw CSV Object[] to a
+ * SeaTunnelRow, tags it with the table id, and forwards it to the
downstream collector. Rows
+ * flow through without buffering the whole result set in the reader.
After every table has
+ * drained, signals no-more-elements so the framework can close the split.
+ */
+ @Override
+ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+ try {
+ for (SalesforceTableConfig tableConfig : tableConfigs) {
+ CatalogTable catalogTable = tableConfig.getCatalogTable();
+ SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
+ int columnCount = rowType.getTotalFields();
+ String tableId = tableConfig.getTableId();
+ log.info("Reading rows from Salesforce object {}",
tableConfig.getObjectName());
+ client.executeBulkQuery(
+ tableConfig.getSoql(),
+ columnCount,
+ raw -> {
+ SeaTunnelRow row = convertRow(raw, rowType);
+ row.setTableId(tableId);
+ output.collect(row);
+ });
+ }
+ } finally {
+ readerContext.signalNoMoreElement();
+ }
+ }
+
+ private SeaTunnelRow convertRow(Object[] raw, SeaTunnelRowType rowType) {
+ Object[] fields = new Object[raw.length];
+ for (int i = 0; i < raw.length; i++) {
+ String val = raw[i] == null ? null : raw[i].toString();
+ if (val == null) {
+ fields[i] = null;
+ continue;
+ }
+ SeaTunnelDataType<?> type = rowType.getFieldType(i);
+ fields[i] = parseValue(val, type);
+ }
+ return new SeaTunnelRow(fields);
+ }
+
+ private Object parseValue(String val, SeaTunnelDataType<?> type) {
+ switch (type.getSqlType()) {
+ case BOOLEAN:
+ return Boolean.parseBoolean(val);
+ case INT:
+ return Integer.parseInt(val);
+ case DOUBLE:
+ return Double.parseDouble(val);
+ case DATE:
+ return LocalDate.parse(val, DATE_FMT);
+ case TIMESTAMP:
+ return LocalDateTime.parse(val, DATETIME_FMT);
+ case TIME:
+ return LocalTime.parse(val, TIME_FMT);
+ case BYTES:
+ return Base64.getDecoder().decode(val);
+ default:
+ return val;
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+}
diff --git
a/seatunnel-connectors-v2/connector-salesforce/src/test/java/org/apache/seatunnel/connectors/seatunnel/salesforce/SalesforceSourceFactoryTest.java
b/seatunnel-connectors-v2/connector-salesforce/src/test/java/org/apache/seatunnel/connectors/seatunnel/salesforce/SalesforceSourceFactoryTest.java
new file mode 100644
index 0000000000..a816cc52ae
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-salesforce/src/test/java/org/apache/seatunnel/connectors/seatunnel/salesforce/SalesforceSourceFactoryTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.configuration.util.OptionValidationException;
+import
org.apache.seatunnel.connectors.seatunnel.salesforce.source.SalesforceSourceFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class SalesforceSourceFactoryTest {
+
+ private static final SalesforceSourceFactory FACTORY = new
SalesforceSourceFactory();
+
+ @Test
+ void testOptionRuleIsNotNull() {
+ Assertions.assertNotNull(FACTORY.optionRule());
+ }
+
+ @Test
+ void testFactoryIdentifier() {
+ Assertions.assertEquals("Salesforce", FACTORY.factoryIdentifier());
+ }
+
+ @Test
+ void testSingleObjectModeValid() {
+ Map<String, Object> config = requiredAuthConfig();
+ config.put("object_name", "Account");
+
+ Assertions.assertDoesNotThrow(
+ () ->
+ ConfigValidator.of(ReadonlyConfig.fromMap(config))
+ .validate(FACTORY.optionRule()));
+ }
+
+ @Test
+ void testSingleObjectModeWithOptionalParamsValid() {
+ Map<String, Object> config = requiredAuthConfig();
+ config.put("object_name", "Account");
+ config.put("filter", "AnnualRevenue > 1000000");
+ config.put("api_version", "v58.0");
+ config.put("max_retries", 5);
+ config.put("request_timeout_ms", 30000);
+
+ Assertions.assertDoesNotThrow(
+ () ->
+ ConfigValidator.of(ReadonlyConfig.fromMap(config))
+ .validate(FACTORY.optionRule()));
+ }
+
+ @Test
+ void testMultiObjectModeValid() {
+ Map<String, Object> config = requiredAuthConfig();
+ config.put("tables_configs", multiObjectTablesConfigs());
+
+ Assertions.assertDoesNotThrow(
+ () ->
+ ConfigValidator.of(ReadonlyConfig.fromMap(config))
+ .validate(FACTORY.optionRule()));
+ }
+
+ @Test
+ void testBothObjectNameAndTablesConfigsThrows() {
+ Map<String, Object> config = requiredAuthConfig();
+ config.put("object_name", "Account");
+ config.put("tables_configs", multiObjectTablesConfigs());
+
+ Assertions.assertThrows(
+ OptionValidationException.class,
+ () ->
+ ConfigValidator.of(ReadonlyConfig.fromMap(config))
+ .validate(FACTORY.optionRule()));
+ }
+
+ @Test
+ void testNeitherObjectNameNorTablesConfigsThrows() {
+ Map<String, Object> config = requiredAuthConfig();
+
+ Assertions.assertThrows(
+ OptionValidationException.class,
+ () ->
+ ConfigValidator.of(ReadonlyConfig.fromMap(config))
+ .validate(FACTORY.optionRule()));
+ }
+
+ @Test
+ void testMissingClientIdThrows() {
+ Map<String, Object> config = requiredAuthConfig();
+ config.remove("client_id");
+ config.put("object_name", "Account");
+
+ Assertions.assertThrows(
+ OptionValidationException.class,
+ () ->
+ ConfigValidator.of(ReadonlyConfig.fromMap(config))
+ .validate(FACTORY.optionRule()));
+ }
+
+ @Test
+ void testMissingInstanceUrlThrows() {
+ Map<String, Object> config = requiredAuthConfig();
+ config.remove("instance_url");
+ config.put("object_name", "Account");
+
+ Assertions.assertThrows(
+ OptionValidationException.class,
+ () ->
+ ConfigValidator.of(ReadonlyConfig.fromMap(config))
+ .validate(FACTORY.optionRule()));
+ }
+
+ @Test
+ void testMissingUsernameThrows() {
+ Map<String, Object> config = requiredAuthConfig();
+ config.remove("username");
+ config.put("object_name", "Account");
+
+ Assertions.assertThrows(
+ OptionValidationException.class,
+ () ->
+ ConfigValidator.of(ReadonlyConfig.fromMap(config))
+ .validate(FACTORY.optionRule()));
+ }
+
+ private static Map<String, Object> requiredAuthConfig() {
+ Map<String, Object> config = new HashMap<>();
+ config.put("client_id", "test_client_id");
+ config.put("client_secret", "test_client_secret");
+ config.put("username", "[email protected]");
+ config.put("password", "test_password");
+ config.put("instance_url", "https://test.salesforce.com");
+ return config;
+ }
+
+ private static List<Map<String, Object>> multiObjectTablesConfigs() {
+ Map<String, Object> account = new HashMap<>();
+ account.put("table_path", "salesforce.Account");
+
+ Map<String, Object> contact = new HashMap<>();
+ contact.put("table_path", "salesforce.Contact");
+ contact.put("filter", "IsDeleted = false");
+
+ return Arrays.asList(account, contact);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-salesforce/src/test/java/org/apache/seatunnel/connectors/seatunnel/salesforce/client/SalesforceClientTest.java
b/seatunnel-connectors-v2/connector-salesforce/src/test/java/org/apache/seatunnel/connectors/seatunnel/salesforce/client/SalesforceClientTest.java
new file mode 100644
index 0000000000..630cae17c0
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-salesforce/src/test/java/org/apache/seatunnel/connectors/seatunnel/salesforce/client/SalesforceClientTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.client;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.connectors.seatunnel.salesforce.config.SalesforceParameters;
+import org.apache.seatunnel.connectors.seatunnel.salesforce.exception.SalesforceConnectorException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpServer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for {@link SalesforceClient} that run against an in-process {@link HttpServer} standing
+ * in for the Salesforce OAuth token, sObject describe and Bulk API 2.0 query-job endpoints.
+ */
+class SalesforceClientTest {
+
+    private HttpServer server;
+    private String baseUrl;
+
+    /** Starts a loopback server on an ephemeral port; each test registers its own contexts. */
+    @BeforeEach
+    void setUp() throws IOException {
+        server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
+        server.start();
+        baseUrl = "http://127.0.0.1:" + server.getAddress().getPort();
+    }
+
+    @AfterEach
+    void tearDown() {
+        if (server != null) {
+            server.stop(0);
+        }
+    }
+
+    @Test
+    void authenticateThenDescribeStoresAccessTokenAndInstanceUrl() throws IOException {
+        // Handlers run on the server's dispatcher thread, so hand the captured value back to the
+        // test thread through an AtomicReference.
+        AtomicReference<String> describeAuthorization = new AtomicReference<>();
+        server.createContext(
+                "/services/oauth2/token",
+                exchange -> respondJson(exchange, 200, tokenResponse(baseUrl, "tok-1")));
+        server.createContext(
+                "/services/data/v59.0/sobjects/Account/describe",
+                exchange -> {
+                    describeAuthorization.set(
+                            exchange.getRequestHeaders().getFirst("Authorization"));
+                    respondJson(exchange, 200, describeJson("Id", "id"));
+                });
+
+        try (SalesforceClient client = new SalesforceClient(buildParams(baseUrl))) {
+            client.authenticate();
+            CatalogTable table = client.describeObject("salesforce", "Account");
+            SeaTunnelRowType rowType = table.getSeaTunnelRowType();
+            Assertions.assertEquals(1, rowType.getTotalFields());
+            Assertions.assertEquals("Id", rowType.getFieldName(0));
+
+            // The test name promises the token is stored: verify authenticate()'s token is the
+            // one presented on the follow-up describe call.
+            String authorization = describeAuthorization.get();
+            Assertions.assertNotNull(
+                    authorization, "describe request was sent without an Authorization header");
+            Assertions.assertTrue(
+                    authorization.contains("tok-1"),
+                    "describe request did not use the issued token: " + authorization);
+        }
+    }
+
+    @Test
+    void authenticateFailureRaisesSalesforceException() throws IOException {
+        server.createContext(
+                "/services/oauth2/token",
+                exchange -> respondJson(exchange, 401, "{\"error\":\"invalid_grant\"}"));
+
+        // Close the client on the failure path too, like every other test, so it is not leaked.
+        try (SalesforceClient client = new SalesforceClient(buildParams(baseUrl))) {
+            SalesforceConnectorException ex =
+                    Assertions.assertThrows(
+                            SalesforceConnectorException.class, client::authenticate);
+            Assertions.assertTrue(ex.getMessage().contains("401"));
+        }
+    }
+
+    @Test
+    void describeObjectMapsSalesforceTypesToSeaTunnel() throws IOException {
+        server.createContext(
+                "/services/oauth2/token",
+                exchange -> respondJson(exchange, 200, tokenResponse(baseUrl, "tok-1")));
+        server.createContext(
+                "/services/data/v59.0/sobjects/Opportunity/describe",
+                exchange -> {
+                    String body =
+                            "{\"fields\":["
+                                    + "{\"name\":\"Id\",\"type\":\"id\"},"
+                                    + "{\"name\":\"IsClosed\",\"type\":\"boolean\"},"
+                                    + "{\"name\":\"Amount\",\"type\":\"double\"},"
+                                    + "{\"name\":\"CreatedDate\",\"type\":\"datetime\"},"
+                                    + "{\"name\":\"CloseDate\",\"type\":\"date\"}"
+                                    + "]}";
+                    respondJson(exchange, 200, body);
+                });
+
+        try (SalesforceClient client = new SalesforceClient(buildParams(baseUrl))) {
+            client.authenticate();
+            CatalogTable table = client.describeObject("salesforce", "Opportunity");
+            SeaTunnelRowType rowType = table.getSeaTunnelRowType();
+            Assertions.assertEquals(5, rowType.getTotalFields());
+            Assertions.assertEquals(SqlType.STRING, rowType.getFieldType(0).getSqlType());
+            Assertions.assertEquals(SqlType.BOOLEAN, rowType.getFieldType(1).getSqlType());
+            Assertions.assertEquals(SqlType.DOUBLE, rowType.getFieldType(2).getSqlType());
+            Assertions.assertEquals(SqlType.TIMESTAMP, rowType.getFieldType(3).getSqlType());
+            Assertions.assertEquals(SqlType.DATE, rowType.getFieldType(4).getSqlType());
+        }
+    }
+
+    @Test
+    void executeBulkQueryReturnsParsedRows() throws IOException {
+        registerBulkJobHandlers("job-1", "JobComplete");
+        server.createContext(
+                "/services/data/v59.0/jobs/query/job-1/results",
+                exchange -> respondCsv(exchange, 200, "Id,Name\n001,Acme\n002,Globex\n", null));
+
+        try (SalesforceClient client = new SalesforceClient(buildParams(baseUrl))) {
+            client.authenticate();
+            List<Object[]> rows = new ArrayList<>();
+            client.executeBulkQuery("SELECT Id, Name FROM Account", 2, rows::add);
+            Assertions.assertEquals(2, rows.size());
+            Assertions.assertArrayEquals(new Object[] {"001", "Acme"}, rows.get(0));
+            Assertions.assertArrayEquals(new Object[] {"002", "Globex"}, rows.get(1));
+        }
+    }
+
+    @Test
+    void executeBulkQueryFollowsSforceLocatorPagination() throws IOException {
+        registerBulkJobHandlers("job-1", "JobComplete");
+        AtomicInteger resultsHits = new AtomicInteger();
+        AtomicReference<String> secondPageQuery = new AtomicReference<>();
+        server.createContext(
+                "/services/data/v59.0/jobs/query/job-1/results",
+                exchange -> {
+                    int hit = resultsHits.incrementAndGet();
+                    if (hit == 1) {
+                        respondCsv(exchange, 200, "Id\n001\n", "locator-2");
+                    } else {
+                        // Record the follow-up request so the test can check the locator from
+                        // page one was passed back, rather than page one simply being re-read.
+                        secondPageQuery.set(exchange.getRequestURI().getRawQuery());
+                        respondCsv(exchange, 200, "Id\n002\n", null);
+                    }
+                });
+
+        try (SalesforceClient client = new SalesforceClient(buildParams(baseUrl))) {
+            client.authenticate();
+            List<Object[]> rows = new ArrayList<>();
+            client.executeBulkQuery("SELECT Id FROM Account", 1, rows::add);
+            Assertions.assertEquals(2, rows.size());
+            Assertions.assertArrayEquals(new Object[] {"001"}, rows.get(0));
+            Assertions.assertArrayEquals(new Object[] {"002"}, rows.get(1));
+            Assertions.assertEquals(2, resultsHits.get());
+
+            String query = secondPageQuery.get();
+            Assertions.assertNotNull(query, "second results request carried no query string");
+            Assertions.assertTrue(
+                    query.contains("locator-2"),
+                    "second results request did not pass the Sforce-Locator back: " + query);
+        }
+    }
+
+    @Test
+    void executeBulkQueryThrowsWhenJobFails() throws IOException {
+        registerBulkJobHandlers("job-1", "Failed");
+
+        try (SalesforceClient client = new SalesforceClient(buildParams(baseUrl))) {
+            client.authenticate();
+            Assertions.assertThrows(
+                    SalesforceConnectorException.class,
+                    () -> client.executeBulkQuery("SELECT Id FROM Account", 1, row -> {}));
+        }
+    }
+
+    @Test
+    void executeBulkQueryTimesOutWhenJobNeverCompletes() throws IOException {
+        // The job status never leaves InProgress; buildParams() caps the wait at 500 ms.
+        registerBulkJobHandlers("job-1", "InProgress");
+
+        try (SalesforceClient client = new SalesforceClient(buildParams(baseUrl))) {
+            client.authenticate();
+            Assertions.assertThrows(
+                    SalesforceConnectorException.class,
+                    () -> client.executeBulkQuery("SELECT Id FROM Account", 1, row -> {}));
+        }
+    }
+
+    /**
+     * Registers the token endpoint, a job-creation endpoint that always returns {@code jobId}, and
+     * a job-status endpoint that always reports {@code terminalState}. {@link HttpServer} routes
+     * each request to the context with the longest matching path prefix, so a test can add a more
+     * specific {@code .../jobId/results} context alongside these.
+     */
+    private void registerBulkJobHandlers(String jobId, String terminalState) {
+        server.createContext(
+                "/services/oauth2/token",
+                exchange -> respondJson(exchange, 200, tokenResponse(baseUrl, "tok-1")));
+        server.createContext(
+                "/services/data/v59.0/jobs/query",
+                exchange -> respondJson(exchange, 200, "{\"id\":\"" + jobId + "\"}"));
+        server.createContext(
+                "/services/data/v59.0/jobs/query/" + jobId,
+                exchange -> respondJson(exchange, 200, "{\"state\":\"" + terminalState + "\"}"));
+    }
+
+    /** Builds a minimal OAuth token response containing only the access token and instance URL. */
+    private static String tokenResponse(String instanceUrl, String token) {
+        return "{\"access_token\":\"" + token + "\",\"instance_url\":\"" + instanceUrl + "\"}";
+    }
+
+    /** Builds a describe payload with a single field of the given name and Salesforce type. */
+    private static String describeJson(String fieldName, String fieldType) {
+        return "{\"fields\":[{\"name\":\"" + fieldName + "\",\"type\":\"" + fieldType + "\"}]}";
+    }
+
+    /** Sends {@code body} as UTF-8 JSON with the given status, then closes the response body. */
+    private static void respondJson(HttpExchange exchange, int status, String body)
+            throws IOException {
+        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
+        exchange.getResponseHeaders().add("Content-Type", "application/json");
+        exchange.sendResponseHeaders(status, bytes.length);
+        try (OutputStream out = exchange.getResponseBody()) {
+            out.write(bytes);
+        }
+    }
+
+    /**
+     * Sends {@code body} as UTF-8 CSV with a {@code Sforce-Locator} header. A {@code null} locator
+     * is sent as the literal string "null", which marks the last page of results.
+     */
+    private static void respondCsv(HttpExchange exchange, int status, String body, String locator)
+            throws IOException {
+        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
+        exchange.getResponseHeaders().add("Content-Type", "text/csv");
+        exchange.getResponseHeaders().add("Sforce-Locator", locator == null ? "null" : locator);
+        exchange.sendResponseHeaders(status, bytes.length);
+        try (OutputStream out = exchange.getResponseBody()) {
+            out.write(bytes);
+        }
+    }
+
+    /**
+     * Builds client parameters pointing at the stub server. The short poll interval and completion
+     * timeout keep the polling and timeout tests fast.
+     */
+    private static SalesforceParameters buildParams(String instanceUrl) {
+        Map<String, Object> map = new HashMap<>();
+        map.put("client_id", "cid");
+        map.put("client_secret", "csec");
+        map.put("username", "[email protected]");
+        map.put("password", "pwd");
+        map.put("instance_url", instanceUrl);
+        map.put("poll_interval_ms", 10L);
+        map.put("job_completion_timeout_ms", 500L);
+        SalesforceParameters p = new SalesforceParameters();
+        p.buildWithConfig(ReadonlyConfig.fromMap(map));
+        return p;
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 47fa67dea9..6e0266fa6d 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -63,6 +63,7 @@
<module>connector-amazondynamodb</module>
<module>connector-tablestore</module>
<module>connector-cassandra</module>
+ <module>connector-salesforce</module>
<module>connector-s3-redshift</module>
<module>connector-starrocks</module>
<module>connector-google-sheets</module>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 524805ac6f..ec8fd7a951 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -454,6 +454,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-salesforce</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-file-s3</artifactId>