This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3fb05da365 [Improve][Connector-V2] Improve schema evolution on column
insert after for mysql-jdbc (#8017)
3fb05da365 is described below
commit 3fb05da365649dddaeaf7cc21e167037b3bd40f6
Author: Shiwanming <[email protected]>
AuthorDate: Sat Nov 16 21:25:48 2024 +0800
[Improve][Connector-V2] Improve schema evolution on column insert after for
mysql-jdbc (#8017)
---
.../jdbc/internal/dialect/JdbcDialect.java | 35 ++++++++++++++++++----
.../jdbc/sink/AbstractJdbcSinkWriter.java | 24 +++++++++++++--
.../src/test/resources/ddl/add_columns.sql | 13 ++++++++
.../src/test/resources/ddl/drop_columns.sql | 2 +-
4 files changed, 66 insertions(+), 8 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 7b72c43ed5..953bb30ee7 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -529,6 +529,7 @@ public interface JdbcDialect extends Serializable {
throws SQLException {
String tableIdentifierWithQuoted = tableIdentifier(tablePath);
Column addColumn = event.getColumn();
+ String afterColumn = event.getAfterColumn();
String addColumnSQL =
buildAlterTableSql(
event.getSourceDialectName(),
@@ -536,7 +537,8 @@ public interface JdbcDialect extends Serializable {
AlterType.ADD.name(),
addColumn,
tableIdentifierWithQuoted,
- StringUtils.EMPTY);
+ StringUtils.EMPTY,
+ afterColumn);
try (Statement statement = connection.createStatement()) {
log.info("Executing add column SQL: " + addColumnSQL);
statement.execute(addColumnSQL);
@@ -549,6 +551,7 @@ public interface JdbcDialect extends Serializable {
String tableIdentifierWithQuoted = tableIdentifier(tablePath);
Column changeColumn = event.getColumn();
String oldColumnName = event.getOldColumn();
+ String afterColumn = event.getAfterColumn();
String changeColumnSQL =
buildAlterTableSql(
event.getSourceDialectName(),
@@ -558,7 +561,8 @@ public interface JdbcDialect extends Serializable {
: AlterType.CHANGE.name(),
changeColumn,
tableIdentifierWithQuoted,
- oldColumnName);
+ oldColumnName,
+ afterColumn);
try (Statement statement = connection.createStatement()) {
log.info("Executing change column SQL: " + changeColumnSQL);
@@ -571,6 +575,7 @@ public interface JdbcDialect extends Serializable {
throws SQLException {
String tableIdentifierWithQuoted = tableIdentifier(tablePath);
Column modifyColumn = event.getColumn();
+ String afterColumn = event.getAfterColumn();
String modifyColumnSQL =
buildAlterTableSql(
event.getSourceDialectName(),
@@ -578,7 +583,8 @@ public interface JdbcDialect extends Serializable {
AlterType.MODIFY.name(),
modifyColumn,
tableIdentifierWithQuoted,
- StringUtils.EMPTY);
+ StringUtils.EMPTY,
+ afterColumn);
try (Statement statement = connection.createStatement()) {
log.info("Executing modify column SQL: " + modifyColumnSQL);
@@ -598,7 +604,8 @@ public interface JdbcDialect extends Serializable {
AlterType.DROP.name(),
null,
tableIdentifierWithQuoted,
- dropColumn);
+ dropColumn,
+ null);
try (Statement statement = connection.createStatement()) {
log.info("Executing drop column SQL: " + dropColumnSQL);
statement.execute(dropColumnSQL);
@@ -614,6 +621,7 @@ public interface JdbcDialect extends Serializable {
* @param newColumn new column after ddl
* @param tableName table name of sink table
* @param oldColumnName old column name before ddl
+ * @param afterColumn column before the new column
* @return alter table sql for sink table after schema change
*/
default String buildAlterTableSql(
@@ -622,7 +630,8 @@ public interface JdbcDialect extends Serializable {
String alterOperation,
Column newColumn,
String tableName,
- String oldColumnName) {
+ String oldColumnName,
+ String afterColumn) {
if (StringUtils.equals(alterOperation, AlterType.DROP.name())) {
return String.format(
"ALTER TABLE %s drop column %s", tableName,
quoteIdentifier(oldColumnName));
@@ -654,6 +663,7 @@ public interface JdbcDialect extends Serializable {
}
basicSql = decorateWithNullable(basicSql, typeBasicTypeDefine,
sourceDialectName);
basicSql = decorateWithComment(tableName, basicSql,
typeBasicTypeDefine);
+ basicSql = decorateWithAfterColumn(basicSql, afterColumn);
return dialectName().equals(DatabaseIdentifier.ORACLE) ? basicSql :
basicSql + ";";
}
@@ -779,6 +789,21 @@ public interface JdbcDialect extends Serializable {
return sql.toString();
}
+ /**
+ * decorate with after
+ *
+ * @param basicSql alter table sql for sink table
+ * @param afterColumn column before the new column
+ * @return alter table sql with after for sink table
+ */
+ default String decorateWithAfterColumn(String basicSql, String
afterColumn) {
+ StringBuilder sql = new StringBuilder(basicSql);
+ if (StringUtils.isNotBlank(afterColumn)) {
+ sql.append("AFTER ").append(afterColumn).append(StringUtils.SPACE);
+ }
+ return sql.toString();
+ }
+
/**
* whether quotes with default value
*
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
index 5cedb688d7..4d7a9ef2fd 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
@@ -52,6 +52,7 @@ import java.io.IOException;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
@Slf4j
public abstract class AbstractJdbcSinkWriter<ResourceT>
@@ -90,8 +91,27 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
List<Column> columns = new ArrayList<>(tableSchema.getColumns());
switch (event.getEventType()) {
case SCHEMA_CHANGE_ADD_COLUMN:
- Column addColumn = ((AlterTableAddColumnEvent)
event).getColumn();
- columns.add(addColumn);
+ AlterTableAddColumnEvent alterTableAddColumnEvent =
+ (AlterTableAddColumnEvent) event;
+ Column addColumn = alterTableAddColumnEvent.getColumn();
+ String afterColumn = alterTableAddColumnEvent.getAfterColumn();
+ if (StringUtils.isNotBlank(afterColumn)) {
+ Optional<Column> columnOptional =
+ columns.stream()
+ .filter(column ->
afterColumn.equals(column.getName()))
+ .findFirst();
+ if (!columnOptional.isPresent()) {
+ columns.add(addColumn);
+ break;
+ }
+ columnOptional.ifPresent(
+ column -> {
+ int index = columns.indexOf(column);
+ columns.add(index + 1, addColumn);
+ });
+ } else {
+ columns.add(addColumn);
+ }
break;
case SCHEMA_CHANGE_DROP_COLUMN:
String dropColumn = ((AlterTableDropColumnEvent)
event).getColumn();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/add_columns.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/add_columns.sql
index 7ec4e23e62..d08c19c8ca 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/add_columns.sql
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/add_columns.sql
@@ -64,6 +64,19 @@ values (128,"scooter","Small 2-wheel
scooter",3.14,'xx',1,1.1,'2023-02-02 09:09:
(136,"spare tire","24 inch spare tire",22.2,'xx',9,1.9,'2023-02-02
09:09:09');
update products set name = 'dailai' where id = 135;
+alter table products ADD COLUMN add_column6 varchar(64) not null default 'ff'
after id;
+delete from products where id = 115;
+insert into products
+values (173,'tt',"scooter","Small 2-wheel scooter",3.14,'xx',1,1.1,'2023-02-02
09:09:09'),
+ (174,'tt',"car battery","12V car battery",8.1,'xx',2,1.2,'2023-02-02
09:09:09'),
+ (175,'tt',"12-pack drill bits","12-pack of drill bits with sizes
ranging from #40 to #3",0.8,'xx',3,1.3,'2023-02-02 09:09:09'),
+ (176,'tt',"hammer","12oz carpenter's
hammer",0.75,'xx',4,1.4,'2023-02-02 09:09:09'),
+ (177,'tt',"hammer","14oz carpenter's
hammer",0.875,'xx',5,1.5,'2023-02-02 09:09:09'),
+ (178,'tt',"hammer","16oz carpenter's hammer",1.0,'xx',6,1.6,'2023-02-02
09:09:09'),
+ (179,'tt',"rocks","box of assorted rocks",5.3,'xx',7,1.7,'2023-02-02
09:09:09'),
+ (180,'tt',"jacket","water resistent black wind
breaker",0.1,'xx',8,1.8,'2023-02-02 09:09:09'),
+ (181,'tt',"spare tire","24 inch spare tire",22.2,'xx',9,1.9,'2023-02-02
09:09:09');
-- add column for irrelevant table
ALTER TABLE products_on_hand ADD COLUMN add_column5 varchar(64) not null
default 'yy';
+
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/drop_columns.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/drop_columns.sql
index d6502bbfdf..5c3b7d1f54 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/drop_columns.sql
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/drop_columns.sql
@@ -21,7 +21,7 @@
CREATE DATABASE IF NOT EXISTS `shop`;
use shop;
-alter table products drop column add_column4;
+alter table products drop column add_column4,drop column add_column6;
insert into products
values (137,"scooter","Small 2-wheel scooter",3.14,'xx',1,1.1),
(138,"car battery","12V car battery",8.1,'xx',2,1.2),