This is an automated email from the ASF dual-hosted git repository.
dailai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e1f0a238f7 [Improve][Jdbc] Refactor ddl change (#8134)
e1f0a238f7 is described below
commit e1f0a238f7bc325e1ab6c4db009a544ac34a0c5e
Author: hailin0 <[email protected]>
AuthorDate: Tue Nov 26 19:02:52 2024 +0800
[Improve][Jdbc] Refactor ddl change (#8134)
---
.../jdbc/internal/dialect/JdbcDialect.java | 453 ++++++++++-----------
.../jdbc/internal/dialect/mysql/MysqlDialect.java | 15 +-
.../oceanbase/OceanBaseMySqlTypeConverter.java | 2 +
.../dialect/oceanbase/OceanBaseMysqlDialect.java | 15 +-
.../internal/dialect/oracle/OracleDialect.java | 162 +++++++-
.../jdbc/sink/AbstractJdbcSinkWriter.java | 81 +---
.../seatunnel/jdbc/sink/JdbcSinkWriter.java | 23 --
7 files changed, 382 insertions(+), 369 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 953bb30ee7..56b2b285c8 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -17,10 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
-import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
-import org.apache.seatunnel.api.table.converter.ConverterLoader;
import org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
@@ -29,7 +27,7 @@ import
org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
-import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.api.table.type.SqlType;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
@@ -82,6 +80,15 @@ public interface JdbcDialect extends Serializable {
*/
JdbcRowConverter getRowConverter();
+ /**
+ * Get converter that convert type object to seatunnel internal type.
+ *
+ * @return a type converter for the database
+ */
+ default TypeConverter<BasicTypeDefine> getTypeConverter() {
+ throw new UnsupportedOperationException("TypeConverter is not
supported");
+ }
+
/**
* get jdbc meta-information type to seatunnel data type mapper.
*
@@ -441,16 +448,16 @@ public interface JdbcDialect extends Serializable {
/**
* Refresh physical table schema by schema change event
*
- * @param event schema change event
* @param connection jdbc connection
* @param tablePath sink table path
+ * @param event schema change event
*/
default void applySchemaChange(
- SchemaChangeEvent event, Connection connection, TablePath
tablePath)
+ Connection connection, TablePath tablePath, SchemaChangeEvent
event)
throws SQLException {
if (event instanceof AlterTableColumnsEvent) {
for (AlterTableColumnEvent columnEvent : ((AlterTableColumnsEvent)
event).getEvents()) {
- applySchemaChange(columnEvent, connection, tablePath);
+ applySchemaChange(connection, tablePath, columnEvent);
}
} else {
if (event instanceof AlterTableChangeColumnEvent) {
@@ -497,8 +504,7 @@ public interface JdbcDialect extends Serializable {
}
applySchemaChange(connection, tablePath, dropColumnEvent);
} else {
- throw new SeaTunnelException(
- "Unsupported schemaChangeEvent : " +
event.getEventType());
+ throw new UnsupportedOperationException("Unsupported
schemaChangeEvent: " + event);
}
}
}
@@ -527,20 +533,60 @@ public interface JdbcDialect extends Serializable {
default void applySchemaChange(
Connection connection, TablePath tablePath,
AlterTableAddColumnEvent event)
throws SQLException {
- String tableIdentifierWithQuoted = tableIdentifier(tablePath);
- Column addColumn = event.getColumn();
- String afterColumn = event.getAfterColumn();
- String addColumnSQL =
- buildAlterTableSql(
- event.getSourceDialectName(),
- addColumn.getSourceType(),
- AlterType.ADD.name(),
- addColumn,
- tableIdentifierWithQuoted,
- StringUtils.EMPTY,
- afterColumn);
+ boolean sameCatalog =
event.getSourceDialectName().equals(dialectName());
+ BasicTypeDefine typeDefine =
getTypeConverter().reconvert(event.getColumn());
+ String columnType =
+ sameCatalog ? event.getColumn().getSourceType() :
typeDefine.getColumnType();
+ StringBuilder sqlBuilder =
+ new StringBuilder()
+ .append("ALTER TABLE")
+ .append(" ")
+ .append(tableIdentifier(tablePath))
+ .append(" ")
+ .append("ADD COLUMN")
+ .append(" ")
+ .append(quoteIdentifier(event.getColumn().getName()))
+ .append(" ")
+ .append(columnType);
+
+ // Only decorate with default value when source dialect is same as
sink dialect
+ // Todo Support for cross-database default values for ddl statements
+ if (event.getColumn().getDefaultValue() == null) {
+ sqlBuilder.append(" ").append(event.getColumn().isNullable() ?
"NULL" : "NOT NULL");
+ } else {
+ if (event.getColumn().isNullable()) {
+ sqlBuilder.append(" NULL");
+ } else if (sameCatalog) {
+ sqlBuilder.append(" ").append(event.getColumn().isNullable() ?
"NULL" : "NOT NULL");
+ } else if
(SqlType.TIMESTAMP.equals(event.getColumn().getDataType().getSqlType())) {
+ log.warn(
+ "Default value is not supported for column {} in table
{}. Skipping add column operation. event: {}",
+ event.getColumn().getName(),
+ tablePath.getFullName(),
+ event);
+ } else {
+ sqlBuilder.append(" NOT NULL");
+ }
+ if (sameCatalog) {
+ sqlBuilder.append("
").append(sqlClauseWithDefaultValue(typeDefine));
+ }
+ }
+
+ if (event.getColumn().getComment() != null) {
+ sqlBuilder
+ .append(" ")
+ .append("COMMENT ")
+ .append("'")
+ .append(event.getColumn().getComment())
+ .append("'");
+ }
+ if (event.getAfterColumn() != null) {
+ sqlBuilder.append(" ").append("AFTER
").append(quoteIdentifier(event.getAfterColumn()));
+ }
+
+ String addColumnSQL = sqlBuilder.toString();
try (Statement statement = connection.createStatement()) {
- log.info("Executing add column SQL: " + addColumnSQL);
+ log.info("Executing add column SQL: {}", addColumnSQL);
statement.execute(addColumnSQL);
}
}
@@ -548,24 +594,79 @@ public interface JdbcDialect extends Serializable {
default void applySchemaChange(
Connection connection, TablePath tablePath,
AlterTableChangeColumnEvent event)
throws SQLException {
- String tableIdentifierWithQuoted = tableIdentifier(tablePath);
- Column changeColumn = event.getColumn();
- String oldColumnName = event.getOldColumn();
- String afterColumn = event.getAfterColumn();
- String changeColumnSQL =
- buildAlterTableSql(
- event.getSourceDialectName(),
- changeColumn.getSourceType(),
- changeColumn.getDataType() == null
- ? AlterType.RENAME.name()
- : AlterType.CHANGE.name(),
- changeColumn,
- tableIdentifierWithQuoted,
- oldColumnName,
- afterColumn);
+ if (event.getColumn().getDataType() == null) {
+ StringBuilder sqlBuilder =
+ new StringBuilder()
+ .append("ALTER TABLE")
+ .append(" ")
+ .append(tableIdentifier(tablePath))
+ .append(" ")
+ .append("RENAME COLUMN")
+ .append(" ")
+ .append(quoteIdentifier(event.getOldColumn()))
+ .append(" TO ")
+
.append(quoteIdentifier(event.getColumn().getName()));
+ try (Statement statement = connection.createStatement()) {
+ log.info("Executing rename column SQL: {}", sqlBuilder);
+ statement.execute(sqlBuilder.toString());
+ }
+ return;
+ }
+
+ boolean sameCatalog =
event.getSourceDialectName().equals(dialectName());
+ BasicTypeDefine typeDefine =
getTypeConverter().reconvert(event.getColumn());
+ String columnType =
+ sameCatalog ? event.getColumn().getSourceType() :
typeDefine.getColumnType();
+ StringBuilder sqlBuilder =
+ new StringBuilder()
+ .append("ALTER TABLE")
+ .append(" ")
+ .append(tableIdentifier(tablePath))
+ .append(" ")
+ .append("CHANGE COLUMN")
+ .append(" ")
+ .append(quoteIdentifier(event.getOldColumn()))
+ .append(" ")
+ .append(quoteIdentifier(event.getColumn().getName()))
+ .append(" ")
+ .append(columnType);
+ // Only decorate with default value when source dialect is same as
sink dialect
+ // Todo Support for cross-database default values for ddl statements
+ if (event.getColumn().getDefaultValue() == null) {
+ sqlBuilder.append(" ").append(event.getColumn().isNullable() ?
"NULL" : "NOT NULL");
+ } else {
+ if (event.getColumn().isNullable()) {
+ sqlBuilder.append(" NULL");
+ } else if (sameCatalog) {
+ sqlBuilder.append(" ").append(event.getColumn().isNullable() ?
"NULL" : "NOT NULL");
+ } else if
(SqlType.TIMESTAMP.equals(event.getColumn().getDataType().getSqlType())) {
+ log.warn(
+ "Default value is not supported for column {} in table
{}. Skipping add column operation. event: {}",
+ event.getColumn().getName(),
+ tablePath.getFullName(),
+ event);
+ } else {
+ sqlBuilder.append(" NOT NULL");
+ }
+ if (sameCatalog) {
+ sqlBuilder.append("
").append(sqlClauseWithDefaultValue(typeDefine));
+ }
+ }
+ if (event.getColumn().getComment() != null) {
+ sqlBuilder
+ .append(" ")
+ .append("COMMENT ")
+ .append("'")
+ .append(event.getColumn().getComment())
+ .append("'");
+ }
+ if (event.getAfterColumn() != null) {
+ sqlBuilder.append(" ").append("AFTER
").append(quoteIdentifier(event.getAfterColumn()));
+ }
+ String changeColumnSQL = sqlBuilder.toString();
try (Statement statement = connection.createStatement()) {
- log.info("Executing change column SQL: " + changeColumnSQL);
+ log.info("Executing change column SQL: {}", changeColumnSQL);
statement.execute(changeColumnSQL);
}
}
@@ -573,21 +674,60 @@ public interface JdbcDialect extends Serializable {
default void applySchemaChange(
Connection connection, TablePath tablePath,
AlterTableModifyColumnEvent event)
throws SQLException {
- String tableIdentifierWithQuoted = tableIdentifier(tablePath);
- Column modifyColumn = event.getColumn();
- String afterColumn = event.getAfterColumn();
- String modifyColumnSQL =
- buildAlterTableSql(
- event.getSourceDialectName(),
- modifyColumn.getSourceType(),
- AlterType.MODIFY.name(),
- modifyColumn,
- tableIdentifierWithQuoted,
- StringUtils.EMPTY,
- afterColumn);
+ boolean sameCatalog =
event.getSourceDialectName().equals(dialectName());
+ BasicTypeDefine typeDefine =
getTypeConverter().reconvert(event.getColumn());
+ String columnType =
+ sameCatalog ? event.getColumn().getSourceType() :
typeDefine.getColumnType();
+ StringBuilder sqlBuilder =
+ new StringBuilder()
+ .append("ALTER TABLE")
+ .append(" ")
+ .append(tableIdentifier(tablePath))
+ .append(" ")
+ .append("MODIFY COLUMN")
+ .append(" ")
+ .append(quoteIdentifier(event.getColumn().getName()))
+ .append(" ")
+ .append(columnType);
+
+ // Only decorate with default value when source dialect is same as
sink dialect
+ // Todo Support for cross-database default values for ddl statements
+ if (event.getColumn().getDefaultValue() == null) {
+ sqlBuilder.append(" ").append(event.getColumn().isNullable() ?
"NULL" : "NOT NULL");
+ } else {
+ if (event.getColumn().isNullable()) {
+ sqlBuilder.append(" NULL");
+ } else if (sameCatalog) {
+ sqlBuilder.append(" ").append(event.getColumn().isNullable() ?
"NULL" : "NOT NULL");
+ } else if
(SqlType.TIMESTAMP.equals(event.getColumn().getDataType().getSqlType())) {
+ log.warn(
+ "Default value is not supported for column {} in table
{}. Skipping add column operation. event: {}",
+ event.getColumn().getName(),
+ tablePath.getFullName(),
+ event);
+ } else {
+ sqlBuilder.append(" NOT NULL");
+ }
+ if (sameCatalog) {
+ sqlBuilder.append("
").append(sqlClauseWithDefaultValue(typeDefine));
+ }
+ }
+ if (event.getColumn().getComment() != null) {
+ sqlBuilder
+ .append(" ")
+ .append("COMMENT ")
+ .append("'")
+ .append(event.getColumn().getComment())
+ .append("'");
+ }
+ if (event.getAfterColumn() != null) {
+ sqlBuilder.append(" ").append("AFTER
").append(quoteIdentifier(event.getAfterColumn()));
+ }
+
+ String modifyColumnSQL = sqlBuilder.toString();
try (Statement statement = connection.createStatement()) {
- log.info("Executing modify column SQL: " + modifyColumnSQL);
+ log.info("Executing modify column SQL: {}", modifyColumnSQL);
statement.execute(modifyColumnSQL);
}
}
@@ -595,213 +735,40 @@ public interface JdbcDialect extends Serializable {
default void applySchemaChange(
Connection connection, TablePath tablePath,
AlterTableDropColumnEvent event)
throws SQLException {
- String tableIdentifierWithQuoted = tableIdentifier(tablePath);
- String dropColumn = event.getColumn();
String dropColumnSQL =
- buildAlterTableSql(
- event.getSourceDialectName(),
- null,
- AlterType.DROP.name(),
- null,
- tableIdentifierWithQuoted,
- dropColumn,
- null);
+ String.format(
+ "ALTER TABLE %s DROP COLUMN %s",
+ tableIdentifier(tablePath),
quoteIdentifier(event.getColumn()));
try (Statement statement = connection.createStatement()) {
- log.info("Executing drop column SQL: " + dropColumnSQL);
+ log.info("Executing drop column SQL: {}", dropColumnSQL);
statement.execute(dropColumnSQL);
}
}
/**
- * build alter table sql
- *
- * @param sourceDialectName source dialect name
- * @param sourceColumnType source column type
- * @param alterOperation alter operation of ddl
- * @param newColumn new column after ddl
- * @param tableName table name of sink table
- * @param oldColumnName old column name before ddl
- * @param afterColumn column before the new column
- * @return alter table sql for sink table after schema change
- */
- default String buildAlterTableSql(
- String sourceDialectName,
- String sourceColumnType,
- String alterOperation,
- Column newColumn,
- String tableName,
- String oldColumnName,
- String afterColumn) {
- if (StringUtils.equals(alterOperation, AlterType.DROP.name())) {
- return String.format(
- "ALTER TABLE %s drop column %s", tableName,
quoteIdentifier(oldColumnName));
- }
-
- if (alterOperation.equalsIgnoreCase(AlterType.RENAME.name())) {
- return String.format(
- "ALTER TABLE %s RENAME COLUMN %s TO %s",
- tableName, oldColumnName, newColumn.getName());
- }
-
- TypeConverter<?> typeConverter =
ConverterLoader.loadTypeConverter(dialectName());
- BasicTypeDefine typeBasicTypeDefine = (BasicTypeDefine)
typeConverter.reconvert(newColumn);
-
- String basicSql = buildAlterTableBasicSql(alterOperation, tableName);
- basicSql =
- decorateWithColumnNameAndType(
- sourceDialectName,
- sourceColumnType,
- basicSql,
- alterOperation,
- newColumn,
- oldColumnName,
- typeBasicTypeDefine.getColumnType());
- // Only decorate with default value when source dialect is same as
sink dialect
- // Todo Support for cross-database default values for ddl statements
- if (sourceDialectName.equals(dialectName())) {
- basicSql = decorateWithDefaultValue(basicSql, typeBasicTypeDefine);
- }
- basicSql = decorateWithNullable(basicSql, typeBasicTypeDefine,
sourceDialectName);
- basicSql = decorateWithComment(tableName, basicSql,
typeBasicTypeDefine);
- basicSql = decorateWithAfterColumn(basicSql, afterColumn);
- return dialectName().equals(DatabaseIdentifier.ORACLE) ? basicSql :
basicSql + ";";
- }
-
- /**
- * build the body of alter table sql
- *
- * @param alterOperation alter operation of ddl
- * @param tableName table name of sink table
- * @return basic sql of alter table for sink table
- */
- default String buildAlterTableBasicSql(String alterOperation, String
tableName) {
- StringBuilder sql =
- new StringBuilder(
- "ALTER TABLE "
- + tableName
- + StringUtils.SPACE
- + alterOperation
- + StringUtils.SPACE);
- return sql.toString();
- }
-
- /**
- * decorate the sql with column name and type
- *
- * @param sourceDialectName source dialect name
- * @param sourceColumnType source column type
- * @param basicSql basic sql of alter table for sink table
- * @param alterOperation alter operation of ddl
- * @param newColumn new column after ddl
- * @param oldColumnName old column name before ddl
- * @param columnType column type of new column
- * @return basic sql with column name and type of alter table for sink
table
- */
- default String decorateWithColumnNameAndType(
- String sourceDialectName,
- String sourceColumnType,
- String basicSql,
- String alterOperation,
- Column newColumn,
- String oldColumnName,
- String columnType) {
- StringBuilder sql = new StringBuilder(basicSql);
- String oldColumnNameWithQuoted = quoteIdentifier(oldColumnName);
- String newColumnNameWithQuoted = quoteIdentifier(newColumn.getName());
- if (alterOperation.equals(AlterType.CHANGE.name())) {
- sql.append(oldColumnNameWithQuoted)
- .append(StringUtils.SPACE)
- .append(newColumnNameWithQuoted)
- .append(StringUtils.SPACE);
- } else {
- sql.append(newColumnNameWithQuoted).append(StringUtils.SPACE);
- }
- if (sourceDialectName.equals(dialectName())) {
- sql.append(sourceColumnType);
- } else {
- sql.append(columnType);
- }
- sql.append(StringUtils.SPACE);
- return sql.toString();
- }
-
- /**
- * decorate with nullable
- *
- * @param basicSql alter table sql for sink table
- * @param typeBasicTypeDefine type basic type define of new column
- * @param sourceDialectName source dialect name
- * @return alter table sql with nullable for sink table
- */
- default String decorateWithNullable(
- String basicSql, BasicTypeDefine typeBasicTypeDefine, String
sourceDialectName) {
- StringBuilder sql = new StringBuilder(basicSql);
- if (typeBasicTypeDefine.isNullable()
- && !dialectName().equalsIgnoreCase(DatabaseIdentifier.ORACLE))
{
- sql.append("NULL ");
- } else {
- // Todo: Support cross-dabaase default values for ddl statements
which can remove this
- if (!(!dialectName().equalsIgnoreCase(sourceDialectName)
- && dialectName().equalsIgnoreCase(DatabaseIdentifier.MYSQL)
- &&
typeBasicTypeDefine.getDataType().equalsIgnoreCase("datetime"))) {
- sql.append("NOT NULL ");
- }
- }
- return sql.toString();
- }
-
- /**
- * decorate with default value
+ * Get the SQL clause for define column default value
*
- * @param basicSql alter table sql for sink table
- * @param typeBasicTypeDefine type basic type define of new column
- * @return alter table sql with default value for sink table
+ * @param columnDefine column define
+ * @return SQL clause for define default value
*/
- default String decorateWithDefaultValue(String basicSql, BasicTypeDefine
typeBasicTypeDefine) {
- Object defaultValue = typeBasicTypeDefine.getDefaultValue();
+ default String sqlClauseWithDefaultValue(BasicTypeDefine columnDefine) {
+ Object defaultValue = columnDefine.getDefaultValue();
if (Objects.nonNull(defaultValue)
- &&
needsQuotesWithDefaultValue(typeBasicTypeDefine.getColumnType())
+ && needsQuotesWithDefaultValue(columnDefine.getColumnType())
&& !isSpecialDefaultValue(defaultValue)) {
defaultValue = quotesDefaultValue(defaultValue);
}
- StringBuilder sql = new StringBuilder(basicSql);
- if (Objects.nonNull(defaultValue)) {
- sql.append("DEFAULT
").append(defaultValue).append(StringUtils.SPACE);
- }
- return sql.toString();
+ return "DEFAULT " + defaultValue;
}
/**
- * decorate with comment
+ * Whether support default value
*
- * @param tableName table name with quoted
- * @param basicSql alter table sql for sink table
- * @param typeBasicTypeDefine type basic type define of new column
- * @return alter table sql with comment for sink table
+ * @param columnDefine column define
+ * @return whether support set default value
*/
- default String decorateWithComment(
- String tableName, String basicSql, BasicTypeDefine
typeBasicTypeDefine) {
- String comment = typeBasicTypeDefine.getComment();
- StringBuilder sql = new StringBuilder(basicSql);
- if (StringUtils.isNotBlank(comment)) {
- sql.append("COMMENT '").append(comment).append("'");
- }
- return sql.toString();
- }
-
- /**
- * decorate with after
- *
- * @param basicSql alter table sql for sink table
- * @param afterColumn column before the new column
- * @return alter table sql with after for sink table
- */
- default String decorateWithAfterColumn(String basicSql, String
afterColumn) {
- StringBuilder sql = new StringBuilder(basicSql);
- if (StringUtils.isNotBlank(afterColumn)) {
- sql.append("AFTER ").append(afterColumn).append(StringUtils.SPACE);
- }
- return sql.toString();
+ default boolean supportDefaultValue(BasicTypeDefine columnDefine) {
+ return true;
}
/**
@@ -833,12 +800,4 @@ public interface JdbcDialect extends Serializable {
default String quotesDefaultValue(Object defaultValue) {
return "'" + defaultValue + "'";
}
-
- enum AlterType {
- ADD,
- DROP,
- MODIFY,
- CHANGE,
- RENAME
- }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index 163c030445..cc3f1300b5 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -70,6 +71,12 @@ public class MysqlDialect implements JdbcDialect {
return new MysqlJdbcRowConverter();
}
+ @Override
+ public TypeConverter<BasicTypeDefine> getTypeConverter() {
+ TypeConverter typeConverter = MySqlTypeConverter.DEFAULT_INSTANCE;
+ return typeConverter;
+ }
+
@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new MySqlTypeMapper();
@@ -228,13 +235,9 @@ public class MysqlDialect implements JdbcDialect {
}
@Override
- public String decorateWithComment(
- String tableName, String basicSql, BasicTypeDefine
typeBasicTypeDefine) {
+ public boolean supportDefaultValue(BasicTypeDefine typeBasicTypeDefine) {
MysqlType nativeType = (MysqlType) typeBasicTypeDefine.getNativeType();
- if (NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType)) {
- return basicSql;
- }
- return JdbcDialect.super.decorateWithComment(tableName, basicSql,
typeBasicTypeDefine);
+ return !(NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType));
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
index 78c8415a88..72c771f9ed 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
@@ -104,6 +104,8 @@ public class OceanBaseMySqlTypeConverter
private static final String VECTOR_TYPE_NAME = "";
private static final String VECTOR_NAME = "VECTOR";
+ public static final OceanBaseMySqlTypeConverter INSTANCE = new
OceanBaseMySqlTypeConverter();
+
@Override
public String identifier() {
return DatabaseIdentifier.OCENABASE;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
index 1824d6c76a..315eccaa52 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbas
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -73,6 +74,12 @@ public class OceanBaseMysqlDialect implements JdbcDialect {
return new OceanBaseMysqlJdbcRowConverter();
}
+ @Override
+ public TypeConverter<BasicTypeDefine> getTypeConverter() {
+ TypeConverter typeConverter = OceanBaseMySqlTypeConverter.INSTANCE;
+ return typeConverter;
+ }
+
@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new OceanBaseMySqlTypeMapper();
@@ -231,13 +238,9 @@ public class OceanBaseMysqlDialect implements JdbcDialect {
}
@Override
- public String decorateWithComment(
- String tableName, String basicSql, BasicTypeDefine
typeBasicTypeDefine) {
+ public boolean supportDefaultValue(BasicTypeDefine typeBasicTypeDefine) {
OceanBaseMysqlType nativeType = (OceanBaseMysqlType)
typeBasicTypeDefine.getNativeType();
- if (NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType)) {
- return basicSql;
- }
- return JdbcDialect.super.decorateWithComment(tableName, basicSql,
typeBasicTypeDefine);
+ return !(NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType));
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
index b314302ba4..b80620a4ce 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -17,8 +17,14 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -64,6 +70,11 @@ public class OracleDialect implements JdbcDialect {
return new OracleJdbcRowConverter();
}
+ @Override
+ public TypeConverter<BasicTypeDefine> getTypeConverter() {
+ return OracleTypeConverter.INSTANCE;
+ }
+
@Override
public String hashModForField(String fieldName, int mod) {
return "MOD(ORA_HASH(" + quoteIdentifier(fieldName) + ")," + mod + ")";
@@ -329,17 +340,146 @@ public class OracleDialect implements JdbcDialect {
}
@Override
- public String decorateWithComment(
- String tableName, String basicSql, BasicTypeDefine
typeBasicTypeDefine) {
- String comment = typeBasicTypeDefine.getComment();
- StringBuilder sql = new StringBuilder(basicSql);
- if (StringUtils.isNotBlank(comment)) {
- String commentSql =
- String.format(
- "COMMENT ON COLUMN %s.%s IS '%s'",
- tableName,
quoteIdentifier(typeBasicTypeDefine.getName()), comment);
- sql.append(";\n").append(commentSql);
+ public void applySchemaChange(
+ Connection connection, TablePath tablePath,
AlterTableAddColumnEvent event)
+ throws SQLException {
+ List<String> ddlSQL = new ArrayList<>();
+ ddlSQL.add(buildUpdateColumnSQL(connection, tablePath, event));
+
+ if (event.getColumn().getComment() != null) {
+ ddlSQL.add(buildUpdateColumnCommentSQL(tablePath,
event.getColumn()));
+ }
+
+ try (Statement statement = connection.createStatement()) {
+ for (String sql : ddlSQL) {
+ log.info("Executing add column SQL: {}", sql);
+ statement.execute(sql);
+ }
+ }
+ }
+
+ @Override
+ public void applySchemaChange(
+ Connection connection, TablePath tablePath,
AlterTableChangeColumnEvent event)
+ throws SQLException {
+ List<String> ddlSQL = new ArrayList<>();
+ if (event.getOldColumn() != null
+ &&
!(event.getColumn().getName().equals(event.getOldColumn()))) {
+ StringBuilder sqlBuilder =
+ new StringBuilder()
+ .append("ALTER TABLE ")
+ .append(tableIdentifier(tablePath))
+ .append(" RENAME COLUMN ")
+ .append(quoteIdentifier(event.getOldColumn()))
+ .append(" TO ")
+
.append(quoteIdentifier(event.getColumn().getName()));
+ ddlSQL.add(sqlBuilder.toString());
+ }
+
+ try (Statement statement = connection.createStatement()) {
+ for (String sql : ddlSQL) {
+ log.info("Executing change column SQL: {}", sql);
+ statement.execute(sql);
+ }
+ }
+
+ if (event.getColumn().getDataType() != null) {
+ applySchemaChange(
+ connection,
+ tablePath,
+
AlterTableModifyColumnEvent.modify(event.tableIdentifier(), event.getColumn()));
+ }
+ }
+
+ @Override
+ public void applySchemaChange(
+ Connection connection, TablePath tablePath,
AlterTableModifyColumnEvent event)
+ throws SQLException {
+ List<String> ddlSQL = new ArrayList<>();
+ ddlSQL.add(buildUpdateColumnSQL(connection, tablePath, event));
+
+ if (event.getColumn().getComment() != null) {
+ ddlSQL.add(buildUpdateColumnCommentSQL(tablePath,
event.getColumn()));
+ }
+
+ try (Statement statement = connection.createStatement()) {
+ for (String sql : ddlSQL) {
+ log.info("Executing modify column SQL: {}", sql);
+ statement.execute(sql);
+ }
+ }
+ }
+
+ private String buildUpdateColumnSQL(
+ Connection connection, TablePath tablePath, AlterTableColumnEvent
event)
+ throws SQLException {
+ String actionType;
+ Column column;
+ if (event instanceof AlterTableModifyColumnEvent) {
+ actionType = "MODIFY";
+ column = ((AlterTableModifyColumnEvent) event).getColumn();
+ } else if (event instanceof AlterTableAddColumnEvent) {
+ actionType = "ADD";
+ column = ((AlterTableAddColumnEvent) event).getColumn();
+ } else {
+ throw new IllegalArgumentException("Unsupported
AlterTableColumnEvent: " + event);
+ }
+
+ boolean sameCatalog =
event.getSourceDialectName().equals(dialectName());
+ BasicTypeDefine typeDefine = getTypeConverter().reconvert(column);
+ String columnType = sameCatalog ? column.getSourceType() :
typeDefine.getColumnType();
+ StringBuilder sqlBuilder =
+ new StringBuilder()
+ .append("ALTER TABLE ")
+ .append(tableIdentifier(tablePath))
+ .append(" ")
+ .append(actionType)
+ .append(" ")
+ .append(quoteIdentifier(column.getName()))
+ .append(" ")
+ .append(columnType);
+ // Only decorate with default value when source dialect is same as
sink dialect
+ // Todo Support for cross-database default values for ddl statements
+ if (column.getDefaultValue() != null && sameCatalog) {
+ sqlBuilder.append("
").append(sqlClauseWithDefaultValue(typeDefine));
+ }
+ if (event instanceof AlterTableModifyColumnEvent) {
+ boolean targetColumnNullable =
+ columnIsNullable(connection, tablePath, column.getName());
+ if (column.isNullable() != targetColumnNullable) {
+ sqlBuilder.append(" ").append(column.isNullable() ? "NULL" :
"NOT NULL");
+ }
+ } else {
+ sqlBuilder.append(" ").append(column.isNullable() ? "NULL" : "NOT
NULL");
+ }
+ return sqlBuilder.toString();
+ }
+
+ private String buildUpdateColumnCommentSQL(TablePath tablePath, Column
column) {
+ return String.format(
+ "COMMENT ON COLUMN %s.%s IS '%s'",
+ tableIdentifier(tablePath), quoteIdentifier(column.getName()),
column.getComment());
+ }
+
+ private boolean columnIsNullable(Connection connection, TablePath
tablePath, String column)
+ throws SQLException {
+ String selectColumnSQL =
+ "SELECT"
+ + " NULLABLE FROM"
+ + " ALL_TAB_COLUMNS c"
+ + " WHERE c.owner = '"
+ + tablePath.getSchemaName()
+ + "'"
+ + " AND c.table_name = '"
+ + tablePath.getTableName()
+ + "'"
+ + " AND c.column_name = '"
+ + column
+ + "'";
+ try (Statement statement = connection.createStatement()) {
+ ResultSet rs = statement.executeQuery(selectColumnSQL);
+ rs.next();
+ return rs.getString("NULLABLE").equals("Y");
}
- return sql.toString();
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
index 4d7a9ef2fd..67de53dfc5 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
@@ -17,20 +17,15 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
-import org.apache.seatunnel.api.event.EventType;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
-import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
-import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
-import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
-import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import
org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
@@ -50,9 +45,7 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.sql.Connection;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
@Slf4j
public abstract class AbstractJdbcSinkWriter<ResourceT>
@@ -67,6 +60,8 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
protected JdbcConnectionProvider connectionProvider;
protected JdbcSinkConfig jdbcSinkConfig;
protected JdbcOutputFormat<SeaTunnelRow,
JdbcBatchStatementExecutor<SeaTunnelRow>> outputFormat;
+ protected TableSchemaChangeEventDispatcher tableSchemaChanger =
+ new TableSchemaChangeEventDispatcher();
@Override
public void applySchemaChange(SchemaChangeEvent event) throws IOException {
@@ -88,57 +83,7 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
}
protected void processSchemaChangeEvent(AlterTableColumnEvent event)
throws IOException {
- List<Column> columns = new ArrayList<>(tableSchema.getColumns());
- switch (event.getEventType()) {
- case SCHEMA_CHANGE_ADD_COLUMN:
- AlterTableAddColumnEvent alterTableAddColumnEvent =
- (AlterTableAddColumnEvent) event;
- Column addColumn = alterTableAddColumnEvent.getColumn();
- String afterColumn = alterTableAddColumnEvent.getAfterColumn();
- if (StringUtils.isNotBlank(afterColumn)) {
- Optional<Column> columnOptional =
- columns.stream()
- .filter(column ->
afterColumn.equals(column.getName()))
- .findFirst();
- if (!columnOptional.isPresent()) {
- columns.add(addColumn);
- break;
- }
- columnOptional.ifPresent(
- column -> {
- int index = columns.indexOf(column);
- columns.add(index + 1, addColumn);
- });
- } else {
- columns.add(addColumn);
- }
- break;
- case SCHEMA_CHANGE_DROP_COLUMN:
- String dropColumn = ((AlterTableDropColumnEvent)
event).getColumn();
- columns.removeIf(column ->
column.getName().equalsIgnoreCase(dropColumn));
- break;
- case SCHEMA_CHANGE_MODIFY_COLUMN:
- Column modifyColumn = ((AlterTableModifyColumnEvent)
event).getColumn();
- replaceColumnByIndex(
- event.getEventType(), columns, modifyColumn.getName(),
modifyColumn);
- break;
- case SCHEMA_CHANGE_CHANGE_COLUMN:
- AlterTableChangeColumnEvent alterTableChangeColumnEvent =
- (AlterTableChangeColumnEvent) event;
- Column changeColumn = alterTableChangeColumnEvent.getColumn();
- String oldColumnName =
alterTableChangeColumnEvent.getOldColumn();
- replaceColumnByIndex(event.getEventType(), columns,
oldColumnName, changeColumn);
- break;
- default:
- throw new SeaTunnelException(
- "Unsupported schemaChangeEvent for event type: " +
event.getEventType());
- }
- this.tableSchema =
- TableSchema.builder()
- .columns(columns)
- .primaryKey(tableSchema.getPrimaryKey())
- .constraintKey(tableSchema.getConstraintKeys())
- .build();
+ this.tableSchema = tableSchemaChanger.reset(tableSchema).apply(event);
reOpenOutputFormat(event);
}
@@ -148,7 +93,7 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
dialect.getJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig());
try (Connection connection =
refreshTableSchemaConnectionProvider.getOrEstablishConnection()) {
- dialect.applySchemaChange(event, connection, sinkTablePath);
+ dialect.applySchemaChange(connection, sinkTablePath, event);
} catch (Throwable e) {
throw new JdbcConnectorException(
JdbcConnectorErrorCode.REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT, e);
@@ -159,20 +104,4 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
.build();
this.outputFormat.open();
}
-
- protected void replaceColumnByIndex(
- EventType eventType, List<Column> columns, String oldColumnName,
Column newColumn) {
- for (int i = 0; i < columns.size(); i++) {
- Column column = columns.get(i);
- if (column.getName().equalsIgnoreCase(oldColumnName)) {
- // rename ...... to ...... which just has column name
- if (eventType.equals(EventType.SCHEMA_CHANGE_CHANGE_COLUMN)
- && newColumn.getDataType() == null) {
- columns.set(i, column.rename(newColumn.getName()));
- } else {
- columns.set(i, newColumn);
- }
- }
- }
- }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 41dd41ff9e..3f43b2088d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -17,9 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
-import org.apache.seatunnel.api.event.EventType;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
-import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -30,7 +28,6 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorExc
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionPoolProviderProxy;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
@@ -43,8 +40,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import static
org.apache.seatunnel.api.event.EventType.SCHEMA_CHANGE_CHANGE_COLUMN;
-
@Slf4j
public class JdbcSinkWriter extends
AbstractJdbcSinkWriter<ConnectionPoolManager> {
private final Integer primaryKeyIndex;
@@ -167,22 +162,4 @@ public class JdbcSinkWriter extends
AbstractJdbcSinkWriter<ConnectionPoolManager
outputFormat.close();
}
}
-
- @Override
- protected void replaceColumnByIndex(
- EventType eventType, List<Column> columns, String oldColumnName,
Column newColumn) {
- // The operation of renaming a column in Oracle is only supported to
modify the column name,
- // so we just modify the column name directly.
- if (eventType.equals(SCHEMA_CHANGE_CHANGE_COLUMN) && dialect
instanceof OracleDialect) {
- for (int i = 0; i < columns.size(); i++) {
- Column column = columns.get(i);
- if (column.getName().equalsIgnoreCase(oldColumnName)) {
- column = column.rename(newColumn.getName());
- columns.set(i, column);
- }
- }
- return;
- }
- super.replaceColumnByIndex(eventType, columns, oldColumnName,
newColumn);
- }
}