This is an automated email from the ASF dual-hosted git repository.

chl-wxp pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f9730fa5f1 [Improve][Connector-V2][Jdbc] Support Oracle append values 
insert mode (#10996)
f9730fa5f1 is described below

commit f9730fa5f175059e806adc77cda47f54bb4d5eff
Author: Jast <[email protected]>
AuthorDate: Thu Jun 4 11:07:06 2026 +0800

    [Improve][Connector-V2][Jdbc] Support Oracle append values insert mode 
(#10996)
---
 docs/en/connectors/sink/Jdbc.md                    |  13 ++
 docs/zh/connectors/sink/Jdbc.md                    |  13 ++
 .../seatunnel/jdbc/config/JdbcSinkConfig.java      |   7 ++
 .../seatunnel/jdbc/config/JdbcSinkOptions.java     |   8 ++
 .../jdbc/internal/JdbcOutputFormatBuilder.java     |  67 ++++++++++-
 .../seatunnel/jdbc/sink/JdbcSinkFactory.java       |   1 +
 .../jdbc/internal/JdbcOutputFormatBuilderTest.java | 134 +++++++++++++++++++++
 .../connectors/seatunnel/jdbc/JdbcOracleIT.java    |  20 +++
 .../jdbc_oracle_source_to_sink_append_values.conf  |  66 ++++++++++
 9 files changed, 328 insertions(+), 1 deletion(-)

diff --git a/docs/en/connectors/sink/Jdbc.md b/docs/en/connectors/sink/Jdbc.md
index c5f3c79acb..7d5396f57e 100644
--- a/docs/en/connectors/sink/Jdbc.md
+++ b/docs/en/connectors/sink/Jdbc.md
@@ -61,6 +61,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` 
to enable it.
 | custom_sql                                | String  | No       | -           
                 |
 | enable_upsert                             | Boolean | No       | true        
                 |
 | use_copy_statement                        | Boolean | No       | false       
                 |
+| oracle_insert_mode                        | Enum    | No       | 
CONVENTIONAL                 |
 | create_index                              | Boolean | No       | true        
                 |
 | access_key_id                             | String  | No       |             
                 |
 | secret_access_key                         | String  | No       |             
                 |
@@ -239,6 +240,18 @@ Use `COPY ${table} FROM STDIN` statement to import data. 
Only drivers with `getC
 
 NOTICE: `MAP`, `ARRAY`, `ROW` types are not supported.
 
+### oracle_insert_mode [Enum]
+
+Oracle insert mode. The default value is `CONVENTIONAL`, which keeps the 
existing JDBC insert behavior.
+
+When set to `APPEND_VALUES`, SeaTunnel adds the Oracle `APPEND_VALUES` hint to 
generated insert SQL:
+
+```sql
+INSERT /*+ APPEND_VALUES */ INTO ...
+```
+
+This option is only supported for Oracle JDBC sink insert-only writes. It 
requires `generate_sink_sql = true`, `auto_commit = true`, no custom `query`, 
no `primary_keys`, `is_exactly_once = false`, and 
`support_upsert_by_insert_only = false`.
+
 ### create_index [boolean]
 
 Create the index(contains primary key and any other indexes) or not when 
auto-create table. You can use this option to improve the performance of jdbc 
writes when migrating large tables.
diff --git a/docs/zh/connectors/sink/Jdbc.md b/docs/zh/connectors/sink/Jdbc.md
index 36b134e0d0..43d3d3240c 100644
--- a/docs/zh/connectors/sink/Jdbc.md
+++ b/docs/zh/connectors/sink/Jdbc.md
@@ -59,6 +59,7 @@ import ChangeLog from '../changelog/connector-jdbc.md';
 | custom_sql                                | String  | 否    | -               
             |
 | enable_upsert                             | Boolean | 否    | true            
             |
 | use_copy_statement                        | Boolean | 否    | false           
             |
+| oracle_insert_mode                        | Enum    | 否    | CONVENTIONAL    
             |
 | access_key_id                             | String  | 否       |              
                |
 | secret_access_key                         | String  | 否       |              
                |
 | region                                    | String  | 否       |              
                |
@@ -234,6 +235,18 @@ Sink插件常用参数,请参考 [Sink常用选项](../common-options/sink-com
 
 注意:不支持 `MAP`、`ARRAY`、`ROW`类型
 
+### oracle_insert_mode [Enum]
+
+Oracle 插入模式。默认值为 `CONVENTIONAL`,保持现有 JDBC insert 行为。
+
+设置为 `APPEND_VALUES` 时,SeaTunnel 会为自动生成的 Oracle insert SQL 添加 `APPEND_VALUES` 
hint:
+
+```sql
+INSERT /*+ APPEND_VALUES */ INTO ...
+```
+
+该选项仅支持 Oracle JDBC Sink 的 insert-only 写入。使用时必须配置 `generate_sink_sql = 
true`、`auto_commit = true`,不能配置自定义 `query`,不能配置 `primary_keys`,并且 
`is_exactly_once = false`、`support_upsert_by_insert_only = false`。
+
 ### access_key_id [String]
 AWS IAM 认证中所需要的access_key_id 。 该参考仅适用于 dialect="dsql"
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java
index 3f5e9f97d7..51b35b5d6f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java
@@ -41,6 +41,7 @@ public class JdbcSinkConfig implements Serializable {
     private boolean supportUpsertByInsertOnly;
     private boolean useCopyStatement;
     @Builder.Default private boolean createIndex = true;
+    @Builder.Default private OracleInsertMode oracleInsertMode = 
OracleInsertMode.CONVENTIONAL;
 
     public static JdbcSinkConfig of(ReadonlyConfig config) {
         JdbcSinkConfigBuilder builder = JdbcSinkConfig.builder();
@@ -56,6 +57,12 @@ public class JdbcSinkConfig implements Serializable {
         builder.simpleSql(config.get(JdbcSinkOptions.QUERY));
         
builder.useCopyStatement(config.get(JdbcSinkOptions.USE_COPY_STATEMENT));
         builder.createIndex(config.get(JdbcSinkOptions.CREATE_INDEX));
+        
builder.oracleInsertMode(config.get(JdbcSinkOptions.ORACLE_INSERT_MODE));
         return builder.build();
     }
+
+    public enum OracleInsertMode {
+        CONVENTIONAL,
+        APPEND_VALUES
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
index 78a0f7d543..f77dbf97c7 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
@@ -127,6 +127,14 @@ public class JdbcSinkOptions extends JdbcCommonOptions {
                     .defaultValue(false)
                     .withDescription("support copy in statement (postgresql)");
 
+    public static final Option<JdbcSinkConfig.OracleInsertMode> 
ORACLE_INSERT_MODE =
+            Options.key("oracle_insert_mode")
+                    .enumType(JdbcSinkConfig.OracleInsertMode.class)
+                    .defaultValue(JdbcSinkConfig.OracleInsertMode.CONVENTIONAL)
+                    .withDescription(
+                            "Oracle insert mode. CONVENTIONAL uses normal 
insert statements. "
+                                    + "APPEND_VALUES adds the Oracle 
APPEND_VALUES hint for insert-only writes.");
+
     public static final Option<FieldIdeEnum> FIELD_IDE =
             Options.key("field_ide")
                     .enumType(FieldIdeEnum.class)
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
index 7b3376313f..84dcf0a6de 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor;
@@ -62,6 +63,7 @@ public class JdbcOutputFormatBuilder {
         final String database = jdbcSinkConfig.getDatabase();
         final String table = jdbcSinkConfig.getTable();
         final List<String> primaryKeys = jdbcSinkConfig.getPrimaryKeys();
+        validateOracleInsertMode(dialect, jdbcSinkConfig, primaryKeys);
         if (jdbcSinkConfig.isUseCopyStatement()) {
             statementExecutorFactory =
                     () ->
@@ -80,7 +82,12 @@ public class JdbcOutputFormatBuilder {
             statementExecutorFactory =
                     () ->
                             createSimpleBufferedExecutor(
-                                    dialect, database, table, tableSchema, 
databaseTableSchema);
+                                    dialect,
+                                    jdbcSinkConfig,
+                                    database,
+                                    table,
+                                    tableSchema,
+                                    databaseTableSchema);
         } else {
             statementExecutorFactory =
                     () ->
@@ -104,12 +111,14 @@ public class JdbcOutputFormatBuilder {
 
     private static JdbcBatchStatementExecutor<SeaTunnelRow> 
createSimpleBufferedExecutor(
             JdbcDialect dialect,
+            JdbcSinkConfig jdbcSinkConfig,
             String database,
             String table,
             TableSchema tableSchema,
             TableSchema databaseTableSchema) {
         String insertSQL =
                 dialect.getInsertIntoStatement(database, table, 
tableSchema.getFieldNames());
+        insertSQL = applyOracleAppendValuesHintIfNeeded(jdbcSinkConfig, 
insertSQL);
         return createSimpleBufferedExecutor(
                 insertSQL, tableSchema, databaseTableSchema, 
dialect.getRowConverter());
     }
@@ -346,6 +355,62 @@ public class JdbcOutputFormatBuilder {
                 rowConverter);
     }
 
+    static String applyOracleAppendValuesHintIfNeeded(
+            JdbcSinkConfig jdbcSinkConfig, String insertSQL) {
+        if (!isOracleAppendValuesConfigured(jdbcSinkConfig)) {
+            return insertSQL;
+        }
+        final String insertPrefix = "INSERT";
+        if (!insertSQL.regionMatches(true, 0, insertPrefix, 0, 
insertPrefix.length())) {
+            throw new IllegalArgumentException(
+                    "oracle_insert_mode=APPEND_VALUES only supports generated 
INSERT statements.");
+        }
+        String appendValuesSQL =
+                insertPrefix + " /*+ APPEND_VALUES */" + 
insertSQL.substring(insertPrefix.length());
+        log.info("Oracle APPEND_VALUES insert mode is enabled, generated SQL: 
{}", appendValuesSQL);
+        return appendValuesSQL;
+    }
+
+    private static void validateOracleInsertMode(
+            JdbcDialect dialect, JdbcSinkConfig jdbcSinkConfig, List<String> 
primaryKeys) {
+        if (!isOracleAppendValuesConfigured(jdbcSinkConfig)) {
+            return;
+        }
+        if (!DatabaseIdentifier.ORACLE.equals(dialect.dialectName())) {
+            throw new IllegalArgumentException(
+                    "oracle_insert_mode=APPEND_VALUES only supports Oracle 
JDBC sink.");
+        }
+        if (jdbcSinkConfig.isUseCopyStatement()) {
+            throw new IllegalArgumentException(
+                    "oracle_insert_mode=APPEND_VALUES does not support copy 
statement.");
+        }
+        if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
+            throw new IllegalArgumentException(
+                    "oracle_insert_mode=APPEND_VALUES does not support custom 
query.");
+        }
+        if (jdbcSinkConfig.isExactlyOnce()) {
+            throw new IllegalArgumentException(
+                    "oracle_insert_mode=APPEND_VALUES does not support 
exactly-once JDBC sink.");
+        }
+        if (!jdbcSinkConfig.getJdbcConnectionConfig().isAutoCommit()) {
+            throw new IllegalArgumentException(
+                    "oracle_insert_mode=APPEND_VALUES requires 
auto_commit=true.");
+        }
+        if (primaryKeys != null && !primaryKeys.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "oracle_insert_mode=APPEND_VALUES only supports 
insert-only writes without primary keys.");
+        }
+        if (jdbcSinkConfig.isSupportUpsertByInsertOnly()) {
+            throw new IllegalArgumentException(
+                    "oracle_insert_mode=APPEND_VALUES does not support 
insert-only upsert paths.");
+        }
+    }
+
+    private static boolean isOracleAppendValuesConfigured(JdbcSinkConfig 
jdbcSinkConfig) {
+        return JdbcSinkConfig.OracleInsertMode.APPEND_VALUES.equals(
+                jdbcSinkConfig.getOracleInsertMode());
+    }
+
     static Function<SeaTunnelRow, SeaTunnelRow> createKeyExtractor(int[] 
pkFields) {
         return row -> {
             Object[] fields = new Object[pkFields.length];
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index 5a7d1e756d..61bae380bf 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -227,6 +227,7 @@ public class JdbcSinkFactory implements TableSinkFactory {
                         JdbcSinkOptions.IS_PRIMARY_KEY_UPDATED,
                         JdbcSinkOptions.SUPPORT_UPSERT_BY_INSERT_ONLY,
                         JdbcSinkOptions.USE_COPY_STATEMENT,
+                        JdbcSinkOptions.ORACLE_INSERT_MODE,
                         JdbcSinkOptions.COMPATIBLE_MODE,
                         JdbcSinkOptions.ENABLE_UPSERT,
                         JdbcSinkOptions.FIELD_IDE,
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilderTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilderTest.java
index 6b855fc3fb..6f23eca849 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilderTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilderTest.java
@@ -26,8 +26,11 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.TestConnection;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlserverJdbcRowConverter;
 
@@ -39,6 +42,7 @@ import org.mockito.Mockito;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Function;
@@ -129,4 +133,134 @@ public class JdbcOutputFormatBuilderTest {
         Assertions.assertEquals("databasewith.dot", database.getValue());
         Assertions.assertEquals("dbo.tableName", table.getValue());
     }
+
+    @Test
+    public void testOracleAppendValuesKeepsConventionalInsertSql() {
+        JdbcSinkConfig config =
+                JdbcSinkConfig.builder()
+                        
.oracleInsertMode(JdbcSinkConfig.OracleInsertMode.CONVENTIONAL)
+                        .build();
+
+        String sql =
+                JdbcOutputFormatBuilder.applyOracleAppendValuesHintIfNeeded(
+                        config, "INSERT INTO TEST_TABLE (ID) VALUES (:ID)");
+
+        Assertions.assertEquals("INSERT INTO TEST_TABLE (ID) VALUES (:ID)", 
sql);
+    }
+
+    @Test
+    public void testOracleAppendValuesAddsHintToGeneratedInsertSql() {
+        JdbcSinkConfig config =
+                JdbcSinkConfig.builder()
+                        
.oracleInsertMode(JdbcSinkConfig.OracleInsertMode.APPEND_VALUES)
+                        .build();
+
+        String sql =
+                JdbcOutputFormatBuilder.applyOracleAppendValuesHintIfNeeded(
+                        config, "INSERT INTO TEST_TABLE (ID) VALUES (:ID)");
+
+        Assertions.assertEquals(
+                "INSERT /*+ APPEND_VALUES */ INTO TEST_TABLE (ID) VALUES 
(:ID)", sql);
+    }
+
+    @Test
+    public void testOracleAppendValuesRejectsNonOracleDialect() {
+        JdbcSinkConfig config = appendValuesConfigBuilder().build();
+
+        IllegalArgumentException exception =
+                Assertions.assertThrows(
+                        IllegalArgumentException.class,
+                        () -> newBuilder(new MysqlDialect(), config).build());
+
+        Assertions.assertEquals(
+                "oracle_insert_mode=APPEND_VALUES only supports Oracle JDBC 
sink.",
+                exception.getMessage());
+    }
+
+    @Test
+    public void testOracleAppendValuesRejectsCustomQuery() {
+        JdbcSinkConfig config =
+                appendValuesConfigBuilder().simpleSql("INSERT INTO TEST_TABLE 
VALUES (?)").build();
+
+        IllegalArgumentException exception =
+                Assertions.assertThrows(
+                        IllegalArgumentException.class,
+                        () -> newBuilder(new OracleDialect(), config).build());
+
+        Assertions.assertEquals(
+                "oracle_insert_mode=APPEND_VALUES does not support custom 
query.",
+                exception.getMessage());
+    }
+
+    @Test
+    public void testOracleAppendValuesRejectsAutoCommitDisabled() {
+        JdbcSinkConfig config =
+                appendValuesConfigBuilder()
+                        .jdbcConnectionConfig(
+                                
JdbcConnectionConfig.builder().autoCommit(false).build())
+                        .build();
+
+        IllegalArgumentException exception =
+                Assertions.assertThrows(
+                        IllegalArgumentException.class,
+                        () -> newBuilder(new OracleDialect(), config).build());
+
+        Assertions.assertEquals(
+                "oracle_insert_mode=APPEND_VALUES requires auto_commit=true.",
+                exception.getMessage());
+    }
+
+    @Test
+    public void testOracleAppendValuesRejectsPrimaryKeys() {
+        JdbcSinkConfig config =
+                
appendValuesConfigBuilder().primaryKeys(Collections.singletonList("ID")).build();
+
+        IllegalArgumentException exception =
+                Assertions.assertThrows(
+                        IllegalArgumentException.class,
+                        () -> newBuilder(new OracleDialect(), config).build());
+
+        Assertions.assertEquals(
+                "oracle_insert_mode=APPEND_VALUES only supports insert-only 
writes without primary keys.",
+                exception.getMessage());
+    }
+
+    @Test
+    public void testOracleAppendValuesRejectsExactlyOnce() {
+        JdbcSinkConfig config = 
appendValuesConfigBuilder().isExactlyOnce(true).build();
+
+        IllegalArgumentException exception =
+                Assertions.assertThrows(
+                        IllegalArgumentException.class,
+                        () -> newBuilder(new OracleDialect(), config).build());
+
+        Assertions.assertEquals(
+                "oracle_insert_mode=APPEND_VALUES does not support 
exactly-once JDBC sink.",
+                exception.getMessage());
+    }
+
+    private static JdbcSinkConfig.JdbcSinkConfigBuilder 
appendValuesConfigBuilder() {
+        return JdbcSinkConfig.builder()
+                .database("TEST_SCHEMA")
+                .table("TEST_TABLE")
+                
.jdbcConnectionConfig(JdbcConnectionConfig.builder().autoCommit(true).build())
+                
.oracleInsertMode(JdbcSinkConfig.OracleInsertMode.APPEND_VALUES);
+    }
+
+    private static JdbcOutputFormatBuilder newBuilder(
+            
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect 
dialect,
+            JdbcSinkConfig config) {
+        return new JdbcOutputFormatBuilder(
+                dialect,
+                Mockito.mock(SimpleJdbcConnectionProvider.class),
+                config,
+                oracleAppendValuesTableSchema(),
+                null);
+    }
+
+    private static TableSchema oracleAppendValuesTableSchema() {
+        return TableSchema.builder()
+                .column(PhysicalColumn.of("ID", BasicType.INT_TYPE, 22L, 
false, null, "ID"))
+                .build();
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
index 255bcc9c00..7e3e6fe595 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
@@ -45,6 +45,7 @@ import org.testcontainers.utility.MountableFile;
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
 import java.sql.Date;
+import java.sql.ResultSet;
 import java.sql.Statement;
 import java.sql.Timestamp;
 import java.time.LocalDate;
@@ -188,6 +189,25 @@ public class JdbcOracleIT extends AbstractJdbcIT {
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
     }
 
+    @TestTemplate
+    public void testOracleAppendValuesInsertMode(TestContainer container) 
throws Exception {
+        try {
+            Container.ExecResult execResult =
+                    
container.executeJob("/jdbc_oracle_source_to_sink_append_values.conf");
+            Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+            try (Statement statement = connection.createStatement();
+                    ResultSet resultSet =
+                            statement.executeQuery(
+                                    "SELECT COUNT(*) FROM "
+                                            + buildTableInfoWithSchema(SCHEMA, 
SINK_TABLE))) {
+                Assertions.assertTrue(resultSet.next());
+                Assertions.assertEquals(100, resultSet.getInt(1));
+            }
+        } finally {
+            clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(), 
jdbcCase.getSinkTable());
+        }
+    }
+
     @Override
     JdbcCase getJdbcCase() {
         Map<String, String> containerEnv = new HashMap<>();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_append_values.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_append_values.conf
new file mode 100644
index 0000000000..f8670897e3
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_append_values.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file verifies Oracle APPEND_VALUES insert mode on generated 
sink SQL.
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    row.num = 100
+    schema = {
+      columns = [
+        {
+          name = VARCHAR_10_COL
+          type = string
+          columnLength = 10
+        },
+        {
+          name = CHAR_10_COL
+          type = string
+          columnLength = 10
+        },
+        {
+          name = INTEGER_COL
+          type = int
+        }
+      ]
+    }
+    string.length = 10
+  }
+}
+
+sink {
+  Jdbc {
+    driver = oracle.jdbc.driver.OracleDriver
+    url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
+    username = testUser
+    password = testPassword
+    database = XE
+    table = "TESTUSER.E2E_TABLE_SINK"
+    generate_sink_sql = true
+    oracle_insert_mode = APPEND_VALUES
+    auto_commit = true
+    properties {
+       database.oracle.jdbc.timezoneAsRegion = "false"
+    }
+  }
+}

Reply via email to