This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 9178356  [fix] improve char length problem (#262)
9178356 is described below

commit 91783560773ff958c5689a494349f5996ed86308
Author: wudi <[email protected]>
AuthorDate: Tue Dec 12 10:04:34 2023 +0800

    [fix] improve char length problem (#262)
---
 .../apache/doris/flink/catalog/DorisTypeMapper.java   | 19 +++++++++++++++----
 .../apache/doris/flink/tools/cdc/mysql/MysqlType.java |  2 +-
 .../sink/writer/TestJsonDebeziumSchemaSerializer.java |  2 +-
 3 files changed, 17 insertions(+), 6 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
index 9c99b4f..7da45b9 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
@@ -59,6 +59,12 @@ import static 
org.apache.doris.flink.catalog.doris.DorisType.VARCHAR;
 
 public class DorisTypeMapper {
 
+    /** Max size of char type of Doris. */
+    public static final int MAX_CHAR_SIZE = 255;
+
+    /** Max size of varchar type of Doris. */
+    public static final int MAX_VARCHAR_SIZE = 65533;
+
     public static DataType toFlinkType(String columnName, String columnType, 
int precision, int scale) {
         columnType = columnType.toUpperCase();
         switch (columnType) {
@@ -119,14 +125,19 @@ public class DorisTypeMapper {
 
         @Override
         public String visit(CharType charType) {
-            return String.format("%s(%s)", DorisType.CHAR, 
charType.getLength());
+            long length = charType.getLength() * 3L;
+            if (length <= MAX_CHAR_SIZE) {
+                return String.format("%s(%s)", DorisType.CHAR, length);
+            } else {
+                return visit(new VarCharType(charType.getLength()));
+            }
         }
 
         @Override
         public String visit(VarCharType varCharType) {
-            //Flink varchar length max value is int, it may overflow after 
multiplying by 4
-            long length = varCharType.getLength();
-            return length * 4 >= 65533 ? STRING : String.format("%s(%s)", 
VARCHAR, length * 4);
+            //Flink varchar length max value is int, it may overflow after 
multiplying by 3
+            long length = varCharType.getLength() * 3L;
+            return length >= MAX_VARCHAR_SIZE ? STRING : 
String.format("%s(%s)", VARCHAR, length);
         }
 
         @Override
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
index 1b0b9d4..143ea52 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
@@ -142,7 +142,7 @@ public class MysqlType {
             case CHAR:
             case VARCHAR:
                 Preconditions.checkNotNull(length);
-                return length * 4 > 65533 ? DorisType.STRING : 
String.format("%s(%s)", DorisType.VARCHAR, length * 4);
+                return length * 3 > 65533 ? DorisType.STRING : 
String.format("%s(%s)", DorisType.VARCHAR, length * 3);
             case TINYTEXT:
             case TEXT:
             case MEDIUMTEXT:
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index 32aedab..2117f5f 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -253,7 +253,7 @@ public class TestJsonDebeziumSchemaSerializer {
     public void testFillOriginSchema() throws IOException {
         Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>();
         srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null));
-        srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(200)", 
null, null));
+        srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(150)", 
null, null));
         srcFiledSchemaMap.put("test_time", new FieldSchema("test_time", 
"DATETIMEV2(0)", null, null));
         srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "'100'", 
null));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to