This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ea4b952af0 [INLONG-8743][Sort] Support more types of ddl in all migration (#8744)
ea4b952af0 is described below
commit ea4b952af0e7ae5ba86061fe0faf483e12e42d17
Author: Sting <[email protected]>
AuthorDate: Wed Aug 16 19:39:35 2023 +0800
[INLONG-8743][Sort] Support more types of ddl in all migration (#8744)
---
.../inlong/sort/protocol/ddl/enums/AlterType.java | 4 +-
.../sort/protocol/ddl/expressions/AlterColumn.java | 6 +++
.../sort/cdc/mysql/utils/OperationUtils.java | 11 ++++
.../org/apache/inlong/sort/cdc/TestOperation.java | 62 ++++++++++++++++++++++
4 files changed, 82 insertions(+), 1 deletion(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java
index c6813b3998..5dfb48301e 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java
@@ -26,6 +26,8 @@ public enum AlterType {
ADD_COLUMN,
DROP_COLUMN,
MODIFY_COLUMN,
- CHANGE_COLUMN
+ CHANGE_COLUMN,
+ DROP_CONSTRAINT,
+ ADD_CONSTRAINT
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java
index c8f30d5184..f76195fb9f 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java
@@ -50,6 +50,12 @@ public class AlterColumn {
this.oldColumn = oldColumn;
}
+ public AlterColumn(@JsonProperty("alterType") AlterType alterType,
+ @JsonProperty("newColumn") Column newColumn) {
+ this.alterType = alterType;
+ this.newColumn = newColumn;
+ }
+
public AlterColumn(@JsonProperty("alterType") AlterType alterType) {
this.alterType = alterType;
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
index 028a98c912..7c5559f578 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
@@ -112,12 +112,23 @@ public class OperationUtils {
statement.getAlterExpressions().forEach(alterExpression -> {
switch (alterExpression.getOperation()) {
case DROP:
+ if (alterExpression.getConstraintName() != null) {
+ alterColumns.add(new AlterColumn(AlterType.DROP_CONSTRAINT,
+ new Column(reformatName(alterExpression.getConstraintName()))));
+ break;
+ }
alterColumns.add(new AlterColumn(AlterType.DROP_COLUMN,
null,
Column.builder().name(reformatName(alterExpression.getColumnName()))
.build()));
break;
case ADD:
+ if (alterExpression.getIndex() != null) {
+ // only support constraint type now
+ // the sink connector doesn't support add constraint
+ alterColumns.add(new AlterColumn(AlterType.ADD_CONSTRAINT));
+ break;
+ }
alterColumns.add(new AlterColumn(AlterType.ADD_COLUMN,
parseColumnWithPosition(isFirst, sqlType,
alterExpression.getColDataTypeList().get(0)),
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
index baf3de8f68..963e13954d 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
@@ -23,6 +23,7 @@ import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation;
import org.apache.inlong.sort.protocol.ddl.operations.Operation;
import org.apache.inlong.sort.protocol.ddl.operations.UnsupportedOperation;
@@ -46,6 +47,15 @@ public class TestOperation {
Assert.assertEquals(operation.getOperationType(),
OperationType.RENAME);
}
+ @Test
+ public void testRenameTableByAlter() {
+ String sql = "alter table a rename to b";
+ HashMap<String, Integer> sqlType = new HashMap<>();
+ Operation operation = OperationUtils.generateOperation(sql, sqlType);
+ assert operation != null;
+ Assert.assertTrue(operation instanceof AlterOperation);
+ }
+
@Test
public void testDropTableOperation() {
String sql = "drop table `tv3`";
@@ -95,4 +105,56 @@ public class TestOperation {
Assert.assertEquals(operation.getOperationType(), OperationType.OTHER);
}
+ @Test
+ public void testCreateTableWithCharacterConstraint() {
+ String sql = "create table a (b int) engine=innodb character "
+ + "set=utf8 collate=utf8_bin row_format=dynamic";
+ HashMap<String, Integer> sqlType = new HashMap<>();
+ sqlType.put("b", 1);
+ Operation operation = OperationUtils.generateOperation(sql, sqlType);
+ assert operation != null;
+ Assert.assertTrue(operation instanceof CreateTableOperation);
+ }
+
+ @Test
+ public void alterTableChangeType() {
+ String sql = "ALTER TABLE test CHANGE COLUMN name name1 "
+ + "mediumtext character set utf8mb4 COLLATE=utf8 NULL";
+ HashMap<String, Integer> sqlType = new HashMap<>();
+ sqlType.put("name1", 1);
+ Operation operation = OperationUtils.generateOperation(sql, sqlType);
+ assert operation != null;
+ Assert.assertTrue(operation instanceof AlterOperation);
+ AlterColumn alterColumn = ((AlterOperation) operation).getAlterColumns().get(0);
+ Assert.assertEquals(alterColumn.getAlterType(), AlterType.CHANGE_COLUMN);
+ Assert.assertEquals(alterColumn.getNewColumn().getName(), "name1");
+ Assert.assertEquals(alterColumn.getOldColumn().getName(), "name");
+ }
+
+ @Test
+ public void dropTableConstraint() {
+ String sql = "ALTER TABLE test drop constraint a";
+ HashMap<String, Integer> sqlType = new HashMap<>();
+ sqlType.put("name1", 1);
+ Operation operation = OperationUtils.generateOperation(sql, sqlType);
+ assert operation != null;
+ Assert.assertTrue(operation instanceof AlterOperation);
+ Assert.assertEquals(operation.getOperationType(), OperationType.ALTER);
+ Assert.assertEquals(((AlterOperation) operation).getAlterColumns().get(0).getAlterType(),
+ AlterType.DROP_CONSTRAINT);
+ }
+
+ @Test
+ public void addConstraint() {
+ String sql = "ALTER TABLE test add constraint primary key (a)";
+ HashMap<String, Integer> sqlType = new HashMap<>();
+ sqlType.put("name1", 1);
+ Operation operation = OperationUtils.generateOperation(sql, sqlType);
+ assert operation != null;
+ Assert.assertTrue(operation instanceof AlterOperation);
+ Assert.assertEquals(operation.getOperationType(), OperationType.ALTER);
+ Assert.assertEquals(((AlterOperation) operation).getAlterColumns().get(0).getAlterType(),
+ AlterType.ADD_CONSTRAINT);
+ }
+
}