This is an automated email from the ASF dual-hosted git repository.
chl-wxp pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f9730fa5f1 [Improve][Connector-V2][Jdbc] Support Oracle append values
insert mode (#10996)
f9730fa5f1 is described below
commit f9730fa5f175059e806adc77cda47f54bb4d5eff
Author: Jast <[email protected]>
AuthorDate: Thu Jun 4 11:07:06 2026 +0800
[Improve][Connector-V2][Jdbc] Support Oracle append values insert mode
(#10996)
---
docs/en/connectors/sink/Jdbc.md | 13 ++
docs/zh/connectors/sink/Jdbc.md | 13 ++
.../seatunnel/jdbc/config/JdbcSinkConfig.java | 7 ++
.../seatunnel/jdbc/config/JdbcSinkOptions.java | 8 ++
.../jdbc/internal/JdbcOutputFormatBuilder.java | 67 ++++++++++-
.../seatunnel/jdbc/sink/JdbcSinkFactory.java | 1 +
.../jdbc/internal/JdbcOutputFormatBuilderTest.java | 134 +++++++++++++++++++++
.../connectors/seatunnel/jdbc/JdbcOracleIT.java | 20 +++
.../jdbc_oracle_source_to_sink_append_values.conf | 66 ++++++++++
9 files changed, 328 insertions(+), 1 deletion(-)
diff --git a/docs/en/connectors/sink/Jdbc.md b/docs/en/connectors/sink/Jdbc.md
index c5f3c79acb..7d5396f57e 100644
--- a/docs/en/connectors/sink/Jdbc.md
+++ b/docs/en/connectors/sink/Jdbc.md
@@ -61,6 +61,7 @@ support `Xa transactions`. You can set `is_exactly_once=true`
to enable it.
| custom_sql | String | No | -
|
| enable_upsert | Boolean | No | true
|
| use_copy_statement | Boolean | No | false
|
+| oracle_insert_mode | Enum | No |
CONVENTIONAL |
| create_index | Boolean | No | true
|
| access_key_id | String | No |
|
| secret_access_key | String | No |
|
@@ -239,6 +240,18 @@ Use `COPY ${table} FROM STDIN` statement to import data.
Only drivers with `getC
NOTICE: `MAP`, `ARRAY`, `ROW` types are not supported.
+### oracle_insert_mode [Enum]
+
+Oracle insert mode. The default value is `CONVENTIONAL`, which keeps the
existing JDBC insert behavior.
+
+When set to `APPEND_VALUES`, SeaTunnel adds the Oracle `APPEND_VALUES` hint to
generated insert SQL:
+
+```sql
+INSERT /*+ APPEND_VALUES */ INTO ...
+```
+
+This option is only supported for Oracle JDBC sink insert-only writes. It
requires `generate_sink_sql = true`, `auto_commit = true`, no custom `query`,
no `primary_keys`, `use_copy_statement = false`, `is_exactly_once = false`, and
`support_upsert_by_insert_only = false`.
+
### create_index [boolean]
Create the index(contains primary key and any other indexes) or not when
auto-create table. You can use this option to improve the performance of jdbc
writes when migrating large tables.
diff --git a/docs/zh/connectors/sink/Jdbc.md b/docs/zh/connectors/sink/Jdbc.md
index 36b134e0d0..43d3d3240c 100644
--- a/docs/zh/connectors/sink/Jdbc.md
+++ b/docs/zh/connectors/sink/Jdbc.md
@@ -59,6 +59,7 @@ import ChangeLog from '../changelog/connector-jdbc.md';
| custom_sql | String | 否 | -
|
| enable_upsert | Boolean | 否 | true
|
| use_copy_statement | Boolean | 否 | false
|
+| oracle_insert_mode | Enum | 否 | CONVENTIONAL
|
| access_key_id | String | 否 |
|
| secret_access_key | String | 否 |
|
| region | String | 否 |
|
@@ -234,6 +235,18 @@ Sink插件常用参数,请参考 [Sink常用选项](../common-options/sink-com
注意:不支持 `MAP`、`ARRAY`、`ROW`类型
+### oracle_insert_mode [Enum]
+
+Oracle 插入模式。默认值为 `CONVENTIONAL`,保持现有 JDBC insert 行为。
+
+设置为 `APPEND_VALUES` 时,SeaTunnel 会为自动生成的 Oracle insert SQL 添加 `APPEND_VALUES`
hint:
+
+```sql
+INSERT /*+ APPEND_VALUES */ INTO ...
+```
+
+该选项仅支持 Oracle JDBC Sink 的 insert-only 写入。使用时必须配置 `generate_sink_sql =
true`、`auto_commit = true`,不能配置自定义 `query`,不能配置 `primary_keys`,并且
`use_copy_statement = false`、`is_exactly_once = false`、`support_upsert_by_insert_only = false`。
+
### access_key_id [String]
AWS IAM 认证中所需要的access_key_id 。 该参考仅适用于 dialect="dsql"
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java
index 3f5e9f97d7..51b35b5d6f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java
@@ -41,6 +41,7 @@ public class JdbcSinkConfig implements Serializable {
private boolean supportUpsertByInsertOnly;
private boolean useCopyStatement;
@Builder.Default private boolean createIndex = true;
+ @Builder.Default private OracleInsertMode oracleInsertMode =
OracleInsertMode.CONVENTIONAL;
public static JdbcSinkConfig of(ReadonlyConfig config) {
JdbcSinkConfigBuilder builder = JdbcSinkConfig.builder();
@@ -56,6 +57,12 @@ public class JdbcSinkConfig implements Serializable {
builder.simpleSql(config.get(JdbcSinkOptions.QUERY));
builder.useCopyStatement(config.get(JdbcSinkOptions.USE_COPY_STATEMENT));
builder.createIndex(config.get(JdbcSinkOptions.CREATE_INDEX));
+
builder.oracleInsertMode(config.get(JdbcSinkOptions.ORACLE_INSERT_MODE));
return builder.build();
}
+
+ public enum OracleInsertMode {
+ CONVENTIONAL,
+ APPEND_VALUES
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
index 78a0f7d543..f77dbf97c7 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
@@ -127,6 +127,14 @@ public class JdbcSinkOptions extends JdbcCommonOptions {
.defaultValue(false)
.withDescription("support copy in statement (postgresql)");
+ public static final Option<JdbcSinkConfig.OracleInsertMode>
ORACLE_INSERT_MODE =
+ Options.key("oracle_insert_mode")
+ .enumType(JdbcSinkConfig.OracleInsertMode.class)
+ .defaultValue(JdbcSinkConfig.OracleInsertMode.CONVENTIONAL)
+ .withDescription(
+ "Oracle insert mode. CONVENTIONAL uses normal
insert statements. "
+ + "APPEND_VALUES adds the Oracle
APPEND_VALUES hint for insert-only writes.");
+
public static final Option<FieldIdeEnum> FIELD_IDE =
Options.key("field_ide")
.enumType(FieldIdeEnum.class)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
index 7b3376313f..84dcf0a6de 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor;
@@ -62,6 +63,7 @@ public class JdbcOutputFormatBuilder {
final String database = jdbcSinkConfig.getDatabase();
final String table = jdbcSinkConfig.getTable();
final List<String> primaryKeys = jdbcSinkConfig.getPrimaryKeys();
+ validateOracleInsertMode(dialect, jdbcSinkConfig, primaryKeys);
if (jdbcSinkConfig.isUseCopyStatement()) {
statementExecutorFactory =
() ->
@@ -80,7 +82,12 @@ public class JdbcOutputFormatBuilder {
statementExecutorFactory =
() ->
createSimpleBufferedExecutor(
- dialect, database, table, tableSchema,
databaseTableSchema);
+ dialect,
+ jdbcSinkConfig,
+ database,
+ table,
+ tableSchema,
+ databaseTableSchema);
} else {
statementExecutorFactory =
() ->
@@ -104,12 +111,14 @@ public class JdbcOutputFormatBuilder {
private static JdbcBatchStatementExecutor<SeaTunnelRow>
createSimpleBufferedExecutor(
JdbcDialect dialect,
+ JdbcSinkConfig jdbcSinkConfig,
String database,
String table,
TableSchema tableSchema,
TableSchema databaseTableSchema) {
String insertSQL =
dialect.getInsertIntoStatement(database, table,
tableSchema.getFieldNames());
+ insertSQL = applyOracleAppendValuesHintIfNeeded(jdbcSinkConfig,
insertSQL);
return createSimpleBufferedExecutor(
insertSQL, tableSchema, databaseTableSchema,
dialect.getRowConverter());
}
@@ -346,6 +355,62 @@ public class JdbcOutputFormatBuilder {
rowConverter);
}
+ static String applyOracleAppendValuesHintIfNeeded(
+ JdbcSinkConfig jdbcSinkConfig, String insertSQL) {
+ if (!isOracleAppendValuesConfigured(jdbcSinkConfig)) {
+ return insertSQL;
+ }
+ final String insertPrefix = "INSERT";
+ if (!insertSQL.regionMatches(true, 0, insertPrefix, 0,
insertPrefix.length())) {
+ throw new IllegalArgumentException(
+ "oracle_insert_mode=APPEND_VALUES only supports generated
INSERT statements.");
+ }
+ String appendValuesSQL =
+ insertPrefix + " /*+ APPEND_VALUES */" +
insertSQL.substring(insertPrefix.length());
+ log.info("Oracle APPEND_VALUES insert mode is enabled, generated SQL:
{}", appendValuesSQL);
+ return appendValuesSQL;
+ }
+
+ private static void validateOracleInsertMode(
+ JdbcDialect dialect, JdbcSinkConfig jdbcSinkConfig, List<String>
primaryKeys) {
+ if (!isOracleAppendValuesConfigured(jdbcSinkConfig)) {
+ return;
+ }
+ if (!DatabaseIdentifier.ORACLE.equals(dialect.dialectName())) {
+ throw new IllegalArgumentException(
+ "oracle_insert_mode=APPEND_VALUES only supports Oracle
JDBC sink.");
+ }
+ if (jdbcSinkConfig.isUseCopyStatement()) {
+ throw new IllegalArgumentException(
+ "oracle_insert_mode=APPEND_VALUES does not support copy
statement.");
+ }
+ if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
+ throw new IllegalArgumentException(
+ "oracle_insert_mode=APPEND_VALUES does not support custom
query.");
+ }
+ if (jdbcSinkConfig.isExactlyOnce()) {
+ throw new IllegalArgumentException(
+ "oracle_insert_mode=APPEND_VALUES does not support
exactly-once JDBC sink.");
+ }
+ if (!jdbcSinkConfig.getJdbcConnectionConfig().isAutoCommit()) {
+ throw new IllegalArgumentException(
+ "oracle_insert_mode=APPEND_VALUES requires
auto_commit=true.");
+ }
+ if (primaryKeys != null && !primaryKeys.isEmpty()) {
+ throw new IllegalArgumentException(
+ "oracle_insert_mode=APPEND_VALUES only supports
insert-only writes without primary keys.");
+ }
+ if (jdbcSinkConfig.isSupportUpsertByInsertOnly()) {
+ throw new IllegalArgumentException(
+ "oracle_insert_mode=APPEND_VALUES does not support
insert-only upsert paths.");
+ }
+ }
+
+ private static boolean isOracleAppendValuesConfigured(JdbcSinkConfig
jdbcSinkConfig) {
+ return JdbcSinkConfig.OracleInsertMode.APPEND_VALUES.equals(
+ jdbcSinkConfig.getOracleInsertMode());
+ }
+
static Function<SeaTunnelRow, SeaTunnelRow> createKeyExtractor(int[]
pkFields) {
return row -> {
Object[] fields = new Object[pkFields.length];
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index 5a7d1e756d..61bae380bf 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -227,6 +227,7 @@ public class JdbcSinkFactory implements TableSinkFactory {
JdbcSinkOptions.IS_PRIMARY_KEY_UPDATED,
JdbcSinkOptions.SUPPORT_UPSERT_BY_INSERT_ONLY,
JdbcSinkOptions.USE_COPY_STATEMENT,
+ JdbcSinkOptions.ORACLE_INSERT_MODE,
JdbcSinkOptions.COMPATIBLE_MODE,
JdbcSinkOptions.ENABLE_UPSERT,
JdbcSinkOptions.FIELD_IDE,
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilderTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilderTest.java
index 6b855fc3fb..6f23eca849 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilderTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilderTest.java
@@ -26,8 +26,11 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.TestConnection;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlserverJdbcRowConverter;
@@ -39,6 +42,7 @@ import org.mockito.Mockito;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
@@ -129,4 +133,134 @@ public class JdbcOutputFormatBuilderTest {
Assertions.assertEquals("databasewith.dot", database.getValue());
Assertions.assertEquals("dbo.tableName", table.getValue());
}
+
+ @Test
+ public void testOracleAppendValuesKeepsConventionalInsertSql() {
+ JdbcSinkConfig config =
+ JdbcSinkConfig.builder()
+
.oracleInsertMode(JdbcSinkConfig.OracleInsertMode.CONVENTIONAL)
+ .build();
+
+ String sql =
+ JdbcOutputFormatBuilder.applyOracleAppendValuesHintIfNeeded(
+ config, "INSERT INTO TEST_TABLE (ID) VALUES (:ID)");
+
+ Assertions.assertEquals("INSERT INTO TEST_TABLE (ID) VALUES (:ID)",
sql);
+ }
+
+ @Test
+ public void testOracleAppendValuesAddsHintToGeneratedInsertSql() {
+ JdbcSinkConfig config =
+ JdbcSinkConfig.builder()
+
.oracleInsertMode(JdbcSinkConfig.OracleInsertMode.APPEND_VALUES)
+ .build();
+
+ String sql =
+ JdbcOutputFormatBuilder.applyOracleAppendValuesHintIfNeeded(
+ config, "INSERT INTO TEST_TABLE (ID) VALUES (:ID)");
+
+ Assertions.assertEquals(
+ "INSERT /*+ APPEND_VALUES */ INTO TEST_TABLE (ID) VALUES
(:ID)", sql);
+ }
+
+ @Test
+ public void testOracleAppendValuesRejectsNonOracleDialect() {
+ JdbcSinkConfig config = appendValuesConfigBuilder().build();
+
+ IllegalArgumentException exception =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> newBuilder(new MysqlDialect(), config).build());
+
+ Assertions.assertEquals(
+ "oracle_insert_mode=APPEND_VALUES only supports Oracle JDBC
sink.",
+ exception.getMessage());
+ }
+
+ @Test
+ public void testOracleAppendValuesRejectsCustomQuery() {
+ JdbcSinkConfig config =
+ appendValuesConfigBuilder().simpleSql("INSERT INTO TEST_TABLE
VALUES (?)").build();
+
+ IllegalArgumentException exception =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> newBuilder(new OracleDialect(), config).build());
+
+ Assertions.assertEquals(
+ "oracle_insert_mode=APPEND_VALUES does not support custom
query.",
+ exception.getMessage());
+ }
+
+ @Test
+ public void testOracleAppendValuesRejectsAutoCommitDisabled() {
+ JdbcSinkConfig config =
+ appendValuesConfigBuilder()
+ .jdbcConnectionConfig(
+
JdbcConnectionConfig.builder().autoCommit(false).build())
+ .build();
+
+ IllegalArgumentException exception =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> newBuilder(new OracleDialect(), config).build());
+
+ Assertions.assertEquals(
+ "oracle_insert_mode=APPEND_VALUES requires auto_commit=true.",
+ exception.getMessage());
+ }
+
+ @Test
+ public void testOracleAppendValuesRejectsPrimaryKeys() {
+ JdbcSinkConfig config =
+
appendValuesConfigBuilder().primaryKeys(Collections.singletonList("ID")).build();
+
+ IllegalArgumentException exception =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> newBuilder(new OracleDialect(), config).build());
+
+ Assertions.assertEquals(
+ "oracle_insert_mode=APPEND_VALUES only supports insert-only
writes without primary keys.",
+ exception.getMessage());
+ }
+
+ @Test
+ public void testOracleAppendValuesRejectsExactlyOnce() {
+ JdbcSinkConfig config =
appendValuesConfigBuilder().isExactlyOnce(true).build();
+
+ IllegalArgumentException exception =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> newBuilder(new OracleDialect(), config).build());
+
+ Assertions.assertEquals(
+ "oracle_insert_mode=APPEND_VALUES does not support
exactly-once JDBC sink.",
+ exception.getMessage());
+ }
+
+ private static JdbcSinkConfig.JdbcSinkConfigBuilder
appendValuesConfigBuilder() {
+ return JdbcSinkConfig.builder()
+ .database("TEST_SCHEMA")
+ .table("TEST_TABLE")
+
.jdbcConnectionConfig(JdbcConnectionConfig.builder().autoCommit(true).build())
+
.oracleInsertMode(JdbcSinkConfig.OracleInsertMode.APPEND_VALUES);
+ }
+
+ private static JdbcOutputFormatBuilder newBuilder(
+
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect
dialect,
+ JdbcSinkConfig config) {
+ return new JdbcOutputFormatBuilder(
+ dialect,
+ Mockito.mock(SimpleJdbcConnectionProvider.class),
+ config,
+ oracleAppendValuesTableSchema(),
+ null);
+ }
+
+ private static TableSchema oracleAppendValuesTableSchema() {
+ return TableSchema.builder()
+ .column(PhysicalColumn.of("ID", BasicType.INT_TYPE, 22L,
false, null, "ID"))
+ .build();
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
index 255bcc9c00..7e3e6fe595 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
@@ -45,6 +45,7 @@ import org.testcontainers.utility.MountableFile;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
+import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.LocalDate;
@@ -188,6 +189,25 @@ public class JdbcOracleIT extends AbstractJdbcIT {
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
}
+ @TestTemplate
+ public void testOracleAppendValuesInsertMode(TestContainer container)
throws Exception {
+ try {
+ Container.ExecResult execResult =
+
container.executeJob("/jdbc_oracle_source_to_sink_append_values.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ try (Statement statement = connection.createStatement();
+ ResultSet resultSet =
+ statement.executeQuery(
+ "SELECT COUNT(*) FROM "
+ + buildTableInfoWithSchema(SCHEMA,
SINK_TABLE))) {
+ Assertions.assertTrue(resultSet.next());
+ Assertions.assertEquals(100, resultSet.getInt(1));
+ }
+ } finally {
+ clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(),
jdbcCase.getSinkTable());
+ }
+ }
+
@Override
JdbcCase getJdbcCase() {
Map<String, String> containerEnv = new HashMap<>();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_append_values.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_append_values.conf
new file mode 100644
index 0000000000..f8670897e3
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_append_values.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file verifies Oracle APPEND_VALUES insert mode on generated
sink SQL.
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 100
+ schema = {
+ columns = [
+ {
+ name = VARCHAR_10_COL
+ type = string
+ columnLength = 10
+ },
+ {
+ name = CHAR_10_COL
+ type = string
+ columnLength = 10
+ },
+ {
+ name = INTEGER_COL
+ type = int
+ }
+ ]
+ }
+ string.length = 10
+ }
+}
+
+sink {
+ Jdbc {
+ driver = oracle.jdbc.driver.OracleDriver
+ url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
+ username = testUser
+ password = testPassword
+ database = XE
+ table = "TESTUSER.E2E_TABLE_SINK"
+ generate_sink_sql = true
+ oracle_insert_mode = APPEND_VALUES
+ auto_commit = true
+ properties {
+ database.oracle.jdbc.timezoneAsRegion = "false"
+ }
+ }
+}