This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new d66d55e5 [fix]Fix SQLParser parsing the default value of datetime type
as the current time (#464)
d66d55e5 is described below
commit d66d55e5bf881ba3462868dd9717f085e4ddd5be
Author: wudongliang <[email protected]>
AuthorDate: Thu Aug 8 17:26:08 2024 +0800
[fix]Fix SQLParser parsing the default value of datetime type as the
current time (#464)
---
.../flink/sink/schema/SQLParserSchemaManager.java | 39 +++++++++++++--
.../sink/schema/SQLParserSchemaManagerTest.java | 55 ++++++++++++++++++----
2 files changed, 81 insertions(+), 13 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
index 9289c886..67a2ddac 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
@@ -32,6 +32,7 @@ import net.sf.jsqlparser.statement.create.table.CreateTable;
import net.sf.jsqlparser.statement.create.table.Index;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.flink.catalog.doris.DorisSchemaFactory;
+import org.apache.doris.flink.catalog.doris.DorisType;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils;
@@ -42,9 +43,14 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
/** Use {@link CCJSqlParserUtil} to parse SQL statements. */
public class SQLParserSchemaManager implements Serializable {
@@ -54,6 +60,16 @@ public class SQLParserSchemaManager implements Serializable {
private static final String PRIMARY = "PRIMARY";
private static final String PRIMARY_KEY = "PRIMARY KEY";
private static final String UNIQUE = "UNIQUE";
+ private static final String DORIS_CURRENT_TIMESTAMP = "CURRENT_TIMESTAMP";
+ private static final Set<String> sourceConnectorTimeValues =
+ new HashSet<>(
+ Arrays.asList(
+ "SYSDATE",
+ "SYSTIMESTAMP",
+ "CURRENT_TIMESTAMP",
+ "NOW()",
+ "CURRENT TIMESTAMP",
+ "GETDATE()"));
/**
* Doris' schema change only supports ADD, DROP, and RENAME operations.
This method is only used
@@ -126,7 +142,8 @@ public class SQLParserSchemaManager implements Serializable
{
ColDataType colDataType =
column.getColDataType();
String dataType =
parseDataType(colDataType, sourceConnector);
List<String> columnSpecs =
column.getColumnSpecs();
- String defaultValue =
extractDefaultValue(columnSpecs);
+ String defaultValue =
+ extractDefaultValue(dataType,
columnSpecs);
String comment =
extractComment(columnSpecs);
FieldSchema fieldSchema =
new FieldSchema(
@@ -232,7 +249,7 @@ public class SQLParserSchemaManager implements Serializable
{
String datatype = parseDataType(colDataType, sourceConnector);
List<String> columnSpecs = columnDataType.getColumnSpecs();
- String defaultValue = extractDefaultValue(columnSpecs);
+ String defaultValue = extractDefaultValue(datatype, columnSpecs);
String comment = extractComment(columnSpecs);
FieldSchema fieldSchema = new FieldSchema(columnName, datatype,
defaultValue, comment);
String addColumnDDL =
SchemaChangeHelper.buildAddColumnDDL(dorisTable, fieldSchema);
@@ -267,11 +284,25 @@ public class SQLParserSchemaManager implements
Serializable {
}
@VisibleForTesting
- public String extractDefaultValue(List<String> columnSpecs) {
+ public String extractDefaultValue(String dateType, List<String>
columnSpecs) {
if (CollectionUtils.isEmpty(columnSpecs)) {
return null;
}
- return extractAdjacentString(columnSpecs, DEFAULT);
+ String adjacentDefaultValue = extractAdjacentString(columnSpecs,
DEFAULT);
+ return parseDorisDefaultValue(dateType, adjacentDefaultValue);
+ }
+
+ private String parseDorisDefaultValue(String dateType, String
defaultValue) {
+ if (Objects.isNull(defaultValue)) {
+ return null;
+ }
+ // In doris, DATETIME supports specifying the current time by default
through
+ // CURRENT_TIMESTAMP.
+ if ((dateType.startsWith(DorisType.DATETIME) ||
dateType.startsWith(DorisType.DATETIME_V2))
+ &&
sourceConnectorTimeValues.contains(defaultValue.toUpperCase(Locale.ROOT))) {
+ return DORIS_CURRENT_TIMESTAMP;
+ }
+ return defaultValue;
}
private String extractAdjacentString(List<String> columnSpecs, String key)
{
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
index d8cb7c3f..d65deeb0 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.doris.flink.sink.schema;
+import org.apache.doris.flink.catalog.doris.DorisType;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.tools.cdc.DorisTableConfig;
import org.apache.doris.flink.tools.cdc.SourceConnector;
@@ -139,7 +140,7 @@ public class SQLParserSchemaManagerTest {
public void testExtractDefaultValue() {
String expectDefault = "100";
List<String> columnSpecs = Arrays.asList("default", "'100'",
"comment", "");
- String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+ String actualDefault =
schemaManager.extractDefaultValue(DorisType.INT, columnSpecs);
Assert.assertEquals(expectDefault, actualDefault);
}
@@ -147,14 +148,14 @@ public class SQLParserSchemaManagerTest {
public void testExtractDefaultValueQuotes() {
String expectDefault = "100";
List<String> columnSpecs = Arrays.asList("default", "\"100\"",
"comment", "");
- String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+ String actualDefault =
schemaManager.extractDefaultValue(DorisType.BIGINT, columnSpecs);
Assert.assertEquals(expectDefault, actualDefault);
}
@Test
public void testExtractDefaultValueNull() {
List<String> columnSpecs = Arrays.asList("Default", null, "comment",
null);
- String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+ String actualDefault =
schemaManager.extractDefaultValue(DorisType.STRING, columnSpecs);
Assert.assertNull(actualDefault);
}
@@ -162,7 +163,7 @@ public class SQLParserSchemaManagerTest {
public void testExtractDefaultValueEmpty() {
String expectDefault = null;
List<String> columnSpecs = Arrays.asList("DEFAULT", "comment", null);
- String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+ String actualDefault =
schemaManager.extractDefaultValue(DorisType.STRING, columnSpecs);
Assert.assertEquals(expectDefault, actualDefault);
}
@@ -170,17 +171,53 @@ public class SQLParserSchemaManagerTest {
public void testExtractDefaultValueA() {
String expectDefault = "aaa";
List<String> columnSpecs = Arrays.asList("default", "aaa");
- String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+ String actualDefault =
schemaManager.extractDefaultValue(DorisType.STRING, columnSpecs);
Assert.assertEquals(expectDefault, actualDefault);
}
@Test
public void testExtractDefaultValueNULL() {
List<String> columnSpecs = Collections.singletonList("default");
- String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+ String actualDefault =
schemaManager.extractDefaultValue(DorisType.STRING, columnSpecs);
Assert.assertNull(actualDefault);
}
+ @Test
+ public void testExtractDefaultValueDateTime() {
+ List<String> columnSpecs = Arrays.asList("default", "SYSTIMESTAMP");
+ String actualDefault =
schemaManager.extractDefaultValue(DorisType.DATETIME, columnSpecs);
+ Assert.assertEquals("CURRENT_TIMESTAMP", actualDefault);
+ }
+
+ @Test
+ public void testExtractDefaultValueDateTimeV2() {
+ List<String> columnSpecs = Arrays.asList("default", "GETDATE()");
+ String actualDefault =
+ schemaManager.extractDefaultValue(DorisType.DATETIME_V2,
columnSpecs);
+ Assert.assertEquals("CURRENT_TIMESTAMP", actualDefault);
+ }
+
+ @Test
+ public void testExtractDefaultValueDateTimeV2Time() {
+ List<String> columnSpecs = Arrays.asList("default", "2024-03-14
17:50:36.002");
+ String actualDefault =
schemaManager.extractDefaultValue("DATETIMEV2(3)", columnSpecs);
+ Assert.assertEquals("2024-03-14 17:50:36.002", actualDefault);
+ }
+
+ @Test
+ public void testExtractDefaultValueDateTimeV2CurrentTime() {
+ List<String> columnSpecs = Arrays.asList("default", "now()");
+ String actualDefault =
schemaManager.extractDefaultValue("DATETIMEV2(3)", columnSpecs);
+ Assert.assertEquals("CURRENT_TIMESTAMP", actualDefault);
+ }
+
+ @Test
+ public void testExtractDefaultValueDate() {
+ List<String> columnSpecs = Arrays.asList("default", "2024-03-14
17:50:36");
+ String actualDefault =
schemaManager.extractDefaultValue(DorisType.DATE, columnSpecs);
+ Assert.assertEquals("2024-03-14 17:50:36", actualDefault);
+ }
+
@Test
public void testRemoveContinuousChar() {
// Test removing continuous target characters from both ends
@@ -288,7 +325,7 @@ public class SQLParserSchemaManagerTest {
SourceConnector.ORACLE, ddl, dorisTable, null);
String expected =
- "TableSchema{database='doris', table='auto_tab',
tableComment='null', fields={employee_id=FieldSchema{name='employee_id',
typeString='BIGINT', defaultValue='null', comment='null'},
first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)',
defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name',
typeString='VARCHAR(150)', defaultValue='null', comment='null'},
email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null',
com [...]
+ "TableSchema{database='doris', table='auto_tab',
tableComment='null', fields={employee_id=FieldSchema{name='employee_id',
typeString='BIGINT', defaultValue='null', comment='null'},
first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)',
defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name',
typeString='VARCHAR(150)', defaultValue='null', comment='null'},
email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null',
com [...]
Assert.assertEquals(expected, tableSchema.toString());
}
@@ -314,7 +351,7 @@ public class SQLParserSchemaManagerTest {
SourceConnector.ORACLE, ddl, dorisTable, null);
String expected =
- "TableSchema{database='doris', table='auto_tab',
tableComment='null', fields={employee_id=FieldSchema{name='employee_id',
typeString='BIGINT', defaultValue='null', comment='null'},
first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)',
defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name',
typeString='VARCHAR(150)', defaultValue='null', comment='null'},
email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null',
com [...]
+ "TableSchema{database='doris', table='auto_tab',
tableComment='null', fields={employee_id=FieldSchema{name='employee_id',
typeString='BIGINT', defaultValue='null', comment='null'},
first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)',
defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name',
typeString='VARCHAR(150)', defaultValue='null', comment='null'},
email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null',
com [...]
Assert.assertEquals(expected, tableSchema.toString());
}
@@ -341,7 +378,7 @@ public class SQLParserSchemaManagerTest {
dorisTable,
new DorisTableConfig(new HashMap<>()));
String expected =
- "TableSchema{database='doris', table='auto_tab',
tableComment='null', fields={order_id=FieldSchema{name='order_id',
typeString='BIGINT', defaultValue='null', comment='null'},
customer_id=FieldSchema{name='customer_id', typeString='BIGINT',
defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date',
typeString='DATETIMEV2', defaultValue='SYSDATE', comment='null'},
status=FieldSchema{name='status', typeString='VARCHAR(60)',
defaultValue='null', comment=' [...]
+ "TableSchema{database='doris', table='auto_tab',
tableComment='null', fields={order_id=FieldSchema{name='order_id',
typeString='BIGINT', defaultValue='null', comment='null'},
customer_id=FieldSchema{name='customer_id', typeString='BIGINT',
defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date',
typeString='DATETIMEV2', defaultValue='CURRENT_TIMESTAMP', comment='null'},
status=FieldSchema{name='status', typeString='VARCHAR(60)',
defaultValue='null', [...]
Assert.assertEquals(expected, tableSchema.toString());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]