This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 9178356 [fix] improve char length proplem (#262)
9178356 is described below
commit 91783560773ff958c5689a494349f5996ed86308
Author: wudi <[email protected]>
AuthorDate: Tue Dec 12 10:04:34 2023 +0800
[fix] improve char length proplem (#262)
---
.../apache/doris/flink/catalog/DorisTypeMapper.java | 19 +++++++++++++++----
.../apache/doris/flink/tools/cdc/mysql/MysqlType.java | 2 +-
.../sink/writer/TestJsonDebeziumSchemaSerializer.java | 2 +-
3 files changed, 17 insertions(+), 6 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
index 9c99b4f..7da45b9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
@@ -59,6 +59,12 @@ import static org.apache.doris.flink.catalog.doris.DorisType.VARCHAR;
public class DorisTypeMapper {
+ /** Max size of char type of Doris. */
+ public static final int MAX_CHAR_SIZE = 255;
+
+ /** Max size of varchar type of Doris. */
+ public static final int MAX_VARCHAR_SIZE = 65533;
+
public static DataType toFlinkType(String columnName, String columnType,
int precision, int scale) {
columnType = columnType.toUpperCase();
switch (columnType) {
@@ -119,14 +125,19 @@ public class DorisTypeMapper {
@Override
public String visit(CharType charType) {
- return String.format("%s(%s)", DorisType.CHAR, charType.getLength());
+ long length = charType.getLength() * 3L;
+ if (length <= MAX_CHAR_SIZE) {
+ return String.format("%s(%s)", DorisType.CHAR, length);
+ } else {
+ return visit(new VarCharType(charType.getLength()));
+ }
}
@Override
public String visit(VarCharType varCharType) {
- //Flink varchar length max value is int, it may overflow after multiplying by 4
- long length = varCharType.getLength();
- return length * 4 >= 65533 ? STRING : String.format("%s(%s)", VARCHAR, length * 4);
+ //Flink varchar length max value is int, it may overflow after multiplying by 3
+ long length = varCharType.getLength() * 3L;
+ return length >= MAX_VARCHAR_SIZE ? STRING : String.format("%s(%s)", VARCHAR, length);
}
@Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
index 1b0b9d4..143ea52 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
@@ -142,7 +142,7 @@ public class MysqlType {
case CHAR:
case VARCHAR:
Preconditions.checkNotNull(length);
- return length * 4 > 65533 ? DorisType.STRING : String.format("%s(%s)", DorisType.VARCHAR, length * 4);
+ return length * 3 > 65533 ? DorisType.STRING : String.format("%s(%s)", DorisType.VARCHAR, length * 3);
case TINYTEXT:
case TEXT:
case MEDIUMTEXT:
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index 32aedab..2117f5f 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -253,7 +253,7 @@ public class TestJsonDebeziumSchemaSerializer {
public void testFillOriginSchema() throws IOException {
Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>();
srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null));
- srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(200)", null, null));
+ srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(150)", null, null));
srcFiledSchemaMap.put("test_time", new FieldSchema("test_time", "DATETIMEV2(0)", null, null));
srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "'100'", null));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]