This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1de056a9a4 [Improve][Connector-V2] The interface supports jdbc
respects the target database field type (#8031)
1de056a9a4 is described below
commit 1de056a9a4b3fb96106ac6e8f290b21818cd775e
Author: Jia Fan <[email protected]>
AuthorDate: Thu Dec 5 13:26:41 2024 +0800
[Improve][Connector-V2] The interface supports jdbc respects the target
database field type (#8031)
---
.../seatunnel/jdbc/catalog/dm/DamengCatalog.java | 97 +++++++-----
.../jdbc/internal/JdbcOutputFormatBuilder.java | 72 +++++++--
.../converter/AbstractJdbcRowConverter.java | 172 ++++++++++++---------
.../jdbc/internal/converter/JdbcRowConverter.java | 13 ++
.../dialect/hive/HiveJdbcRowConverter.java | 7 +-
.../dialect/inceptor/InceptorJdbcRowConverter.java | 7 +-
.../dialect/kingbase/KingbaseJdbcRowConverter.java | 7 +-
.../oceanbase/OceanBaseMysqlJdbcRowConverter.java | 7 +-
.../dialect/oracle/OracleJdbcRowConverter.java | 30 ++++
.../dialect/psql/PostgresJdbcRowConverter.java | 7 +-
.../executor/FieldNamedPreparedStatement.java | 4 +-
.../InsertOrUpdateBatchStatementExecutor.java | 11 +-
.../executor/SimpleBatchStatementExecutor.java | 5 +-
.../jdbc/sink/AbstractJdbcSinkWriter.java | 7 +-
.../jdbc/sink/JdbcExactlyOnceSinkWriter.java | 5 +-
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 85 ++++++++--
.../seatunnel/jdbc/sink/JdbcSinkWriter.java | 14 +-
.../connectors/seatunnel/jdbc/JdbcOracleIT.java | 11 ++
.../test/resources/jdbc_oracle_source_to_sink.conf | 4 +-
.../jdbc_oracle_source_to_sink_use_select1.conf | 4 +-
.../jdbc_oracle_source_to_sink_use_select2.conf | 2 +-
.../jdbc_oracle_source_to_sink_use_select3.conf | 2 +-
22 files changed, 416 insertions(+), 157 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
index 04b15dfe1c..ba237d3bfd 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
@@ -21,7 +21,6 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.dm;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
@@ -34,7 +33,6 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTy
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
-import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -44,6 +42,23 @@ import java.util.List;
@Slf4j
public class DamengCatalog extends AbstractJdbcCatalog {
+ private static final String SELECT_COLUMNS_SQL =
+ "SELECT COLUMNS.COLUMN_NAME, COLUMNS.DATA_TYPE,
COLUMNS.DATA_LENGTH, COLUMNS.DATA_PRECISION, COLUMNS.DATA_SCALE "
+ + ", COLUMNS.NULLABLE, COLUMNS.DATA_DEFAULT,
COMMENTS.COMMENTS ,"
+ + "CASE \n"
+ + " WHEN COLUMNS.DATA_TYPE IN ('CHAR', 'CHARACTER',
'VARCHAR', 'VARCHAR2', 'VARBINARY', 'BINARY') THEN COLUMNS.DATA_TYPE || '(' ||
COLUMNS.DATA_LENGTH || ')'\n"
+ + " WHEN COLUMNS.DATA_TYPE IN ('NUMERIC',
'DECIMAL', 'NUMBER') AND COLUMNS.DATA_PRECISION IS NOT NULL AND
COLUMNS.DATA_SCALE IS NOT NULL AND COLUMNS.DATA_PRECISION != 0 AND
COLUMNS.DATA_SCALE != 0 THEN COLUMNS.DATA_TYPE || '(' || COLUMNS.DATA_PRECISION
|| ', ' || COLUMNS.DATA_SCALE || ')'\n"
+ + " ELSE COLUMNS.DATA_TYPE\n"
+ + " END AS SOURCE_TYPE \n"
+ + "FROM ALL_TAB_COLUMNS COLUMNS "
+ + "LEFT JOIN ALL_COL_COMMENTS COMMENTS "
+ + "ON COLUMNS.OWNER = COMMENTS.SCHEMA_NAME "
+ + "AND COLUMNS.TABLE_NAME = COMMENTS.TABLE_NAME "
+ + "AND COLUMNS.COLUMN_NAME = COMMENTS.COLUMN_NAME "
+ + "WHERE COLUMNS.OWNER = '%s' "
+ + "AND COLUMNS.TABLE_NAME = '%s' "
+ + "ORDER BY COLUMNS.COLUMN_ID ASC";
+
public DamengCatalog(
String catalogName,
String username,
@@ -53,6 +68,23 @@ public class DamengCatalog extends AbstractJdbcCatalog {
super(catalogName, username, pwd, urlInfo, defaultSchema);
}
+ @Override
+ protected void createDatabaseInternal(String databaseName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected void dropDatabaseInternal(String databaseName) throws
CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getExistDataSql(TablePath tablePath) {
+ return String.format(
+ "select * from \"%s\".\"%s\" LIMIT 1",
+ tablePath.getSchemaName(), tablePath.getTableName());
+ }
+
@Override
protected String getDatabaseWithConditionSql(String databaseName) {
return String.format(getListDatabaseSql() + " where name = '%s'",
databaseName);
@@ -98,19 +130,34 @@ public class DamengCatalog extends AbstractJdbcCatalog {
return rs.getString(1) + "." + rs.getString(2);
}
+ @Override
+ protected String getSelectColumnsSql(TablePath tablePath) {
+ return String.format(
+ SELECT_COLUMNS_SQL, tablePath.getSchemaName(),
tablePath.getTableName());
+ }
+
+ @Override
+ protected String getTruncateTableSql(TablePath tablePath) {
+ return String.format(
+ "TRUNCATE TABLE \"%s\".\"%s\"",
+ tablePath.getSchemaName(), tablePath.getTableName());
+ }
+
@Override
protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("COLUMN_NAME");
- String typeName = resultSet.getString("TYPE_NAME");
- Long columnLength = resultSet.getLong("COLUMN_SIZE");
- Long columnPrecision = columnLength;
- Integer columnScale = resultSet.getObject("DECIMAL_DIGITS",
Integer.class);
- String columnComment = resultSet.getString("REMARKS");
- Object defaultValue = resultSet.getObject("COLUMN_DEF");
- boolean isNullable = (resultSet.getInt("NULLABLE") ==
DatabaseMetaData.columnNullable);
+ String typeName = resultSet.getString("DATA_TYPE");
+ long columnLength = resultSet.getLong("DATA_LENGTH");
+ long columnPrecision = resultSet.getLong("DATA_PRECISION");
+ int columnScale = resultSet.getInt("DATA_SCALE");
+ String columnComment = resultSet.getString("COMMENTS");
+ Object defaultValue = resultSet.getObject("DATA_DEFAULT");
+ boolean isNullable = resultSet.getString("NULLABLE").equals("Y");
+
BasicTypeDefine typeDefine =
BasicTypeDefine.builder()
.name(columnName)
+ .columnType(typeName)
.dataType(typeName)
.length(columnLength)
.precision(columnPrecision)
@@ -132,6 +179,11 @@ public class DamengCatalog extends AbstractJdbcCatalog {
return tablePath.getSchemaAndTableName();
}
+ private List<String> listTables() {
+ List<String> databases = listDatabases();
+ return listTables(databases.get(0));
+ }
+
@Override
public List<String> listTables(String databaseName)
throws CatalogException, DatabaseNotExistException {
@@ -161,31 +213,4 @@ public class DamengCatalog extends AbstractJdbcCatalog {
Connection defaultConnection = getConnection(defaultUrl);
return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new
DmdbTypeMapper());
}
-
- @Override
- protected TableSchema.Builder buildColumnsReturnTablaSchemaBuilder(
- TablePath tablePath, Connection conn) throws SQLException {
- TableSchema.Builder columnsBuilder = TableSchema.builder();
- DatabaseMetaData metaData = conn.getMetaData();
- try (ResultSet resultSet =
- metaData.getColumns(
- null, tablePath.getSchemaName(),
tablePath.getTableName(), null)) {
- buildColumnsWithErrorCheck(tablePath, resultSet, columnsBuilder);
- }
- return columnsBuilder;
- }
-
- @Override
- protected String getTruncateTableSql(TablePath tablePath) {
- return String.format(
- "TRUNCATE TABLE \"%s\".\"%s\"",
- tablePath.getSchemaName(), tablePath.getTableName());
- }
-
- @Override
- protected String getExistDataSql(TablePath tablePath) {
- return String.format(
- "select * from \"%s\".\"%s\" WHERE rownum = 1",
- tablePath.getSchemaName(), tablePath.getTableName());
- }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
index dee1b58e0e..7748823ca4 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
@@ -39,6 +39,8 @@ import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import javax.annotation.Nullable;
+
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@@ -53,6 +55,7 @@ public class JdbcOutputFormatBuilder {
@NonNull private final JdbcConnectionProvider connectionProvider;
@NonNull private final JdbcSinkConfig jdbcSinkConfig;
@NonNull private final TableSchema tableSchema;
+ @Nullable private final TableSchema databaseTableSchema;
public JdbcOutputFormat build() {
JdbcOutputFormat.StatementExecutorFactory statementExecutorFactory;
@@ -76,10 +79,13 @@ public class JdbcOutputFormatBuilder {
createSimpleBufferedExecutor(
jdbcSinkConfig.getSimpleSql(),
tableSchema,
+ databaseTableSchema,
dialect.getRowConverter());
} else if (primaryKeys == null || primaryKeys.isEmpty()) {
statementExecutorFactory =
- () -> createSimpleBufferedExecutor(dialect, database,
table, tableSchema);
+ () ->
+ createSimpleBufferedExecutor(
+ dialect, database, table, tableSchema,
databaseTableSchema);
} else {
statementExecutorFactory =
() ->
@@ -88,6 +94,7 @@ public class JdbcOutputFormatBuilder {
database,
table,
tableSchema,
+ databaseTableSchema,
primaryKeys.toArray(new String[0]),
jdbcSinkConfig.isEnableUpsert(),
jdbcSinkConfig.isPrimaryKeyUpdated(),
@@ -101,16 +108,24 @@ public class JdbcOutputFormatBuilder {
}
private static JdbcBatchStatementExecutor<SeaTunnelRow>
createSimpleBufferedExecutor(
- JdbcDialect dialect, String database, String table, TableSchema
tableSchema) {
+ JdbcDialect dialect,
+ String database,
+ String table,
+ TableSchema tableSchema,
+ TableSchema databaseTableSchema) {
String insertSQL =
dialect.getInsertIntoStatement(database, table,
tableSchema.getFieldNames());
- return createSimpleBufferedExecutor(insertSQL, tableSchema,
dialect.getRowConverter());
+ return createSimpleBufferedExecutor(
+ insertSQL, tableSchema, databaseTableSchema,
dialect.getRowConverter());
}
private static JdbcBatchStatementExecutor<SeaTunnelRow>
createSimpleBufferedExecutor(
- String sql, TableSchema tableSchema, JdbcRowConverter
rowConverter) {
+ String sql,
+ TableSchema tableSchema,
+ TableSchema databaseTableSchema,
+ JdbcRowConverter rowConverter) {
JdbcBatchStatementExecutor<SeaTunnelRow> simpleRowExecutor =
- createSimpleExecutor(sql, tableSchema, rowConverter);
+ createSimpleExecutor(sql, tableSchema, databaseTableSchema,
rowConverter);
return new BufferedBatchStatementExecutor(simpleRowExecutor,
Function.identity());
}
@@ -119,6 +134,7 @@ public class JdbcOutputFormatBuilder {
String database,
String table,
TableSchema tableSchema,
+ TableSchema databaseTableSchema,
String[] pkNames,
boolean enableUpsert,
boolean isPrimaryKeyUpdated,
@@ -139,13 +155,15 @@ public class JdbcOutputFormatBuilder {
Function<SeaTunnelRow, SeaTunnelRow> keyExtractor =
createKeyExtractor(pkFields);
JdbcBatchStatementExecutor<SeaTunnelRow> deleteExecutor =
- createDeleteExecutor(dialect, database, table, pkNames,
pkSchema);
+ createDeleteExecutor(
+ dialect, database, table, pkNames, pkSchema,
databaseTableSchema);
JdbcBatchStatementExecutor<SeaTunnelRow> upsertExecutor =
createUpsertExecutor(
dialect,
database,
table,
tableSchema,
+ databaseTableSchema,
pkNames,
pkSchema,
keyExtractor,
@@ -161,6 +179,7 @@ public class JdbcOutputFormatBuilder {
String database,
String table,
TableSchema tableSchema,
+ TableSchema databaseTableSchema,
String[] pkNames,
TableSchema pkTableSchema,
Function<SeaTunnelRow, SeaTunnelRow> keyExtractor,
@@ -168,7 +187,8 @@ public class JdbcOutputFormatBuilder {
boolean isPrimaryKeyUpdated,
boolean supportUpsertByInsertOnly) {
if (supportUpsertByInsertOnly) {
- return createInsertOnlyExecutor(dialect, database, table,
tableSchema);
+ return createInsertOnlyExecutor(
+ dialect, database, table, tableSchema,
databaseTableSchema);
}
if (enableUpsert) {
Optional<String> upsertSQL =
@@ -176,20 +196,30 @@ public class JdbcOutputFormatBuilder {
database, table, tableSchema.getFieldNames(),
pkNames);
if (upsertSQL.isPresent()) {
return createSimpleExecutor(
- upsertSQL.get(), tableSchema,
dialect.getRowConverter());
+ upsertSQL.get(),
+ tableSchema,
+ databaseTableSchema,
+ dialect.getRowConverter());
}
return createInsertOrUpdateByQueryExecutor(
dialect,
database,
table,
tableSchema,
+ databaseTableSchema,
pkNames,
pkTableSchema,
keyExtractor,
isPrimaryKeyUpdated);
}
return createInsertOrUpdateExecutor(
- dialect, database, table, tableSchema, pkNames,
isPrimaryKeyUpdated);
+ dialect,
+ database,
+ table,
+ tableSchema,
+ databaseTableSchema,
+ pkNames,
+ isPrimaryKeyUpdated);
}
private static JdbcBatchStatementExecutor<SeaTunnelRow>
createCopyInBufferStatementExecutor(
@@ -209,8 +239,11 @@ public class JdbcOutputFormatBuilder {
}
private static JdbcBatchStatementExecutor<SeaTunnelRow>
createInsertOnlyExecutor(
- JdbcDialect dialect, String database, String table, TableSchema
tableSchema) {
-
+ JdbcDialect dialect,
+ String database,
+ String table,
+ TableSchema tableSchema,
+ TableSchema databaseTableSchema) {
return new SimpleBatchStatementExecutor(
connection ->
FieldNamedPreparedStatement.prepareStatement(
@@ -219,6 +252,7 @@ public class JdbcOutputFormatBuilder {
database, table,
tableSchema.getFieldNames()),
tableSchema.getFieldNames()),
tableSchema,
+ databaseTableSchema,
dialect.getRowConverter());
}
@@ -227,6 +261,7 @@ public class JdbcOutputFormatBuilder {
String database,
String table,
TableSchema tableSchema,
+ TableSchema databaseTableSchema,
String[] pkNames,
boolean isPrimaryKeyUpdated) {
@@ -248,6 +283,7 @@ public class JdbcOutputFormatBuilder {
isPrimaryKeyUpdated),
tableSchema.getFieldNames()),
tableSchema,
+ databaseTableSchema,
dialect.getRowConverter());
}
@@ -256,6 +292,7 @@ public class JdbcOutputFormatBuilder {
String database,
String table,
TableSchema tableSchema,
+ TableSchema databaseTableSchema,
String[] pkNames,
TableSchema pkTableSchema,
Function<SeaTunnelRow, SeaTunnelRow> keyExtractor,
@@ -285,6 +322,7 @@ public class JdbcOutputFormatBuilder {
pkTableSchema,
keyExtractor,
tableSchema,
+ databaseTableSchema,
dialect.getRowConverter());
}
@@ -293,18 +331,24 @@ public class JdbcOutputFormatBuilder {
String database,
String table,
String[] pkNames,
- TableSchema pkTableSchema) {
+ TableSchema pkTableSchema,
+ TableSchema databaseTableSchema) {
String deleteSQL = dialect.getDeleteStatement(database, table,
pkNames);
- return createSimpleExecutor(deleteSQL, pkTableSchema,
dialect.getRowConverter());
+ return createSimpleExecutor(
+ deleteSQL, pkTableSchema, databaseTableSchema,
dialect.getRowConverter());
}
private static JdbcBatchStatementExecutor<SeaTunnelRow>
createSimpleExecutor(
- String sql, TableSchema tableSchema, JdbcRowConverter
rowConverter) {
+ String sql,
+ TableSchema tableSchema,
+ TableSchema databaseTableSchema,
+ JdbcRowConverter rowConverter) {
return new SimpleBatchStatementExecutor(
connection ->
FieldNamedPreparedStatement.prepareStatement(
connection, sql, tableSchema.getFieldNames()),
tableSchema,
+ databaseTableSchema,
rowConverter);
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
index 691de6b77c..42bcf2d894 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
@@ -31,6 +31,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
import lombok.extern.slf4j.Slf4j;
+import javax.annotation.Nullable;
+
import java.math.BigDecimal;
import java.sql.Array;
import java.sql.Date;
@@ -186,89 +188,37 @@ public abstract class AbstractJdbcRowConverter implements
JdbcRowConverter {
public PreparedStatement toExternal(
TableSchema tableSchema, SeaTunnelRow row, PreparedStatement
statement)
throws SQLException {
+ return toExternal(tableSchema, null, row, statement);
+ }
+
+ @Override
+ public PreparedStatement toExternal(
+ TableSchema tableSchema,
+ @Nullable TableSchema databaseTableSchema,
+ SeaTunnelRow row,
+ PreparedStatement statement)
+ throws SQLException {
SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields();
fieldIndex++) {
try {
SeaTunnelDataType<?> seaTunnelDataType =
rowType.getFieldType(fieldIndex);
+ String fieldName = rowType.getFieldName(fieldIndex);
int statementIndex = fieldIndex + 1;
Object fieldValue = row.getField(fieldIndex);
if (fieldValue == null) {
statement.setObject(statementIndex, null);
continue;
}
-
- switch (seaTunnelDataType.getSqlType()) {
- case STRING:
- statement.setString(statementIndex, (String)
row.getField(fieldIndex));
- break;
- case BOOLEAN:
- statement.setBoolean(statementIndex, (Boolean)
row.getField(fieldIndex));
- break;
- case TINYINT:
- statement.setByte(statementIndex, (Byte)
row.getField(fieldIndex));
- break;
- case SMALLINT:
- statement.setShort(statementIndex, (Short)
row.getField(fieldIndex));
- break;
- case INT:
- statement.setInt(statementIndex, (Integer)
row.getField(fieldIndex));
- break;
- case BIGINT:
- statement.setLong(statementIndex, (Long)
row.getField(fieldIndex));
- break;
- case FLOAT:
- statement.setFloat(statementIndex, (Float)
row.getField(fieldIndex));
- break;
- case DOUBLE:
- statement.setDouble(statementIndex, (Double)
row.getField(fieldIndex));
- break;
- case DECIMAL:
- statement.setBigDecimal(
- statementIndex, (BigDecimal)
row.getField(fieldIndex));
- break;
- case DATE:
- LocalDate localDate = (LocalDate)
row.getField(fieldIndex);
- statement.setDate(statementIndex,
java.sql.Date.valueOf(localDate));
- break;
- case TIME:
- writeTime(statement, statementIndex, (LocalTime)
row.getField(fieldIndex));
- break;
- case TIMESTAMP:
- LocalDateTime localDateTime = (LocalDateTime)
row.getField(fieldIndex);
- statement.setTimestamp(
- statementIndex,
java.sql.Timestamp.valueOf(localDateTime));
- break;
- case BYTES:
- statement.setBytes(statementIndex, (byte[])
row.getField(fieldIndex));
- break;
- case NULL:
- statement.setNull(statementIndex, java.sql.Types.NULL);
- break;
- case ARRAY:
- SeaTunnelDataType elementType =
- ((ArrayType)
seaTunnelDataType).getElementType();
- Object[] array = (Object[]) row.getField(fieldIndex);
- if (array == null) {
- statement.setNull(statementIndex,
java.sql.Types.ARRAY);
- break;
- }
- if (SqlType.TINYINT.equals(elementType.getSqlType())) {
- Short[] shortArray = new Short[array.length];
- for (int i = 0; i < array.length; i++) {
- shortArray[i] =
Short.valueOf(array[i].toString());
- }
- statement.setObject(statementIndex, shortArray);
- } else {
- statement.setObject(statementIndex, array);
- }
- break;
- case MAP:
- case ROW:
- default:
- throw new JdbcConnectorException(
-
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- "Unexpected value: " + seaTunnelDataType);
+ String sourceType = null;
+ if (databaseTableSchema != null &&
databaseTableSchema.contains(fieldName)) {
+ sourceType =
databaseTableSchema.getColumn(fieldName).getSourceType();
}
+ setValueToStatementByDataType(
+ row.getField(fieldIndex),
+ statement,
+ seaTunnelDataType,
+ statementIndex,
+ sourceType);
} catch (Exception e) {
throw new JdbcConnectorException(
JdbcConnectorErrorCode.DATA_TYPE_CAST_FAILED,
@@ -279,6 +229,84 @@ public abstract class AbstractJdbcRowConverter implements
JdbcRowConverter {
return statement;
}
+ protected void setValueToStatementByDataType(
+ Object value,
+ PreparedStatement statement,
+ SeaTunnelDataType<?> seaTunnelDataType,
+ int statementIndex,
+ @Nullable String sourceType)
+ throws SQLException {
+ switch (seaTunnelDataType.getSqlType()) {
+ case STRING:
+ statement.setString(statementIndex, (String) value);
+ break;
+ case BOOLEAN:
+ statement.setBoolean(statementIndex, (Boolean) value);
+ break;
+ case TINYINT:
+ statement.setByte(statementIndex, (Byte) value);
+ break;
+ case SMALLINT:
+ statement.setShort(statementIndex, (Short) value);
+ break;
+ case INT:
+ statement.setInt(statementIndex, (Integer) value);
+ break;
+ case BIGINT:
+ statement.setLong(statementIndex, (Long) value);
+ break;
+ case FLOAT:
+ statement.setFloat(statementIndex, (Float) value);
+ break;
+ case DOUBLE:
+ statement.setDouble(statementIndex, (Double) value);
+ break;
+ case DECIMAL:
+ statement.setBigDecimal(statementIndex, (BigDecimal) value);
+ break;
+ case DATE:
+ LocalDate localDate = (LocalDate) value;
+ statement.setDate(statementIndex, Date.valueOf(localDate));
+ break;
+ case TIME:
+ writeTime(statement, statementIndex, (LocalTime) value);
+ break;
+ case TIMESTAMP:
+ LocalDateTime localDateTime = (LocalDateTime) value;
+ statement.setTimestamp(statementIndex,
Timestamp.valueOf(localDateTime));
+ break;
+ case BYTES:
+ statement.setBytes(statementIndex, (byte[]) value);
+ break;
+ case NULL:
+ statement.setNull(statementIndex, java.sql.Types.NULL);
+ break;
+ case ARRAY:
+ SeaTunnelDataType elementType = ((ArrayType)
seaTunnelDataType).getElementType();
+ Object[] array = (Object[]) value;
+ if (array == null) {
+ statement.setNull(statementIndex, java.sql.Types.ARRAY);
+ break;
+ }
+ if (SqlType.TINYINT.equals(elementType.getSqlType())) {
+ Short[] shortArray = new Short[array.length];
+ for (int i = 0; i < array.length; i++) {
+ shortArray[i] = Short.valueOf(array[i].toString());
+ }
+ statement.setObject(statementIndex, shortArray);
+ } else {
+ statement.setObject(statementIndex, array);
+ }
+ break;
+ case MAP:
+ case ROW:
+ default:
+ throw new JdbcConnectorException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+ "Unexpected value: " + seaTunnelDataType);
+ }
+ }
+
protected void writeTime(PreparedStatement statement, int index, LocalTime
time)
throws SQLException {
statement.setTime(index, java.sql.Time.valueOf(time));
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
index a8c7c079d3..f3cec3996c 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
@@ -20,6 +20,8 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -38,7 +40,18 @@ public interface JdbcRowConverter extends Serializable {
*/
SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws
SQLException;
+ @Deprecated
PreparedStatement toExternal(
TableSchema tableSchema, SeaTunnelRow row, PreparedStatement
statement)
throws SQLException;
+
+ /** Convert data from internal {@link SeaTunnelRow} to JDBC object. */
+ default PreparedStatement toExternal(
+ TableSchema tableSchema,
+ @Nullable TableSchema databaseTableSchema,
+ SeaTunnelRow row,
+ PreparedStatement statement)
+ throws SQLException {
+ return toExternal(tableSchema, row, statement);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
index 28f7fdf425..7433e66bb6 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
@@ -24,6 +24,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorExc
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import javax.annotation.Nullable;
+
import java.sql.PreparedStatement;
public class HiveJdbcRowConverter extends AbstractJdbcRowConverter {
@@ -35,7 +37,10 @@ public class HiveJdbcRowConverter extends
AbstractJdbcRowConverter {
@Override
public PreparedStatement toExternal(
- TableSchema tableSchema, SeaTunnelRow row, PreparedStatement
statement) {
+ TableSchema tableSchema,
+ @Nullable TableSchema databaseTableSchema,
+ SeaTunnelRow row,
+ PreparedStatement statement) {
throw new JdbcConnectorException(
JdbcConnectorErrorCode.DONT_SUPPORT_SINK,
"The Hive jdbc connector don't support sink");
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java
index 806788b30e..33c689fd39 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java
@@ -31,6 +31,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive.Hive
import org.apache.commons.lang3.StringUtils;
+import javax.annotation.Nullable;
+
import java.math.BigDecimal;
import java.sql.PreparedStatement;
import java.time.LocalDate;
@@ -46,7 +48,10 @@ public class InceptorJdbcRowConverter extends
HiveJdbcRowConverter {
@Override
public PreparedStatement toExternal(
- TableSchema tableSchema, SeaTunnelRow row, PreparedStatement
statement) {
+ TableSchema tableSchema,
+ @Nullable TableSchema databaseTableSchema,
+ SeaTunnelRow row,
+ PreparedStatement statement) {
SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields();
fieldIndex++) {
try {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
index 4a9411b99b..f766ce3980 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
@@ -27,6 +27,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.Abstrac
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
+import javax.annotation.Nullable;
+
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.PreparedStatement;
@@ -119,7 +121,10 @@ public class KingbaseJdbcRowConverter extends
AbstractJdbcRowConverter {
@Override
public PreparedStatement toExternal(
- TableSchema tableSchema, SeaTunnelRow row, PreparedStatement
statement)
+ TableSchema tableSchema,
+ @Nullable TableSchema databaseTableSchema,
+ SeaTunnelRow row,
+ PreparedStatement statement)
throws SQLException {
SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields();
fieldIndex++) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
index 0a52e6a90b..5bf23ea1fe 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
@@ -34,6 +34,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
import org.apache.commons.lang3.StringUtils;
+import javax.annotation.Nullable;
+
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
@@ -145,7 +147,10 @@ public class OceanBaseMysqlJdbcRowConverter extends
AbstractJdbcRowConverter {
@Override
public PreparedStatement toExternal(
- TableSchema tableSchema, SeaTunnelRow row, PreparedStatement
statement)
+ TableSchema tableSchema,
+ @Nullable TableSchema databaseTableSchema,
+ SeaTunnelRow row,
+ PreparedStatement statement)
throws SQLException {
SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields();
fieldIndex++) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleJdbcRowConverter.java
index 6e32cca436..6c74387f47 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleJdbcRowConverter.java
@@ -17,13 +17,43 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeConverter.ORACLE_BLOB;
+
public class OracleJdbcRowConverter extends AbstractJdbcRowConverter {
@Override
public String converterName() {
return DatabaseIdentifier.ORACLE;
}
+
+ @Override
+ protected void setValueToStatementByDataType(
+ Object value,
+ PreparedStatement statement,
+ SeaTunnelDataType<?> seaTunnelDataType,
+ int statementIndex,
+ @Nullable String sourceType)
+ throws SQLException {
+ if (seaTunnelDataType.getSqlType().equals(SqlType.BYTES)) {
+ if (ORACLE_BLOB.equals(sourceType)) {
+ statement.setBinaryStream(statementIndex, new
ByteArrayInputStream((byte[]) value));
+ } else {
+ statement.setBytes(statementIndex, (byte[]) value);
+ }
+ } else {
+ super.setValueToStatementByDataType(
+ value, statement, seaTunnelDataType, statementIndex,
sourceType);
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
index 071e8ec6e1..7fbb2f7782 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
@@ -33,6 +33,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
import org.postgresql.util.PGobject;
+import javax.annotation.Nullable;
+
import java.math.BigDecimal;
import java.sql.Array;
import java.sql.Date;
@@ -158,7 +160,10 @@ public class PostgresJdbcRowConverter extends
AbstractJdbcRowConverter {
@Override
public PreparedStatement toExternal(
- TableSchema tableSchema, SeaTunnelRow row, PreparedStatement
statement)
+ TableSchema tableSchema,
+ @Nullable TableSchema databaseTableSchema,
+ SeaTunnelRow row,
+ PreparedStatement statement)
throws SQLException {
SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
String[] sourceTypes =
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
index c98f50ba92..88e658fc38 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
@@ -336,7 +336,9 @@ public class FieldNamedPreparedStatement implements
PreparedStatement {
@Override
public void setBinaryStream(int parameterIndex, InputStream x) throws
SQLException {
- throw new UnsupportedOperationException();
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setBinaryStream(index, x);
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java
index 9cf8b95863..ae08d027f7 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java
@@ -26,6 +26,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRow
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import javax.annotation.Nullable;
+
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -42,6 +44,7 @@ public class InsertOrUpdateBatchStatementExecutor
private final TableSchema keyTableSchema;
private final Function<SeaTunnelRow, SeaTunnelRow> keyExtractor;
@NonNull private final TableSchema valueTableSchema;
+ @Nullable private final TableSchema databaseTableSchema;
@NonNull private final JdbcRowConverter rowConverter;
private transient PreparedStatement existStatement;
private transient PreparedStatement insertStatement;
@@ -53,6 +56,7 @@ public class InsertOrUpdateBatchStatementExecutor
StatementFactory insertStmtFactory,
StatementFactory updateStmtFactory,
TableSchema valueTableSchema,
+ TableSchema databaseTableSchema,
JdbcRowConverter rowConverter) {
this(
null,
@@ -61,6 +65,7 @@ public class InsertOrUpdateBatchStatementExecutor
null,
null,
valueTableSchema,
+ databaseTableSchema,
rowConverter);
}
@@ -81,14 +86,14 @@ public class InsertOrUpdateBatchStatementExecutor
insertStatement.executeBatch();
insertStatement.clearBatch();
}
- rowConverter.toExternal(valueTableSchema, record, updateStatement);
+ rowConverter.toExternal(valueTableSchema, databaseTableSchema,
record, updateStatement);
updateStatement.addBatch();
} else {
if (preExistFlag != null && preExistFlag) {
updateStatement.executeBatch();
updateStatement.clearBatch();
}
- rowConverter.toExternal(valueTableSchema, record, insertStatement);
+ rowConverter.toExternal(valueTableSchema, databaseTableSchema,
record, insertStatement);
insertStatement.addBatch();
}
@@ -147,7 +152,7 @@ public class InsertOrUpdateBatchStatementExecutor
}
private boolean exist(SeaTunnelRow pk) throws SQLException {
- rowConverter.toExternal(keyTableSchema, pk, existStatement);
+ rowConverter.toExternal(keyTableSchema, databaseTableSchema, pk,
existStatement);
try (ResultSet resultSet = existStatement.executeQuery()) {
return resultSet.next();
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java
index a2f0add260..c7e6f8e2ce 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java
@@ -24,6 +24,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRow
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import javax.annotation.Nullable;
+
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@@ -32,6 +34,7 @@ import java.sql.SQLException;
public class SimpleBatchStatementExecutor implements
JdbcBatchStatementExecutor<SeaTunnelRow> {
@NonNull private final StatementFactory statementFactory;
@NonNull private final TableSchema tableSchema;
+ @Nullable private final TableSchema databaseTableSchema;
@NonNull private final JdbcRowConverter converter;
private transient PreparedStatement statement;
@@ -42,7 +45,7 @@ public class SimpleBatchStatementExecutor implements
JdbcBatchStatementExecutor<
@Override
public void addToBatch(SeaTunnelRow record) throws SQLException {
- converter.toExternal(tableSchema, record, statement);
+ converter.toExternal(tableSchema, databaseTableSchema, record,
statement);
statement.addBatch();
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
index 7ab289edc1..521b65ed94 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
@@ -50,6 +50,7 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
protected JdbcDialect dialect;
protected TablePath sinkTablePath;
protected TableSchema tableSchema;
+ protected TableSchema databaseTableSchema;
protected transient boolean isOpen;
protected JdbcConnectionProvider connectionProvider;
protected JdbcSinkConfig jdbcSinkConfig;
@@ -76,7 +77,11 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
}
this.outputFormat =
new JdbcOutputFormatBuilder(
- dialect, connectionProvider, jdbcSinkConfig,
tableSchema)
+ dialect,
+ connectionProvider,
+ jdbcSinkConfig,
+ tableSchema,
+ databaseTableSchema)
.build();
this.outputFormat.open();
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
index d14cf59211..874d04135f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
@@ -77,6 +77,7 @@ public class JdbcExactlyOnceSinkWriter extends
AbstractJdbcSinkWriter<Void> {
JdbcDialect dialect,
JdbcSinkConfig jdbcSinkConfig,
TableSchema tableSchema,
+ TableSchema databaseTableSchema,
List<JdbcSinkState> states) {
checkArgument(
jdbcSinkConfig.getJdbcConnectionConfig().getMaxRetries() == 0,
@@ -95,7 +96,9 @@ public class JdbcExactlyOnceSinkWriter extends
AbstractJdbcSinkWriter<Void> {
XaFacade.fromJdbcConnectionOptions(jdbcSinkConfig.getJdbcConnectionConfig());
this.xaFacade = (XaFacade) this.connectionProvider;
this.outputFormat =
- new JdbcOutputFormatBuilder(dialect, xaFacade, jdbcSinkConfig,
tableSchema).build();
+ new JdbcOutputFormatBuilder(
+ dialect, xaFacade, jdbcSinkConfig,
tableSchema, databaseTableSchema)
+ .build();
this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 2213a90808..25fc39005d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -34,6 +34,7 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.schema.SchemaChangeType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.iris.IrisCatalog;
@@ -52,6 +53,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcCatalogUtils;
import org.apache.commons.lang3.StringUtils;
+import lombok.extern.slf4j.Slf4j;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -60,6 +63,7 @@ import java.util.Optional;
import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
+@Slf4j
public class JdbcSink
implements SeaTunnelSink<SeaTunnelRow, JdbcSinkState, XidInfo,
JdbcAggregatedCommitInfo>,
SupportSaveMode,
@@ -105,6 +109,11 @@ public class JdbcSink
@Override
public AbstractJdbcSinkWriter createWriter(SinkWriter.Context context) {
+ try {
+
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
TablePath sinkTablePath = catalogTable.getTablePath();
AbstractJdbcSinkWriter sinkWriter;
if (jdbcSinkConfig.isExactlyOnce()) {
@@ -116,18 +125,30 @@ public class JdbcSink
dialect,
jdbcSinkConfig,
tableSchema,
+ getDatabaseTableSchema().orElse(null),
new ArrayList<>());
} else {
- if (catalogTable != null &&
catalogTable.getTableSchema().getPrimaryKey() != null) {
+ if (catalogTable.getTableSchema().getPrimaryKey() != null) {
String keyName =
tableSchema.getPrimaryKey().getColumnNames().get(0);
int index =
tableSchema.toPhysicalRowDataType().indexOf(keyName);
if (index > -1) {
return new JdbcSinkWriter(
- sinkTablePath, dialect, jdbcSinkConfig,
tableSchema, index);
+ sinkTablePath,
+ dialect,
+ jdbcSinkConfig,
+ tableSchema,
+ getDatabaseTableSchema().orElse(null),
+ index);
}
}
sinkWriter =
- new JdbcSinkWriter(sinkTablePath, dialect, jdbcSinkConfig,
tableSchema, null);
+ new JdbcSinkWriter(
+ sinkTablePath,
+ dialect,
+ jdbcSinkConfig,
+ tableSchema,
+ getDatabaseTableSchema().orElse(null),
+ null);
}
return sinkWriter;
}
@@ -135,6 +156,11 @@ public class JdbcSink
@Override
public SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> restoreWriter(
SinkWriter.Context context, List<JdbcSinkState> states) throws
IOException {
+ try {
+
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
TablePath sinkTablePath = catalogTable.getTablePath();
if (jdbcSinkConfig.isExactlyOnce()) {
return new JdbcExactlyOnceSinkWriter(
@@ -144,11 +170,37 @@ public class JdbcSink
dialect,
jdbcSinkConfig,
tableSchema,
+ getDatabaseTableSchema().orElse(null),
states);
}
return SeaTunnelSink.super.restoreWriter(context, states);
}
+ private Optional<TableSchema> getDatabaseTableSchema() {
+ Optional<Catalog> catalogOptional = getCatalog();
+ FieldIdeEnum fieldIdeEnumEnum = config.get(JdbcOptions.FIELD_IDE);
+ String fieldIde =
+ fieldIdeEnumEnum == null
+ ? FieldIdeEnum.ORIGINAL.getValue()
+ : fieldIdeEnumEnum.getValue();
+ TablePath tablePath =
+ TablePath.of(
+ catalogTable.getTableId().getDatabaseName(),
+ catalogTable.getTableId().getSchemaName(),
+ CatalogUtils.quoteTableIdentifier(
+ catalogTable.getTableId().getTableName(),
fieldIde));
+ if (catalogOptional.isPresent()) {
+ try (Catalog catalog = catalogOptional.get()) {
+ catalog.open();
+ return
Optional.of(catalog.getTable(tablePath).getTableSchema());
+ } catch (TableNotExistException e) {
+ log.warn("table {} not exist when get the database catalog
table", tablePath);
+ return Optional.empty();
+ }
+ }
+ return Optional.empty();
+ }
+
@Override
public Optional<SinkAggregatedCommitter<XidInfo, JdbcAggregatedCommitInfo>>
createAggregatedCommitter() {
@@ -187,18 +239,7 @@ public class JdbcSink
throw new RuntimeException(e);
}
if (catalogTable != null) {
- if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
- return Optional.empty();
- }
- if (StringUtils.isBlank(jdbcSinkConfig.getTable())) {
- return Optional.empty();
- }
- // use query to write data can not support savemode
- if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
- return Optional.empty();
- }
- Optional<Catalog> catalogOptional =
-
JdbcCatalogUtils.findCatalog(jdbcSinkConfig.getJdbcConnectionConfig(), dialect);
+ Optional<Catalog> catalogOptional = getCatalog();
if (catalogOptional.isPresent()) {
try {
Catalog catalog = catalogOptional.get();
@@ -242,6 +283,20 @@ public class JdbcSink
return Optional.empty();
}
+ private Optional<Catalog> getCatalog() {
+ if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
+ return Optional.empty();
+ }
+ if (StringUtils.isBlank(jdbcSinkConfig.getTable())) {
+ return Optional.empty();
+ }
+ // use query to write data can not support get catalog
+ if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
+ return Optional.empty();
+ }
+ return
JdbcCatalogUtils.findCatalog(jdbcSinkConfig.getJdbcConnectionConfig(), dialect);
+ }
+
@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.ofNullable(catalogTable);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 3f43b2088d..d3c9949dc3 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -49,17 +49,23 @@ public class JdbcSinkWriter extends
AbstractJdbcSinkWriter<ConnectionPoolManager
JdbcDialect dialect,
JdbcSinkConfig jdbcSinkConfig,
TableSchema tableSchema,
+ TableSchema databaseTableSchema,
Integer primaryKeyIndex) {
this.sinkTablePath = sinkTablePath;
this.dialect = dialect;
this.tableSchema = tableSchema;
+ this.databaseTableSchema = databaseTableSchema;
this.jdbcSinkConfig = jdbcSinkConfig;
this.primaryKeyIndex = primaryKeyIndex;
this.connectionProvider =
dialect.getJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig());
this.outputFormat =
new JdbcOutputFormatBuilder(
- dialect, connectionProvider, jdbcSinkConfig,
tableSchema)
+ dialect,
+ connectionProvider,
+ jdbcSinkConfig,
+ tableSchema,
+ databaseTableSchema)
.build();
}
@@ -97,7 +103,11 @@ public class JdbcSinkWriter extends
AbstractJdbcSinkWriter<ConnectionPoolManager
queueIndex);
this.outputFormat =
new JdbcOutputFormatBuilder(
- dialect, connectionProvider, jdbcSinkConfig,
tableSchema)
+ dialect,
+ connectionProvider,
+ jdbcSinkConfig,
+ tableSchema,
+ databaseTableSchema)
.build();
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
index b0195837cb..7b21e27364 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
@@ -44,6 +44,7 @@ import org.testcontainers.utility.DockerLoggerFactory;
import org.testcontainers.utility.MountableFile;
import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Statement;
import java.sql.Timestamp;
@@ -53,6 +54,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
public class JdbcOracleIT extends AbstractJdbcIT {
@@ -81,6 +84,7 @@ public class JdbcOracleIT extends AbstractJdbcIT {
+ " VARCHAR_10_COL varchar2(10),\n"
+ " CHAR_10_COL char(10),\n"
+ " CLOB_COL clob,\n"
+ + " BLOB_COL blob,\n"
+ " NUMBER_1 number(1),\n"
+ " NUMBER_6 number(6),\n"
+ " NUMBER_10 number(10),\n"
@@ -104,6 +108,7 @@ public class JdbcOracleIT extends AbstractJdbcIT {
+ " VARCHAR_10_COL varchar2(10),\n"
+ " CHAR_10_COL char(10),\n"
+ " CLOB_COL clob,\n"
+ + " BLOB_COL blob,\n"
+ " NUMBER_1 number(1),\n"
+ " NUMBER_6 number(6),\n"
+ " NUMBER_10 number(10),\n"
@@ -125,6 +130,7 @@ public class JdbcOracleIT extends AbstractJdbcIT {
"VARCHAR_10_COL",
"CHAR_10_COL",
"CLOB_COL",
+ "BLOB_COL",
"NUMBER_1",
"NUMBER_6",
"NUMBER_10",
@@ -230,6 +236,11 @@ public class JdbcOracleIT extends AbstractJdbcIT {
String.format("f%s", i),
String.format("f%s", i),
String.format("f%s", i),
+ // set value bytes more than 4000bytes
+ IntStream.range(0, 4000)
+ .mapToObj(d -> d + "")
+ .collect(Collectors.joining(","))
+ .getBytes(StandardCharsets.UTF_8),
1,
i * 10,
i * 1000,
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf
index 4df8c7b993..d2e06158b8 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf
@@ -30,7 +30,7 @@ source {
url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
user = testUser
password = testPassword
- query = "SELECT
VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL
FROM E2E_TABLE_SOURCE"
+ query = "SELECT
VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,BLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL
FROM E2E_TABLE_SOURCE"
properties {
database.oracle.jdbc.timezoneAsRegion = "false"
}
@@ -46,7 +46,7 @@ sink {
url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
user = testUser
password = testPassword
- query = "INSERT INTO E2E_TABLE_SINK
(VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL)
VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+ query = "INSERT INTO E2E_TABLE_SINK
(VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,BLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL)
VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
properties {
database.oracle.jdbc.timezoneAsRegion = "false"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf
index 1988b48872..33cf33638f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf
@@ -31,7 +31,7 @@ source {
user = testUser
password = testPassword
use_select_count = true
- query = "SELECT
VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL
FROM E2E_TABLE_SOURCE"
+ query = "SELECT
VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,BLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL
FROM E2E_TABLE_SOURCE"
properties {
database.oracle.jdbc.timezoneAsRegion = "false"
}
@@ -47,7 +47,7 @@ sink {
url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
user = testUser
password = testPassword
- query = "INSERT INTO E2E_TABLE_SINK
(VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL)
VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+ query = "INSERT INTO E2E_TABLE_SINK
(VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,BLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL)
VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
properties {
database.oracle.jdbc.timezoneAsRegion = "false"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf
index 4d01da5c72..e9e997ea90 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf
@@ -47,7 +47,7 @@ sink {
url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
user = testUser
password = testPassword
- query = "INSERT INTO E2E_TABLE_SINK
(VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL)
VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+ query = "INSERT INTO E2E_TABLE_SINK
(VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,BLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL)
VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
properties {
database.oracle.jdbc.timezoneAsRegion = "false"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf
index 94a850fdd0..2be0b51224 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf
@@ -48,7 +48,7 @@ sink {
url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
user = testUser
password = testPassword
- query = "INSERT INTO E2E_TABLE_SINK
(VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL)
VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+ query = "INSERT INTO E2E_TABLE_SINK
(VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,BLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL)
VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
properties {
database.oracle.jdbc.timezoneAsRegion = "false"
}