This is an automated email from the ASF dual-hosted git repository.

davidzollo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 93039e90fc [Feature][Connector] Add Salesforce Source Connector 
(#10938)
93039e90fc is described below

commit 93039e90fc2a46250edf0f5f690e84e7651738ba
Author: NixonWahome <[email protected]>
AuthorDate: Tue Jun 9 12:10:40 2026 +0300

    [Feature][Connector] Add Salesforce Source Connector (#10938)
---
 .github/workflows/labeler/label-scope-conf.yml     |   5 +
 config/plugin_config                               |   1 +
 docs/en/connector-v2/source/Salesforce.md          | 116 +++++++
 docs/zh/connector-v2/source/Salesforce.md          | 115 +++++++
 plugin-mapping.properties                          |   1 +
 .../connector-salesforce/pom.xml                   |  59 ++++
 .../salesforce/client/SalesforceClient.java        | 342 +++++++++++++++++++++
 .../salesforce/config/SalesforceParameters.java    |  52 ++++
 .../salesforce/config/SalesforceSourceOptions.java | 113 +++++++
 .../salesforce/config/SalesforceTableConfig.java   |  42 +++
 .../exception/SalesforceConnectorErrorCode.java    |  48 +++
 .../exception/SalesforceConnectorException.java    |  37 +++
 .../salesforce/source/SalesforceSource.java        | 162 ++++++++++
 .../salesforce/source/SalesforceSourceFactory.java |  78 +++++
 .../salesforce/source/SalesforceSourceReader.java  | 144 +++++++++
 .../salesforce/SalesforceSourceFactoryTest.java    | 167 ++++++++++
 .../salesforce/client/SalesforceClientTest.java    | 245 +++++++++++++++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 seatunnel-dist/pom.xml                             |   6 +
 19 files changed, 1734 insertions(+)

diff --git a/.github/workflows/labeler/label-scope-conf.yml 
b/.github/workflows/labeler/label-scope-conf.yml
index bd0e87633a..d8e386907b 100644
--- a/.github/workflows/labeler/label-scope-conf.yml
+++ b/.github/workflows/labeler/label-scope-conf.yml
@@ -83,6 +83,11 @@ cassandra:
       - changed-files:
           - any-glob-to-any-file: 
seatunnel-connectors-v2/connector-cassandra/**
           - all-globs-to-all-files: 
'!seatunnel-connectors-v2/connector-!(cassandra)/**'
+salesforce:
+  - all:
+      - changed-files:
+          - any-glob-to-any-file: 
seatunnel-connectors-v2/connector-salesforce/**
+          - all-globs-to-all-files: 
'!seatunnel-connectors-v2/connector-!(salesforce)/**'
 cdc:
   - all:
       - changed-files:
diff --git a/config/plugin_config b/config/plugin_config
index 88402cbf45..fc6bf5c7c2 100644
--- a/config/plugin_config
+++ b/config/plugin_config
@@ -23,6 +23,7 @@
 connector-amazondynamodb
 connector-assert
 connector-cassandra
+connector-salesforce
 connector-cdc-mysql
 connector-cdc-mongodb
 connector-cdc-sqlserver
diff --git a/docs/en/connector-v2/source/Salesforce.md 
b/docs/en/connector-v2/source/Salesforce.md
new file mode 100644
index 0000000000..ab87e5abcb
--- /dev/null
+++ b/docs/en/connector-v2/source/Salesforce.md
@@ -0,0 +1,116 @@
+# Salesforce
+
+> Salesforce source connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Key Features
+
+- [x] [batch](../../introduction/concepts/connector-v2-features.md)
+- [ ] [stream](../../introduction/concepts/connector-v2-features.md)
+- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
+- [x] [column projection](../../introduction/concepts/connector-v2-features.md)
+- [ ] [parallelism](../../introduction/concepts/connector-v2-features.md)
+- [x] [support multiple table 
read](../../introduction/concepts/connector-v2-features.md)
+
+## Description
+
+Reads data from Salesforce objects using the Salesforce Bulk API 2.0. Supports
+single-object and multi-object (tables_configs) batch ingestion.
+
+Authentication uses the OAuth 2.0 Username-Password flow.
+
+## Supported DataSource Info
+
+| Datasource | Supported Versions |
+|------------|-------------------|
+| Salesforce | REST API v50.0+   |
+
+## Source Options
+
+| Name               | Type    | Required | Default | Description              
                                                                     |
+|--------------------|---------|----------|---------|-----------------------------------------------------------------------------------------------|
+| client_id          | String  | Yes      | -       | Salesforce Connected App 
client ID (consumer key).                                            |
+| client_secret      | String  | Yes      | -       | Salesforce Connected App 
client secret.                                                       |
+| username           | String  | Yes      | -       | Salesforce username.     
                                                                     |
+| password           | String  | Yes      | -       | Salesforce password.     
                                                                     |
+| security_token     | String  | No       | ""      | Salesforce security 
token appended to the password. Leave empty if your org IP is trusted.   |
+| instance_url       | String  | Yes      | -       | Salesforce instance URL, 
e.g. `https://yourorg.salesforce.com`.                               |
+| api_version        | String  | No       | v59.0   | Salesforce REST API 
version.                                                                  |
+| object_name        | String  | No*      | -       | Salesforce object for 
single-object mode, e.g. `Account`. Exclusive with `tables_configs`.   |
+| tables_configs     | List    | No*      | -       | Multi-object 
configuration list. Each entry requires `table_path`. Exclusive with 
`object_name`. |
+| filter             | String  | No       | -       | SOQL WHERE clause 
appended to the auto-built `SELECT FIELDS(ALL) FROM <object>` query.       |
+| request_timeout_ms | Integer | No       | 60000   | HTTP request timeout in 
milliseconds.                                                         |
+| poll_interval_ms   | Long    | No       | 5000    | Interval between Bulk 
API job status polls (ms).                                              |
+| job_completion_timeout_ms | Long | No   | 3600000 | Maximum time (ms) to 
wait for a Bulk API job to reach a terminal state. Default 60 minutes. |
+
+\* Exactly one of `object_name` or `tables_configs` must be provided.
+
+The connector always issues `SELECT FIELDS(ALL) FROM <object> [WHERE <filter>]`
+so the emitted rows stay positionally aligned with the schema produced from
+`/describe`. Custom projection-style queries are intentionally out of scope for
+this first version.
+
+### tables_configs entry options
+
+| Name        | Type   | Required | Description                                
                       |
+|-------------|--------|----------|-------------------------------------------------------------------|
+| table_path  | String | Yes      | Format: `database.ObjectName`, e.g. 
`salesforce.Account`.        |
+| filter      | String | No       | SOQL WHERE clause for this object.         
                       |
+
+## Example
+
+### Single-object
+
+```hocon
+source {
+  Salesforce {
+    client_id     = "your_client_id"
+    client_secret = "your_client_secret"
+    username      = "[email protected]"
+    password      = "yourpassword"
+    instance_url  = "https://yourorg.salesforce.com";
+
+    object_name = "Account"
+    filter      = "AnnualRevenue > 1000000"
+  }
+}
+```
+
+### Multi-object
+
+```hocon
+source {
+  Salesforce {
+    client_id     = "your_client_id"
+    client_secret = "your_client_secret"
+    username      = "[email protected]"
+    password      = "yourpassword"
+    instance_url  = "https://yourorg.salesforce.com";
+
+    tables_configs = [
+      {
+        table_path = "salesforce.Account"
+        filter     = "AnnualRevenue > 1000000"
+      },
+      {
+        table_path = "salesforce.Contact"
+        filter     = "IsDeleted = false"
+      },
+      {
+        table_path = "salesforce.Opportunity"
+      }
+    ]
+  }
+}
+```
+
+## Changelog
+
+### next version
+
+- Add Salesforce source connector with Bulk API 2.0 and multi-object support
diff --git a/docs/zh/connector-v2/source/Salesforce.md 
b/docs/zh/connector-v2/source/Salesforce.md
new file mode 100644
index 0000000000..ac6160dce3
--- /dev/null
+++ b/docs/zh/connector-v2/source/Salesforce.md
@@ -0,0 +1,115 @@
+# Salesforce
+
+> Salesforce 源连接器
+
+## 支持的引擎
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## 关键特性
+
+- [x] [批处理](../../introduction/concepts/connector-v2-features.md)
+- [ ] [流处理](../../introduction/concepts/connector-v2-features.md)
+- [ ] [精确一次](../../introduction/concepts/connector-v2-features.md)
+- [x] [列投影](../../introduction/concepts/connector-v2-features.md)
+- [ ] [并行度](../../introduction/concepts/connector-v2-features.md)
+- [x] [支持多表读取](../../introduction/concepts/connector-v2-features.md)
+
+## 描述
+
+使用 Salesforce Bulk API 2.0 从 Salesforce 对象读取数据。支持单对象模式以及通过
+`tables_configs` 进行多对象批量读取。
+
+身份验证使用 OAuth 2.0 用户名-密码模式。
+
+## 支持的数据源信息
+
+| 数据源     | 支持的版本        |
+|------------|------------------|
+| Salesforce | REST API v50.0+  |
+
+## 源选项
+
+| 名称               | 类型    | 必填  | 默认值  | 描述                                   
                                                       |
+|--------------------|---------|-------|---------|-----------------------------------------------------------------------------------------------|
+| client_id          | String  | 是    | -       | Salesforce Connected App 
的客户端 ID (consumer key)。                                          |
+| client_secret      | String  | 是    | -       | Salesforce Connected App 
的客户端密钥 (consumer secret)。                                       |
+| username           | String  | 是    | -       | Salesforce 用户名。              
                                                              |
+| password           | String  | 是    | -       | Salesforce 密码。               
                                                               |
+| security_token     | String  | 否    | ""      | 附加到密码后用于身份验证的安全令牌。如果组织 IP 
受信任则可留空。                                |
+| instance_url       | String  | 是    | -       | Salesforce 实例 URL,例如 
`https://yourorg.salesforce.com`。                                   |
+| api_version        | String  | 否    | v59.0   | Salesforce REST API 版本。      
                                                              |
+| object_name        | String  | 否\*  | -       | 单对象模式下要读取的 Salesforce 对象,例如 
`Account`。与 `tables_configs` 互斥。                |
+| tables_configs     | List    | 否\*  | -       | 多对象配置列表。每一项必须提供 
`table_path`。与 `object_name` 互斥。                            |
+| filter             | String  | 否    | -       | 附加到自动构造的 `SELECT FIELDS(ALL) 
FROM <object>` 查询上的 SOQL WHERE 条件。                 |
+| request_timeout_ms | Integer | 否    | 60000   | HTTP 请求超时时间(毫秒)。             
                                                       |
+| poll_interval_ms   | Long    | 否    | 5000    | Bulk API 作业状态轮询间隔(毫秒)。       
                                                     |
+| job_completion_timeout_ms | Long | 否  | 3600000 | 等待 Bulk API 
作业达到终止状态的最长时间(毫秒)。默认 60 分钟。                                |
+
+\* `object_name` 与 `tables_configs` 必须且只能提供其中一个。
+
+连接器始终发起 `SELECT FIELDS(ALL) FROM <object> [WHERE <filter>]` 查询,以保证
+发出的行与基于 `/describe` 构造的 schema 在位置上保持一致。自定义投影式查询暂不
+属于此版本的支持范围。
+
+### tables_configs 项选项
+
+| 名称        | 类型   | 必填 | 描述                                                   
             |
+|-------------|--------|------|---------------------------------------------------------------------|
+| table_path  | String | 是   | 格式为 `database.ObjectName`,例如 
`salesforce.Account`。            |
+| filter      | String | 否   | 针对该对象的 SOQL WHERE 条件。                           
            |
+
+## 示例
+
+### 单对象
+
+```hocon
+source {
+  Salesforce {
+    client_id     = "your_client_id"
+    client_secret = "your_client_secret"
+    username      = "[email protected]"
+    password      = "yourpassword"
+    instance_url  = "https://yourorg.salesforce.com";
+
+    object_name = "Account"
+    filter      = "AnnualRevenue > 1000000"
+  }
+}
+```
+
+### 多对象
+
+```hocon
+source {
+  Salesforce {
+    client_id     = "your_client_id"
+    client_secret = "your_client_secret"
+    username      = "[email protected]"
+    password      = "yourpassword"
+    instance_url  = "https://yourorg.salesforce.com";
+
+    tables_configs = [
+      {
+        table_path = "salesforce.Account"
+        filter     = "AnnualRevenue > 1000000"
+      },
+      {
+        table_path = "salesforce.Contact"
+        filter     = "IsDeleted = false"
+      },
+      {
+        table_path = "salesforce.Opportunity"
+      }
+    ]
+  }
+}
+```
+
+## 变更日志
+
+### 下一个版本
+
+- 新增 Salesforce 源连接器,基于 Bulk API 2.0,支持多对象读取
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index fe181d223c..800f50b7be 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -84,6 +84,7 @@ seatunnel.source.AmazonDynamodb = connector-amazondynamodb
 seatunnel.sink.AmazonDynamodb = connector-amazondynamodb
 seatunnel.source.Cassandra = connector-cassandra
 seatunnel.sink.Cassandra = connector-cassandra
+seatunnel.source.Salesforce = connector-salesforce
 seatunnel.sink.StarRocks = connector-starrocks
 seatunnel.source.MyHours = connector-http-myhours
 seatunnel.sink.InfluxDB = connector-influxdb
diff --git a/seatunnel-connectors-v2/connector-salesforce/pom.xml 
b/seatunnel-connectors-v2/connector-salesforce/pom.xml
new file mode 100644
index 0000000000..854e733ca0
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-salesforce/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-salesforce</artifactId>
+    <name>SeaTunnel : Connectors V2 : Salesforce</name>
+
+    <properties>
+        <httpclient.version>4.5.13</httpclient.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpclient.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-csv</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git 
a/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/client/SalesforceClient.java
 
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/client/SalesforceClient.java
new file mode 100644
index 0000000000..03aa7c5509
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/client/SalesforceClient.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.client;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import 
org.apache.seatunnel.connectors.seatunnel.salesforce.config.SalesforceParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.salesforce.exception.SalesforceConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.salesforce.exception.SalesforceConnectorException;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.http.Header;
+import org.apache.http.HttpHeaders;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+@Slf4j
+public class SalesforceClient implements Closeable {
+
+    private static final String TOKEN_PATH = "/services/oauth2/token";
+    private static final String JOBS_QUERY_PATH = 
"/services/data/%s/jobs/query";
+    private static final String JOB_PATH = "/services/data/%s/jobs/query/%s";
+    private static final String JOB_RESULTS_PATH = 
"/services/data/%s/jobs/query/%s/results";
+    private static final String DESCRIBE_PATH = 
"/services/data/%s/sobjects/%s/describe";
+    private static final String PLUGIN_NAME = "Salesforce";
+
+    private final SalesforceParameters params;
+    private final CloseableHttpClient httpClient;
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    private String accessToken;
+    private String authorizedInstanceUrl;
+
+    public SalesforceClient(SalesforceParameters params) {
+        this.params = params;
+        RequestConfig requestConfig =
+                RequestConfig.custom()
+                        .setConnectTimeout(params.getRequestTimeoutMs())
+                        .setSocketTimeout(params.getRequestTimeoutMs())
+                        .build();
+        this.httpClient = 
HttpClients.custom().setDefaultRequestConfig(requestConfig).build();
+    }
+
+    public void authenticate() {
+        String tokenUrl = params.getInstanceUrl() + TOKEN_PATH;
+        HttpPost post = new HttpPost(tokenUrl);
+
+        List<NameValuePair> form = new ArrayList<>();
+        form.add(new BasicNameValuePair("grant_type", "password"));
+        form.add(new BasicNameValuePair("client_id", params.getClientId()));
+        form.add(new BasicNameValuePair("client_secret", 
params.getClientSecret()));
+        form.add(new BasicNameValuePair("username", params.getUsername()));
+        form.add(
+                new BasicNameValuePair(
+                        "password", params.getPassword() + 
params.getSecurityToken()));
+
+        try {
+            post.setEntity(new UrlEncodedFormEntity(form, 
StandardCharsets.UTF_8));
+            try (CloseableHttpResponse response = httpClient.execute(post)) {
+                int status = response.getStatusLine().getStatusCode();
+                String body = EntityUtils.toString(response.getEntity(), 
StandardCharsets.UTF_8);
+                if (status != 200) {
+                    throw new SalesforceConnectorException(
+                            SalesforceConnectorErrorCode.AUTH_FAILED,
+                            "HTTP " + status + ": " + body);
+                }
+                JsonNode json = objectMapper.readTree(body);
+                this.accessToken = json.get("access_token").asText();
+                this.authorizedInstanceUrl = json.get("instance_url").asText();
+                log.info("Authenticated with Salesforce instance {}", 
authorizedInstanceUrl);
+            }
+        } catch (SalesforceConnectorException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new 
SalesforceConnectorException(SalesforceConnectorErrorCode.AUTH_FAILED, e);
+        }
+    }
+
+    public CatalogTable describeObject(String database, String objectName) {
+        String url =
+                authorizedInstanceUrl
+                        + String.format(DESCRIBE_PATH, params.getApiVersion(), 
objectName);
+        HttpGet get = new HttpGet(url);
+        get.setHeader(HttpHeaders.AUTHORIZATION, "Bearer " + accessToken);
+
+        try (CloseableHttpResponse response = httpClient.execute(get)) {
+            int status = response.getStatusLine().getStatusCode();
+            String body = EntityUtils.toString(response.getEntity(), 
StandardCharsets.UTF_8);
+            if (status != 200) {
+                throw new SalesforceConnectorException(
+                        SalesforceConnectorErrorCode.DESCRIBE_OBJECT_FAILED,
+                        "HTTP " + status + " describing " + objectName + ": " 
+ body);
+            }
+            return buildCatalogTable(database, objectName, 
objectMapper.readTree(body));
+        } catch (SalesforceConnectorException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new SalesforceConnectorException(
+                    SalesforceConnectorErrorCode.DESCRIBE_OBJECT_FAILED, e);
+        }
+    }
+
+    private CatalogTable buildCatalogTable(String database, String objectName, 
JsonNode describe) {
+        TableSchema.Builder schemaBuilder = TableSchema.builder();
+        for (JsonNode field : describe.get("fields")) {
+            String name = field.get("name").asText();
+            String sfType = field.get("type").asText();
+            SeaTunnelDataType<?> seaType = mapSalesforceType(sfType);
+            schemaBuilder.column(PhysicalColumn.of(name, seaType, null, null, 
true, null, null));
+        }
+        return CatalogTable.of(
+                TableIdentifier.of(PLUGIN_NAME, database, objectName),
+                schemaBuilder.build(),
+                Collections.emptyMap(),
+                Collections.emptyList(),
+                "");
+    }
+
+    private SeaTunnelDataType<?> mapSalesforceType(String sfType) {
+        switch (sfType) {
+            case "boolean":
+                return BasicType.BOOLEAN_TYPE;
+            case "int":
+                return BasicType.INT_TYPE;
+            case "double":
+            case "currency":
+            case "percent":
+                return BasicType.DOUBLE_TYPE;
+            case "date":
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case "datetime":
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case "time":
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case "base64":
+                return PrimitiveByteArrayType.INSTANCE;
+            default:
+                return BasicType.STRING_TYPE;
+        }
+    }
+
+    /**
+     * Runs one Bulk API query end-to-end: creates the job, polls until it 
reaches a terminal state,
+     * then streams the result pages to rowConsumer. Rows are emitted as they 
are parsed rather than
+     * buffered, so callers can forward each row to a downstream collector 
without holding the full
+     * result set in memory.
+     */
+    public void executeBulkQuery(String soql, int columnCount, 
Consumer<Object[]> rowConsumer) {
+        String jobId = createBulkQueryJob(soql);
+        waitForJobCompletion(jobId);
+        downloadResults(jobId, columnCount, rowConsumer);
+    }
+
+    private String createBulkQueryJob(String soql) {
+        String url = authorizedInstanceUrl + String.format(JOBS_QUERY_PATH, 
params.getApiVersion());
+        HttpPost post = new HttpPost(url);
+        post.setHeader(HttpHeaders.AUTHORIZATION, "Bearer " + accessToken);
+        post.setHeader(HttpHeaders.CONTENT_TYPE, 
ContentType.APPLICATION_JSON.getMimeType());
+        post.setHeader(HttpHeaders.ACCEPT, 
ContentType.APPLICATION_JSON.getMimeType());
+
+        try {
+            ObjectNode body = objectMapper.createObjectNode();
+            body.put("operation", "query");
+            body.put("query", soql);
+            post.setEntity(
+                    new StringEntity(
+                            objectMapper.writeValueAsString(body), 
ContentType.APPLICATION_JSON));
+
+            try (CloseableHttpResponse response = httpClient.execute(post)) {
+                int status = response.getStatusLine().getStatusCode();
+                String responseBody =
+                        EntityUtils.toString(response.getEntity(), 
StandardCharsets.UTF_8);
+                if (status != 200) {
+                    throw new SalesforceConnectorException(
+                            
SalesforceConnectorErrorCode.BULK_JOB_CREATE_FAILED,
+                            "HTTP " + status + ": " + responseBody);
+                }
+                String jobId = 
objectMapper.readTree(responseBody).get("id").asText();
+                log.info("Created Bulk API query job {} for SOQL: {}", jobId, 
soql);
+                return jobId;
+            }
+        } catch (SalesforceConnectorException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new SalesforceConnectorException(
+                    SalesforceConnectorErrorCode.BULK_JOB_CREATE_FAILED, e);
+        }
+    }
+
+    private void waitForJobCompletion(String jobId) {
+        String url = authorizedInstanceUrl + String.format(JOB_PATH, 
params.getApiVersion(), jobId);
+        long deadline = System.currentTimeMillis() + 
params.getJobCompletionTimeoutMs();
+        while (true) {
+            try {
+                Thread.sleep(params.getPollIntervalMs());
+                HttpGet get = new HttpGet(url);
+                get.setHeader(HttpHeaders.AUTHORIZATION, "Bearer " + 
accessToken);
+                try (CloseableHttpResponse response = httpClient.execute(get)) 
{
+                    String responseBody =
+                            EntityUtils.toString(response.getEntity(), 
StandardCharsets.UTF_8);
+                    String state = 
objectMapper.readTree(responseBody).get("state").asText();
+                    log.debug("Bulk API job {} state: {}", jobId, state);
+                    if ("JobComplete".equals(state)) {
+                        return;
+                    }
+                    if ("Failed".equals(state) || "Aborted".equals(state)) {
+                        throw new SalesforceConnectorException(
+                                SalesforceConnectorErrorCode.BULK_JOB_FAILED,
+                                "Job " + jobId + " ended with state: " + 
state);
+                    }
+                }
+                if (System.currentTimeMillis() > deadline) {
+                    throw new SalesforceConnectorException(
+                            SalesforceConnectorErrorCode.BULK_JOB_FAILED,
+                            "Job "
+                                    + jobId
+                                    + " did not complete within "
+                                    + params.getJobCompletionTimeoutMs()
+                                    + "ms");
+                }
+            } catch (SalesforceConnectorException e) {
+                throw e;
+            } catch (Exception e) {
+                throw new SalesforceConnectorException(
+                        SalesforceConnectorErrorCode.BULK_JOB_FAILED, e);
+            }
+        }
+    }
+
+    /**
+     * Pulls every result page for a completed Bulk API query job, walking 
forward via the
+     * Sforce-Locator header. Each parsed row is pushed to rowConsumer 
immediately; no whole-job
+     * buffering happens at this layer, only one page's CSV body is held at a 
time.
+     */
+    private void downloadResults(String jobId, int columnCount, 
Consumer<Object[]> rowConsumer) {
+        String url =
+                authorizedInstanceUrl
+                        + String.format(JOB_RESULTS_PATH, 
params.getApiVersion(), jobId);
+        String locator = null;
+
+        do {
+            String requestUrl = locator == null ? url : url + "?locator=" + 
locator;
+            HttpGet get = new HttpGet(requestUrl);
+            get.setHeader(HttpHeaders.AUTHORIZATION, "Bearer " + accessToken);
+            get.setHeader(HttpHeaders.ACCEPT, "text/csv");
+
+            try (CloseableHttpResponse response = httpClient.execute(get)) {
+                int status = response.getStatusLine().getStatusCode();
+                String body = EntityUtils.toString(response.getEntity(), 
StandardCharsets.UTF_8);
+                if (status != 200) {
+                    throw new SalesforceConnectorException(
+                            SalesforceConnectorErrorCode.BULK_RESULTS_FAILED,
+                            "HTTP " + status + ": " + body);
+                }
+                Header locatorHeader = 
response.getFirstHeader("Sforce-Locator");
+                locator =
+                        (locatorHeader != null && 
!"null".equals(locatorHeader.getValue()))
+                                ? locatorHeader.getValue()
+                                : null;
+                parseCsvInto(body, columnCount, rowConsumer);
+            } catch (SalesforceConnectorException e) {
+                throw e;
+            } catch (Exception e) {
+                throw new SalesforceConnectorException(
+                        SalesforceConnectorErrorCode.BULK_RESULTS_FAILED, e);
+            }
+        } while (locator != null);
+    }
+
+    /**
+     * Parses one Bulk API CSV result page (header row plus data rows) and 
emits each data row to
+     * rowConsumer as a fixed-width Object[]. Missing trailing columns are 
filled with null and
+     * empty cells also become null, so the type-converting reader can 
distinguish them from real
+     * string values.
+     */
+    private void parseCsvInto(String csv, int columnCount, Consumer<Object[]> 
rowConsumer)
+            throws IOException {
+        try (CSVParser parser =
+                CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(new 
StringReader(csv))) {
+            for (CSVRecord record : parser) {
+                Object[] row = new Object[columnCount];
+                for (int i = 0; i < columnCount; i++) {
+                    String val = i < record.size() ? record.get(i) : null;
+                    row[i] = (val == null || val.isEmpty()) ? null : val;
+                }
+                rowConsumer.accept(row);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        httpClient.close();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/config/SalesforceParameters.java
 
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/config/SalesforceParameters.java
new file mode 100644
index 0000000000..c452936823
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/config/SalesforceParameters.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+
+@Getter
+public class SalesforceParameters implements Serializable {
+
+    private String clientId;
+    private String clientSecret;
+    private String username;
+    private String password;
+    private String securityToken;
+    private String instanceUrl;
+    private String apiVersion;
+    private int requestTimeoutMs;
+    private long pollIntervalMs;
+    private long jobCompletionTimeoutMs;
+
+    public void buildWithConfig(ReadonlyConfig config) {
+        this.clientId = config.get(SalesforceSourceOptions.CLIENT_ID);
+        this.clientSecret = config.get(SalesforceSourceOptions.CLIENT_SECRET);
+        this.username = config.get(SalesforceSourceOptions.USERNAME);
+        this.password = config.get(SalesforceSourceOptions.PASSWORD);
+        this.securityToken = 
config.get(SalesforceSourceOptions.SECURITY_TOKEN);
+        this.instanceUrl = config.get(SalesforceSourceOptions.INSTANCE_URL);
+        this.apiVersion = config.get(SalesforceSourceOptions.API_VERSION);
+        this.requestTimeoutMs = 
config.get(SalesforceSourceOptions.REQUEST_TIMEOUT_MS);
+        this.pollIntervalMs = 
config.get(SalesforceSourceOptions.POLL_INTERVAL_MS);
+        this.jobCompletionTimeoutMs = 
config.get(SalesforceSourceOptions.JOB_COMPLETION_TIMEOUT_MS);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/config/SalesforceSourceOptions.java
 
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/config/SalesforceSourceOptions.java
new file mode 100644
index 0000000000..614de8bd50
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/config/SalesforceSourceOptions.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class SalesforceSourceOptions {
+
+    public static final Option<String> CLIENT_ID =
+            Options.key("client_id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Salesforce Connected App client ID 
(consumer key).");
+
+    public static final Option<String> CLIENT_SECRET =
+            Options.key("client_secret")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Salesforce Connected App client secret 
(consumer secret).");
+
+    public static final Option<String> USERNAME =
+            Options.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Salesforce username.");
+
+    public static final Option<String> PASSWORD =
+            Options.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Salesforce password.");
+
+    public static final Option<String> SECURITY_TOKEN =
+            Options.key("security_token")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription(
+                            "Salesforce security token appended to the 
password during "
+                                    + "authentication. Leave empty if your org 
IP is trusted.");
+
+    public static final Option<String> INSTANCE_URL =
+            Options.key("instance_url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Salesforce instance URL, e.g. 
https://yourorg.salesforce.com.";);
+
+    public static final Option<String> API_VERSION =
+            Options.key("api_version")
+                    .stringType()
+                    .defaultValue("v59.0")
+                    .withDescription("Salesforce REST API version, e.g. 
v59.0.");
+
+    public static final Option<String> OBJECT_NAME =
+            Options.key("object_name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Salesforce object name for single-object mode, 
e.g. Account. "
+                                    + "Mutually exclusive with 
tables_configs.");
+
+    public static final Option<String> FILTER =
+            Options.key("filter")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "SOQL WHERE clause appended to the auto-built "
+                                    + "SELECT FIELDS(ALL) FROM <object> 
query.");
+
+    public static final Option<Integer> REQUEST_TIMEOUT_MS =
+            Options.key("request_timeout_ms")
+                    .intType()
+                    .defaultValue(60000)
+                    .withDescription("HTTP request timeout in milliseconds.");
+
+    public static final Option<Long> POLL_INTERVAL_MS =
+            Options.key("poll_interval_ms")
+                    .longType()
+                    .defaultValue(5000L)
+                    .withDescription("Interval in milliseconds between Bulk 
API job status polls.");
+
+    public static final Option<Long> JOB_COMPLETION_TIMEOUT_MS =
+            Options.key("job_completion_timeout_ms")
+                    .longType()
+                    .defaultValue(3_600_000L)
+                    .withDescription(
+                            "Maximum time in milliseconds to wait for a Bulk 
API job to reach a "
+                                    + "terminal state. Default 3600000 (60 
minutes).");
+
+    public static final Option<String> TABLE_PATH =
+            Options.key("table_path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Salesforce object path in 'database.ObjectName' 
format, "
+                                    + "e.g. salesforce.Account. Used in 
tables_configs entries.");
+}
diff --git 
a/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/config/SalesforceTableConfig.java
 
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/config/SalesforceTableConfig.java
new file mode 100644
index 0000000000..f088014821
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/config/SalesforceTableConfig.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.config;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+
+@Getter
+public class SalesforceTableConfig implements Serializable {
+
+    private final String soql;
+    private final String objectName;
+    private final CatalogTable catalogTable;
+
+    public SalesforceTableConfig(String soql, String objectName, CatalogTable 
catalogTable) {
+        this.soql = soql;
+        this.objectName = objectName;
+        this.catalogTable = catalogTable;
+    }
+
+    public String getTableId() {
+        return catalogTable.getTableId().toTablePath().toString();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/exception/SalesforceConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/exception/SalesforceConnectorErrorCode.java
new file mode 100644
index 0000000000..0ee618b4c0
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/exception/SalesforceConnectorErrorCode.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum SalesforceConnectorErrorCode implements SeaTunnelErrorCode {
+    AUTH_FAILED("SALESFORCE-01", "Failed to authenticate with Salesforce"),
+    DESCRIBE_OBJECT_FAILED("SALESFORCE-02", "Failed to describe Salesforce 
object schema"),
+    BULK_JOB_CREATE_FAILED("SALESFORCE-03", "Failed to create Salesforce Bulk 
API query job"),
+    BULK_JOB_FAILED("SALESFORCE-04", "Salesforce Bulk API query job failed or 
was aborted"),
+    BULK_RESULTS_FAILED("SALESFORCE-05", "Failed to retrieve Salesforce Bulk 
API job results"),
+    INVALID_TABLE_PATH("SALESFORCE-06", "Invalid table_path; expected format: 
database.ObjectName"),
+    DUPLICATE_OBJECT("SALESFORCE-07", "Duplicate object found in 
tables_configs");
+
+    private final String code;
+    private final String description;
+
+    SalesforceConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return code;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/exception/SalesforceConnectorException.java
 
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/exception/SalesforceConnectorException.java
new file mode 100644
index 0000000000..edc0e349e6
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/exception/SalesforceConnectorException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class SalesforceConnectorException extends SeaTunnelRuntimeException {
+    public SalesforceConnectorException(
+            SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    public SalesforceConnectorException(
+            SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, 
Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+
+    public SalesforceConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/source/SalesforceSource.java
 
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/source/SalesforceSource.java
new file mode 100644
index 0000000000..890c711aa1
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/source/SalesforceSource.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.source;
+
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import 
org.apache.seatunnel.connectors.seatunnel.salesforce.client.SalesforceClient;
+import 
org.apache.seatunnel.connectors.seatunnel.salesforce.config.SalesforceParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.salesforce.config.SalesforceSourceOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.salesforce.config.SalesforceTableConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.salesforce.exception.SalesforceConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.salesforce.exception.SalesforceConnectorException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class SalesforceSource extends AbstractSingleSplitSource<SeaTunnelRow>
+        implements SupportColumnProjection {
+
+    private static final String PLUGIN_NAME = "Salesforce";
+
+    private final SalesforceParameters params;
+    private final List<SalesforceTableConfig> tableConfigs;
+
+    public SalesforceSource(SalesforceParameters params, ReadonlyConfig 
config) {
+        this.params = params;
+        this.tableConfigs = buildTableConfigs(params, config);
+    }
+
+    /**
+     * Resolves the user-facing object_name / tables_configs options into one 
or more
+     * SalesforceTableConfig instances, calling /describe per object to derive 
the CatalogTable
+     * schema. Runs once during factory createSource and uses a one-shot 
SalesforceClient whose
+     * lifetime is scoped to this call (try-with-resources).
+     */
+    private List<SalesforceTableConfig> buildTableConfigs(
+            SalesforceParameters params, ReadonlyConfig config) {
+        try (SalesforceClient client = new SalesforceClient(params)) {
+            client.authenticate();
+
+            if 
(config.getOptional(ConnectorCommonOptions.TABLE_CONFIGS).isPresent()) {
+                List<Map<String, Object>> tableConfigMaps =
+                        config.get(ConnectorCommonOptions.TABLE_CONFIGS);
+                List<SalesforceTableConfig> configs = new ArrayList<>();
+                for (Map<String, Object> map : tableConfigMaps) {
+                    ReadonlyConfig tableConfig = ReadonlyConfig.fromMap(map);
+                    SalesforceTableConfig built = 
buildOneTableConfig(tableConfig, client);
+                    String tableId = built.getTableId();
+                    boolean duplicate =
+                            configs.stream().anyMatch(c -> 
c.getTableId().equals(tableId));
+                    if (duplicate) {
+                        throw new SalesforceConnectorException(
+                                SalesforceConnectorErrorCode.DUPLICATE_OBJECT,
+                                "Duplicate object in tables_configs: " + 
tableId);
+                    }
+                    configs.add(built);
+                }
+                return configs;
+            } else {
+                String objectName = 
config.get(SalesforceSourceOptions.OBJECT_NAME);
+                String soql = buildSoql(config, objectName);
+                CatalogTable table = client.describeObject(PLUGIN_NAME, 
objectName);
+                return Collections.singletonList(
+                        new SalesforceTableConfig(soql, objectName, table));
+            }
+        } catch (SalesforceConnectorException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new SalesforceConnectorException(
+                    SalesforceConnectorErrorCode.DESCRIBE_OBJECT_FAILED,
+                    "Failed to build Salesforce table configs",
+                    e);
+        }
+    }
+
+    private SalesforceTableConfig buildOneTableConfig(
+            ReadonlyConfig tableConfig, SalesforceClient client) {
+        String tablePath =
+                tableConfig
+                        .getOptional(SalesforceSourceOptions.TABLE_PATH)
+                        .orElseThrow(
+                                () ->
+                                        new SalesforceConnectorException(
+                                                
SalesforceConnectorErrorCode.INVALID_TABLE_PATH,
+                                                "table_path is required in 
tables_configs"));
+        String[] parts = tablePath.split("\\.", 2);
+        if (parts.length != 2 || StringUtils.isBlank(parts[1])) {
+            throw new SalesforceConnectorException(
+                    SalesforceConnectorErrorCode.INVALID_TABLE_PATH,
+                    "table_path must be 'database.ObjectName', got: " + 
tablePath);
+        }
+        String database = parts[0];
+        String objectName = parts[1];
+        String soql = buildSoql(tableConfig, objectName);
+        CatalogTable table = client.describeObject(database, objectName);
+        return new SalesforceTableConfig(soql, objectName, table);
+    }
+
+    /**
+     * Builds the SOQL the Bulk API job will run for one object. Always 
selects every
+     * describe-discovered field via FIELDS(ALL) so the emitted rows stay 
positionally aligned with
+     * the CatalogTable schema. An optional filter is appended verbatim as the 
WHERE clause.
+     */
+    private String buildSoql(ReadonlyConfig config, String objectName) {
+        StringBuilder soql = new StringBuilder("SELECT FIELDS(ALL) FROM 
").append(objectName);
+        String filter = 
config.getOptional(SalesforceSourceOptions.FILTER).orElse(null);
+        if (filter != null) {
+            soql.append(" WHERE ").append(filter);
+        }
+        return soql.toString();
+    }
+
+    @Override
+    public String getPluginName() {
+        return PLUGIN_NAME;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        return tableConfigs.stream()
+                .map(SalesforceTableConfig::getCatalogTable)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public AbstractSingleSplitReader<SeaTunnelRow> createReader(
+            SingleSplitReaderContext readerContext) throws Exception {
+        return new SalesforceSourceReader(params, tableConfigs, readerContext);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/source/SalesforceSourceFactory.java
 
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/source/SalesforceSourceFactory.java
new file mode 100644
index 0000000000..d58422ec1f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/source/SalesforceSourceFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.salesforce.config.SalesforceParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.salesforce.config.SalesforceSourceOptions;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+@AutoService(Factory.class)
+public class SalesforceSourceFactory implements TableSourceFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return "Salesforce";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(
+                        SalesforceSourceOptions.CLIENT_ID,
+                        SalesforceSourceOptions.CLIENT_SECRET,
+                        SalesforceSourceOptions.USERNAME,
+                        SalesforceSourceOptions.PASSWORD,
+                        SalesforceSourceOptions.INSTANCE_URL)
+                .exclusive(
+                        SalesforceSourceOptions.OBJECT_NAME, 
ConnectorCommonOptions.TABLE_CONFIGS)
+                .optional(
+                        SalesforceSourceOptions.SECURITY_TOKEN,
+                        SalesforceSourceOptions.API_VERSION,
+                        SalesforceSourceOptions.FILTER,
+                        SalesforceSourceOptions.REQUEST_TIMEOUT_MS,
+                        SalesforceSourceOptions.POLL_INTERVAL_MS,
+                        SalesforceSourceOptions.JOB_COMPLETION_TIMEOUT_MS)
+                .build();
+    }
+
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        SalesforceParameters params = new SalesforceParameters();
+        params.buildWithConfig(context.getOptions());
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new SalesforceSource(params, context.getOptions());
+    }
+
+    @Override
+    public Class<? extends SeaTunnelSource> getSourceClass() {
+        return SalesforceSource.class;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/source/SalesforceSourceReader.java
 
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/source/SalesforceSourceReader.java
new file mode 100644
index 0000000000..439b3d62c3
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-salesforce/src/main/java/org/apache/seatunnel/connectors/seatunnel/salesforce/source/SalesforceSourceReader.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import 
org.apache.seatunnel.connectors.seatunnel.salesforce.client.SalesforceClient;
+import 
org.apache.seatunnel.connectors.seatunnel.salesforce.config.SalesforceParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.salesforce.config.SalesforceTableConfig;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Base64;
+import java.util.List;
+
+@Slf4j
+public class SalesforceSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
+
+    private static final DateTimeFormatter DATE_FMT = 
DateTimeFormatter.ofPattern("yyyy-MM-dd");
+    private static final DateTimeFormatter DATETIME_FMT =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+    private static final DateTimeFormatter TIME_FMT =
+            DateTimeFormatter.ofPattern("HH:mm:ss.SSS'Z'");
+
+    private final SalesforceParameters params;
+    private final List<SalesforceTableConfig> tableConfigs;
+    private final SingleSplitReaderContext readerContext;
+    private SalesforceClient client;
+
+    SalesforceSourceReader(
+            SalesforceParameters params,
+            List<SalesforceTableConfig> tableConfigs,
+            SingleSplitReaderContext readerContext) {
+        this.params = params;
+        this.tableConfigs = tableConfigs;
+        this.readerContext = readerContext;
+    }
+
+    @Override
+    public void open() throws Exception {
+        client = new SalesforceClient(params);
+        client.authenticate();
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    /**
+     * Single-pass bounded read for the assigned split. For each configured 
table, submits one Bulk
+     * API query job and hands the client a per-row consumer that converts the 
raw CSV Object[] to a
+     * SeaTunnelRow, tags it with the table id, and forwards it to the 
downstream collector. Rows
+     * flow through without buffering the whole result set in the reader. 
After every table has
+     * drained, signals no-more-elements so the framework can close the split.
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        try {
+            for (SalesforceTableConfig tableConfig : tableConfigs) {
+                CatalogTable catalogTable = tableConfig.getCatalogTable();
+                SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
+                int columnCount = rowType.getTotalFields();
+                String tableId = tableConfig.getTableId();
+                log.info("Reading rows from Salesforce object {}", 
tableConfig.getObjectName());
+                client.executeBulkQuery(
+                        tableConfig.getSoql(),
+                        columnCount,
+                        raw -> {
+                            SeaTunnelRow row = convertRow(raw, rowType);
+                            row.setTableId(tableId);
+                            output.collect(row);
+                        });
+            }
+        } finally {
+            readerContext.signalNoMoreElement();
+        }
+    }
+
+    private SeaTunnelRow convertRow(Object[] raw, SeaTunnelRowType rowType) {
+        Object[] fields = new Object[raw.length];
+        for (int i = 0; i < raw.length; i++) {
+            String val = raw[i] == null ? null : raw[i].toString();
+            if (val == null) {
+                fields[i] = null;
+                continue;
+            }
+            SeaTunnelDataType<?> type = rowType.getFieldType(i);
+            fields[i] = parseValue(val, type);
+        }
+        return new SeaTunnelRow(fields);
+    }
+
+    private Object parseValue(String val, SeaTunnelDataType<?> type) {
+        switch (type.getSqlType()) {
+            case BOOLEAN:
+                return Boolean.parseBoolean(val);
+            case INT:
+                return Integer.parseInt(val);
+            case DOUBLE:
+                return Double.parseDouble(val);
+            case DATE:
+                return LocalDate.parse(val, DATE_FMT);
+            case TIMESTAMP:
+                return LocalDateTime.parse(val, DATETIME_FMT);
+            case TIME:
+                return LocalTime.parse(val, TIME_FMT);
+            case BYTES:
+                return Base64.getDecoder().decode(val);
+            default:
+                return val;
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+}
diff --git 
a/seatunnel-connectors-v2/connector-salesforce/src/test/java/org/apache/seatunnel/connectors/seatunnel/salesforce/SalesforceSourceFactoryTest.java
 
b/seatunnel-connectors-v2/connector-salesforce/src/test/java/org/apache/seatunnel/connectors/seatunnel/salesforce/SalesforceSourceFactoryTest.java
new file mode 100644
index 0000000000..a816cc52ae
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-salesforce/src/test/java/org/apache/seatunnel/connectors/seatunnel/salesforce/SalesforceSourceFactoryTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.configuration.util.OptionValidationException;
+import 
org.apache.seatunnel.connectors.seatunnel.salesforce.source.SalesforceSourceFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class SalesforceSourceFactoryTest {
+
+    private static final SalesforceSourceFactory FACTORY = new 
SalesforceSourceFactory();
+
+    @Test
+    void testOptionRuleIsNotNull() {
+        Assertions.assertNotNull(FACTORY.optionRule());
+    }
+
+    @Test
+    void testFactoryIdentifier() {
+        Assertions.assertEquals("Salesforce", FACTORY.factoryIdentifier());
+    }
+
+    @Test
+    void testSingleObjectModeValid() {
+        Map<String, Object> config = requiredAuthConfig();
+        config.put("object_name", "Account");
+
+        Assertions.assertDoesNotThrow(
+                () ->
+                        ConfigValidator.of(ReadonlyConfig.fromMap(config))
+                                .validate(FACTORY.optionRule()));
+    }
+
+    @Test
+    void testSingleObjectModeWithOptionalParamsValid() {
+        Map<String, Object> config = requiredAuthConfig();
+        config.put("object_name", "Account");
+        config.put("filter", "AnnualRevenue > 1000000");
+        config.put("api_version", "v58.0");
+        config.put("max_retries", 5);
+        config.put("request_timeout_ms", 30000);
+
+        Assertions.assertDoesNotThrow(
+                () ->
+                        ConfigValidator.of(ReadonlyConfig.fromMap(config))
+                                .validate(FACTORY.optionRule()));
+    }
+
+    @Test
+    void testMultiObjectModeValid() {
+        Map<String, Object> config = requiredAuthConfig();
+        config.put("tables_configs", multiObjectTablesConfigs());
+
+        Assertions.assertDoesNotThrow(
+                () ->
+                        ConfigValidator.of(ReadonlyConfig.fromMap(config))
+                                .validate(FACTORY.optionRule()));
+    }
+
+    @Test
+    void testBothObjectNameAndTablesConfigsThrows() {
+        Map<String, Object> config = requiredAuthConfig();
+        config.put("object_name", "Account");
+        config.put("tables_configs", multiObjectTablesConfigs());
+
+        Assertions.assertThrows(
+                OptionValidationException.class,
+                () ->
+                        ConfigValidator.of(ReadonlyConfig.fromMap(config))
+                                .validate(FACTORY.optionRule()));
+    }
+
+    @Test
+    void testNeitherObjectNameNorTablesConfigsThrows() {
+        Map<String, Object> config = requiredAuthConfig();
+
+        Assertions.assertThrows(
+                OptionValidationException.class,
+                () ->
+                        ConfigValidator.of(ReadonlyConfig.fromMap(config))
+                                .validate(FACTORY.optionRule()));
+    }
+
+    @Test
+    void testMissingClientIdThrows() {
+        Map<String, Object> config = requiredAuthConfig();
+        config.remove("client_id");
+        config.put("object_name", "Account");
+
+        Assertions.assertThrows(
+                OptionValidationException.class,
+                () ->
+                        ConfigValidator.of(ReadonlyConfig.fromMap(config))
+                                .validate(FACTORY.optionRule()));
+    }
+
+    @Test
+    void testMissingInstanceUrlThrows() {
+        Map<String, Object> config = requiredAuthConfig();
+        config.remove("instance_url");
+        config.put("object_name", "Account");
+
+        Assertions.assertThrows(
+                OptionValidationException.class,
+                () ->
+                        ConfigValidator.of(ReadonlyConfig.fromMap(config))
+                                .validate(FACTORY.optionRule()));
+    }
+
+    @Test
+    void testMissingUsernameThrows() {
+        Map<String, Object> config = requiredAuthConfig();
+        config.remove("username");
+        config.put("object_name", "Account");
+
+        Assertions.assertThrows(
+                OptionValidationException.class,
+                () ->
+                        ConfigValidator.of(ReadonlyConfig.fromMap(config))
+                                .validate(FACTORY.optionRule()));
+    }
+
+    private static Map<String, Object> requiredAuthConfig() {
+        Map<String, Object> config = new HashMap<>();
+        config.put("client_id", "test_client_id");
+        config.put("client_secret", "test_client_secret");
+        config.put("username", "[email protected]");
+        config.put("password", "test_password");
+        config.put("instance_url", "https://test.salesforce.com";);
+        return config;
+    }
+
+    private static List<Map<String, Object>> multiObjectTablesConfigs() {
+        Map<String, Object> account = new HashMap<>();
+        account.put("table_path", "salesforce.Account");
+
+        Map<String, Object> contact = new HashMap<>();
+        contact.put("table_path", "salesforce.Contact");
+        contact.put("filter", "IsDeleted = false");
+
+        return Arrays.asList(account, contact);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-salesforce/src/test/java/org/apache/seatunnel/connectors/seatunnel/salesforce/client/SalesforceClientTest.java
 
b/seatunnel-connectors-v2/connector-salesforce/src/test/java/org/apache/seatunnel/connectors/seatunnel/salesforce/client/SalesforceClientTest.java
new file mode 100644
index 0000000000..630cae17c0
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-salesforce/src/test/java/org/apache/seatunnel/connectors/seatunnel/salesforce/client/SalesforceClientTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.salesforce.client;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import 
org.apache.seatunnel.connectors.seatunnel.salesforce.config.SalesforceParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.salesforce.exception.SalesforceConnectorException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpServer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+class SalesforceClientTest {
+
+    private HttpServer server;
+    private String baseUrl;
+
+    @BeforeEach
+    void setUp() throws IOException {
+        server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
+        server.start();
+        baseUrl = "http://127.0.0.1:"; + server.getAddress().getPort();
+    }
+
+    @AfterEach
+    void tearDown() {
+        if (server != null) {
+            server.stop(0);
+        }
+    }
+
+    @Test
+    void authenticateThenDescribeStoresAccessTokenAndInstanceUrl() throws 
IOException {
+        server.createContext(
+                "/services/oauth2/token",
+                exchange -> respondJson(exchange, 200, tokenResponse(baseUrl, 
"tok-1")));
+        server.createContext(
+                "/services/data/v59.0/sobjects/Account/describe",
+                exchange -> respondJson(exchange, 200, describeJson("Id", 
"id")));
+
+        try (SalesforceClient client = new 
SalesforceClient(buildParams(baseUrl))) {
+            client.authenticate();
+            CatalogTable table = client.describeObject("salesforce", 
"Account");
+            SeaTunnelRowType rowType = table.getSeaTunnelRowType();
+            Assertions.assertEquals(1, rowType.getTotalFields());
+            Assertions.assertEquals("Id", rowType.getFieldName(0));
+        }
+    }
+
+    @Test
+    void authenticateFailureRaisesSalesforceException() {
+        server.createContext(
+                "/services/oauth2/token",
+                exchange -> respondJson(exchange, 401, 
"{\"error\":\"invalid_grant\"}"));
+
+        SalesforceClient client = new SalesforceClient(buildParams(baseUrl));
+        SalesforceConnectorException ex =
+                Assertions.assertThrows(SalesforceConnectorException.class, 
client::authenticate);
+        Assertions.assertTrue(ex.getMessage().contains("401"));
+    }
+
+    @Test
+    void describeObjectMapsSalesforceTypesToSeaTunnel() throws IOException {
+        server.createContext(
+                "/services/oauth2/token",
+                exchange -> respondJson(exchange, 200, tokenResponse(baseUrl, 
"tok-1")));
+        server.createContext(
+                "/services/data/v59.0/sobjects/Opportunity/describe",
+                exchange -> {
+                    String body =
+                            "{\"fields\":["
+                                    + "{\"name\":\"Id\",\"type\":\"id\"},"
+                                    + 
"{\"name\":\"IsClosed\",\"type\":\"boolean\"},"
+                                    + 
"{\"name\":\"Amount\",\"type\":\"double\"},"
+                                    + 
"{\"name\":\"CreatedDate\",\"type\":\"datetime\"},"
+                                    + 
"{\"name\":\"CloseDate\",\"type\":\"date\"}"
+                                    + "]}";
+                    respondJson(exchange, 200, body);
+                });
+
+        try (SalesforceClient client = new 
SalesforceClient(buildParams(baseUrl))) {
+            client.authenticate();
+            CatalogTable table = client.describeObject("salesforce", 
"Opportunity");
+            SeaTunnelRowType rowType = table.getSeaTunnelRowType();
+            Assertions.assertEquals(5, rowType.getTotalFields());
+            Assertions.assertEquals(SqlType.STRING, 
rowType.getFieldType(0).getSqlType());
+            Assertions.assertEquals(SqlType.BOOLEAN, 
rowType.getFieldType(1).getSqlType());
+            Assertions.assertEquals(SqlType.DOUBLE, 
rowType.getFieldType(2).getSqlType());
+            Assertions.assertEquals(SqlType.TIMESTAMP, 
rowType.getFieldType(3).getSqlType());
+            Assertions.assertEquals(SqlType.DATE, 
rowType.getFieldType(4).getSqlType());
+        }
+    }
+
+    @Test
+    void executeBulkQueryReturnsParsedRows() throws IOException {
+        registerBulkJobHandlers("job-1", "JobComplete");
+        server.createContext(
+                "/services/data/v59.0/jobs/query/job-1/results",
+                exchange -> respondCsv(exchange, 200, 
"Id,Name\n001,Acme\n002,Globex\n", null));
+
+        try (SalesforceClient client = new 
SalesforceClient(buildParams(baseUrl))) {
+            client.authenticate();
+            List<Object[]> rows = new ArrayList<>();
+            client.executeBulkQuery("SELECT Id, Name FROM Account", 2, 
rows::add);
+            Assertions.assertEquals(2, rows.size());
+            Assertions.assertArrayEquals(new Object[] {"001", "Acme"}, 
rows.get(0));
+            Assertions.assertArrayEquals(new Object[] {"002", "Globex"}, 
rows.get(1));
+        }
+    }
+
+    @Test
+    void executeBulkQueryFollowsSforceLocatorPagination() throws IOException {
+        registerBulkJobHandlers("job-1", "JobComplete");
+        AtomicInteger resultsHits = new AtomicInteger();
+        server.createContext(
+                "/services/data/v59.0/jobs/query/job-1/results",
+                exchange -> {
+                    int hit = resultsHits.incrementAndGet();
+                    if (hit == 1) {
+                        respondCsv(exchange, 200, "Id\n001\n", "locator-2");
+                    } else {
+                        respondCsv(exchange, 200, "Id\n002\n", null);
+                    }
+                });
+
+        try (SalesforceClient client = new 
SalesforceClient(buildParams(baseUrl))) {
+            client.authenticate();
+            List<Object[]> rows = new ArrayList<>();
+            client.executeBulkQuery("SELECT Id FROM Account", 1, rows::add);
+            Assertions.assertEquals(2, rows.size());
+            Assertions.assertEquals(2, resultsHits.get());
+        }
+    }
+
+    @Test
+    void executeBulkQueryThrowsWhenJobFails() throws IOException {
+        registerBulkJobHandlers("job-1", "Failed");
+
+        try (SalesforceClient client = new 
SalesforceClient(buildParams(baseUrl))) {
+            client.authenticate();
+            Assertions.assertThrows(
+                    SalesforceConnectorException.class,
+                    () -> client.executeBulkQuery("SELECT Id FROM Account", 1, 
row -> {}));
+        }
+    }
+
+    @Test
+    void executeBulkQueryTimesOutWhenJobNeverCompletes() throws IOException {
+        registerBulkJobHandlers("job-1", "InProgress");
+
+        try (SalesforceClient client = new 
SalesforceClient(buildParams(baseUrl))) {
+            client.authenticate();
+            Assertions.assertThrows(
+                    SalesforceConnectorException.class,
+                    () -> client.executeBulkQuery("SELECT Id FROM Account", 1, 
row -> {}));
+        }
+    }
+
+    private void registerBulkJobHandlers(String jobId, String terminalState) {
+        server.createContext(
+                "/services/oauth2/token",
+                exchange -> respondJson(exchange, 200, tokenResponse(baseUrl, 
"tok-1")));
+        server.createContext(
+                "/services/data/v59.0/jobs/query",
+                exchange -> respondJson(exchange, 200, "{\"id\":\"" + jobId + 
"\"}"));
+        server.createContext(
+                "/services/data/v59.0/jobs/query/" + jobId,
+                exchange -> respondJson(exchange, 200, "{\"state\":\"" + 
terminalState + "\"}"));
+    }
+
+    private static String tokenResponse(String instanceUrl, String token) {
+        return "{\"access_token\":\"" + token + "\",\"instance_url\":\"" + 
instanceUrl + "\"}";
+    }
+
+    private static String describeJson(String fieldName, String fieldType) {
+        return "{\"fields\":[{\"name\":\"" + fieldName + "\",\"type\":\"" + 
fieldType + "\"}]}";
+    }
+
+    private static void respondJson(HttpExchange exchange, int status, String 
body)
+            throws IOException {
+        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
+        exchange.getResponseHeaders().add("Content-Type", "application/json");
+        exchange.sendResponseHeaders(status, bytes.length);
+        try (OutputStream out = exchange.getResponseBody()) {
+            out.write(bytes);
+        }
+    }
+
+    private static void respondCsv(HttpExchange exchange, int status, String 
body, String locator)
+            throws IOException {
+        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
+        exchange.getResponseHeaders().add("Content-Type", "text/csv");
+        exchange.getResponseHeaders().add("Sforce-Locator", locator == null ? 
"null" : locator);
+        exchange.sendResponseHeaders(status, bytes.length);
+        try (OutputStream out = exchange.getResponseBody()) {
+            out.write(bytes);
+        }
+    }
+
+    private static SalesforceParameters buildParams(String instanceUrl) {
+        Map<String, Object> map = new HashMap<>();
+        map.put("client_id", "cid");
+        map.put("client_secret", "csec");
+        map.put("username", "[email protected]");
+        map.put("password", "pwd");
+        map.put("instance_url", instanceUrl);
+        map.put("poll_interval_ms", 10L);
+        map.put("job_completion_timeout_ms", 500L);
+        SalesforceParameters p = new SalesforceParameters();
+        p.buildWithConfig(ReadonlyConfig.fromMap(map));
+        return p;
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 47fa67dea9..6e0266fa6d 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -63,6 +63,7 @@
         <module>connector-amazondynamodb</module>
         <module>connector-tablestore</module>
         <module>connector-cassandra</module>
+        <module>connector-salesforce</module>
         <module>connector-s3-redshift</module>
         <module>connector-starrocks</module>
         <module>connector-google-sheets</module>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 524805ac6f..ec8fd7a951 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -454,6 +454,12 @@
                     <version>${project.version}</version>
                     <scope>provided</scope>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-salesforce</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
                 <dependency>
                     <groupId>org.apache.seatunnel</groupId>
                     <artifactId>connector-file-s3</artifactId>

Reply via email to