This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 1a92f2a7 [Fix](cdc) Fix sql_parse schema table annotation and field
type parsing inaccuracies (#540)
1a92f2a7 is described below
commit 1a92f2a719256b38baa65dcee8f686ccb9580095
Author: SHHH <[email protected]>
AuthorDate: Fri Jan 10 14:04:38 2025 +0800
[Fix](cdc) Fix sql_parse schema table annotation and field type parsing
inaccuracies (#540)
---
.../flink/sink/schema/SQLParserSchemaManager.java | 73 +++++++++++++++++-----
.../sink/schema/SQLParserSchemaManagerTest.java | 33 +++++++++-
2 files changed, 86 insertions(+), 20 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
index 67a2ddac..626f17ec 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
@@ -61,6 +61,8 @@ public class SQLParserSchemaManager implements Serializable {
private static final String PRIMARY_KEY = "PRIMARY KEY";
private static final String UNIQUE = "UNIQUE";
private static final String DORIS_CURRENT_TIMESTAMP = "CURRENT_TIMESTAMP";
+ private static final List<String> TYPE_MODIFIER =
+ Arrays.asList("UNSIGNED", "ZEROFILL", "PRECISION");
private static final Set<String> sourceConnectorTimeValues =
new HashSet<>(
Arrays.asList(
@@ -139,15 +141,13 @@ public class SQLParserSchemaManager implements
Serializable {
.forEach(
column -> {
String columnName = column.getColumnName();
- ColDataType colDataType =
column.getColDataType();
- String dataType =
parseDataType(colDataType, sourceConnector);
List<String> columnSpecs =
column.getColumnSpecs();
- String defaultValue =
- extractDefaultValue(dataType,
columnSpecs);
- String comment =
extractComment(columnSpecs);
FieldSchema fieldSchema =
- new FieldSchema(
- columnName, dataType,
defaultValue, comment);
+ getFieldSchema(
+ column.getColumnName(),
+ column.getColumnSpecs(),
+ column.getColDataType(),
+ sourceConnector);
columnFields.put(columnName, fieldSchema);
extractColumnPrimaryKey(columnName,
columnSpecs, pkKeys);
});
@@ -181,6 +181,20 @@ public class SQLParserSchemaManager implements
Serializable {
return null;
}
+ private String extractTypeModifier(List<String> columnSpecs) {
+ if (CollectionUtils.isEmpty(columnSpecs)) {
+ return "";
+ }
+ StringBuilder builder = new StringBuilder();
+ for (String columnSpec : columnSpecs) {
+ String columnSpecUpperCase = columnSpec.toUpperCase(Locale.ROOT);
+ if (TYPE_MODIFIER.contains(columnSpecUpperCase)) {
+ builder.append(" ").append(columnSpecUpperCase);
+ }
+ }
+ return builder.toString();
+ }
+
private void extractIndexesPrimaryKey(List<Index> indexes, List<String>
pkKeys) {
if (CollectionUtils.isEmpty(indexes)) {
return;
@@ -215,10 +229,23 @@ public class SQLParserSchemaManager implements
Serializable {
if (CollectionUtils.isEmpty(tableOptionsStrings)) {
return null;
}
+
+ for (int i = 0; i < tableOptionsStrings.size(); i++) {
+ String columnSpec = tableOptionsStrings.get(i);
+ // If COMMENT is immediately followed by an equals sign (=), remove the "=" so the comment text is adjacent to COMMENT
+ if (COMMENT.equalsIgnoreCase(columnSpec)
+ && i + 1 < tableOptionsStrings.size()
+ && "=".equals(tableOptionsStrings.get(i + 1))) {
+ tableOptionsStrings.remove(i + 1);
+ break;
+ }
+ }
+
return extractAdjacentString(tableOptionsStrings, COMMENT);
}
- private String parseDataType(ColDataType colDataType, SourceConnector
sourceConnector) {
+ private String parseDataType(
+ ColDataType colDataType, String typeModifier, SourceConnector
sourceConnector) {
String dataType = colDataType.getDataType();
int length = 0;
int scale = 0;
@@ -229,7 +256,8 @@ public class SQLParserSchemaManager implements Serializable
{
scale = Integer.parseInt(argumentsStringList.get(1));
}
}
- return JsonDebeziumChangeUtils.buildDorisTypeName(sourceConnector,
dataType, length, scale);
+ return JsonDebeziumChangeUtils.buildDorisTypeName(
+ sourceConnector, dataType + typeModifier, length, scale);
}
private String processDropColumnOperation(AlterExpression alterExpression,
String dorisTable) {
@@ -244,14 +272,12 @@ public class SQLParserSchemaManager implements
Serializable {
List<ColumnDataType> colDataTypeList =
alterExpression.getColDataTypeList();
List<String> addColumnList = new ArrayList<>();
for (ColumnDataType columnDataType : colDataTypeList) {
- String columnName = columnDataType.getColumnName();
- ColDataType colDataType = columnDataType.getColDataType();
- String datatype = parseDataType(colDataType, sourceConnector);
-
- List<String> columnSpecs = columnDataType.getColumnSpecs();
- String defaultValue = extractDefaultValue(datatype, columnSpecs);
- String comment = extractComment(columnSpecs);
- FieldSchema fieldSchema = new FieldSchema(columnName, datatype,
defaultValue, comment);
+ FieldSchema fieldSchema =
+ getFieldSchema(
+ columnDataType.getColumnName(),
+ columnDataType.getColumnSpecs(),
+ columnDataType.getColDataType(),
+ sourceConnector);
String addColumnDDL =
SchemaChangeHelper.buildAddColumnDDL(dorisTable, fieldSchema);
LOG.info("Parsed add column DDL SQL is: {}", addColumnDDL);
addColumnList.add(addColumnDDL);
@@ -259,6 +285,19 @@ public class SQLParserSchemaManager implements
Serializable {
return addColumnList;
}
+ private FieldSchema getFieldSchema(
+ String columnName,
+ List<String> columnSpecs,
+ ColDataType colDataType,
+ SourceConnector sourceConnector) {
+ String typeModifier = extractTypeModifier(columnSpecs);
+ String datatype = parseDataType(colDataType, typeModifier,
sourceConnector);
+
+ String defaultValue = extractDefaultValue(datatype, columnSpecs);
+ String comment = extractComment(columnSpecs);
+ return new FieldSchema(columnName, datatype, defaultValue, comment);
+ }
+
private String processChangeColumnOperation(
AlterExpression alterExpression, String dorisTable) {
String columnNewName =
alterExpression.getColDataTypeList().get(0).getColumnName();
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
index d65deeb0..c7b23c18 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
@@ -53,10 +53,12 @@ public class SQLParserSchemaManagerTest {
"ALTER TABLE `doris`.`tab` ADD COLUMN `create_time`
DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP COMMENT 'time_comment'");
expectDDLs.add("ALTER TABLE `doris`.`tab` RENAME COLUMN `c10` `c11`");
expectDDLs.add("ALTER TABLE `doris`.`tab` RENAME COLUMN `c12` `c13`");
+ expectDDLs.add(
+ "ALTER TABLE `doris`.`tab` ADD COLUMN `card` LARGEINT COMMENT
'card_comment'");
SourceConnector mysql = SourceConnector.MYSQL;
String ddl =
- "alter table t1 drop c1, drop column c2, add c3 int default
100, add column `decimal_type` decimal(38,9) DEFAULT '1.123456789' COMMENT
'decimal_type_comment', add `create_time` datetime(3) DEFAULT
CURRENT_TIMESTAMP(3) comment 'time_comment', rename column c10 to c11, change
column c12 c13 varchar(10)";
+ "alter table t1 drop c1, drop column c2, add c3 int default
100, add column `decimal_type` decimal(38,9) DEFAULT '1.123456789' COMMENT
'decimal_type_comment', add `create_time` datetime(3) DEFAULT
CURRENT_TIMESTAMP(3) comment 'time_comment', rename column c10 to c11, change
column c12 c13 varchar(10), add card bigint(20) unsigned NOT NULL COMMENT
'card_comment'";
List<String> actualDDLs = schemaManager.parseAlterDDLs(mysql, ddl,
dorisTable);
for (String actualDDL : actualDDLs) {
Assert.assertTrue(expectDDLs.contains(actualDDL));
@@ -257,13 +259,38 @@ public class SQLParserSchemaManagerTest {
+ " `decimal_type3` decimal(38,9) DEFAULT
'1.123456789' COMMENT 'comment_test',\n"
+ " `create_time3` datetime(3) DEFAULT
CURRENT_TIMESTAMP(3) COMMENT 'ttime_aaa',\n"
+ " PRIMARY KEY (`id`)\n"
- + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_0900_ai_ci";
+ + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_0900_ai_ci comment='test_sinka'";
+ TableSchema tableSchema =
+ schemaManager.parseCreateTableStatement(
+ SourceConnector.MYSQL, ddl, dorisTable, null);
+
+ String expected =
+ "TableSchema{database='doris', table='auto_tab',
tableComment='test_sinka', fields={`id`=FieldSchema{name='`id`',
typeString='INT', defaultValue='10000', comment='id_test'},
`create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)',
defaultValue='CURRENT_TIMESTAMP', comment='null'},
`c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999',
comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`',
typeString='DECIMALV3(9,3)', defaultValu [...]
+ Assert.assertEquals(expected, tableSchema.toString());
+ }
+
+ @Test
+ public void testParseCreateTableUnsignedStatement() {
+ String dorisTable = "doris.auto_tab";
+ String ddl =
+ "CREATE TABLE `test_sinka` (\n"
+ + " `id` BIGINT NOT NULL DEFAULT '10000' COMMENT
'id_test',\n"
+ + " `id2` BIGINT UNSIGNED ZEROFILL NOT NULL DEFAULT
'10000' COMMENT 'id2_comment',\n"
+ + " `create_time` datetime(3) DEFAULT
CURRENT_TIMESTAMP(3),\n"
+ + " `c1` int DEFAULT '999',\n"
+ + " `decimal_type` decimal(9,3) DEFAULT '1.000'
COMMENT 'decimal_tes',\n"
+ + " `aaa` varchar(100) DEFAULT NULL,\n"
+ + " `decimal_type3` decimal(38,9) DEFAULT
'1.123456789' COMMENT 'comment_test',\n"
+ + " `create_time3` datetime(3) DEFAULT
CURRENT_TIMESTAMP(3) COMMENT 'ttime_aaa',\n"
+ + " PRIMARY KEY (`id`)\n"
+ + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_0900_ai_ci comment 'test_sinka'";
TableSchema tableSchema =
schemaManager.parseCreateTableStatement(
SourceConnector.MYSQL, ddl, dorisTable, null);
String expected =
- "TableSchema{database='doris', table='auto_tab',
tableComment='null', fields={`id`=FieldSchema{name='`id`', typeString='INT',
defaultValue='10000', comment='id_test'},
`create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)',
defaultValue='CURRENT_TIMESTAMP', comment='null'},
`c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999',
comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`',
typeString='DECIMALV3(9,3)', defaultValue='1.0 [...]
+ "TableSchema{database='doris', table='auto_tab',
tableComment='test_sinka', fields={`id`=FieldSchema{name='`id`',
typeString='BIGINT', defaultValue='10000', comment='id_test'},
`id2`=FieldSchema{name='`id2`', typeString='LARGEINT', defaultValue='10000',
comment='id2_comment'}, `create_time`=FieldSchema{name='`create_time`',
typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'},
`c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', com [...]
+ System.out.println(tableSchema.toString());
Assert.assertEquals(expected, tableSchema.toString());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]