This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 678821f  [Bug] Fix type overflow bug (#249)
678821f is described below

commit 678821f59d3d98d140077b4145c61f5193e823b8
Author: wudi <[email protected]>
AuthorDate: Thu Nov 30 09:55:11 2023 +0800

    [Bug] Fix type overflow bug (#249)
    
    Co-authored-by: wudi <>
---
 .../main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java    | 5 +++--
 .../src/main/java/org/apache/doris/flink/rest/RestService.java       | 4 ++--
 .../java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java | 1 +
 3 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
index fda3c93..9c99b4f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
@@ -124,8 +124,9 @@ public class DorisTypeMapper {
 
         @Override
         public String visit(VarCharType varCharType) {
-            int length = varCharType.getLength();
-            return length * 4 > 65533 ? STRING : String.format("%s(%s)", VARCHAR, length * 4);
+            //Flink varchar length max value is int, it may overflow after multiplying by 4
+            long length = varCharType.getLength();
+            return length * 4 >= 65533 ? STRING : String.format("%s(%s)", VARCHAR, length * 4);
         }
 
         @Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index cd02209..219806e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -458,10 +458,10 @@ public class RestService implements Serializable {
 
     public static boolean isUniqueKeyType(DorisOptions options, DorisReadOptions readOptions, Logger logger)
             throws DorisRuntimeException {
-        //Enable 2pc in multi-table scenario
+        //disable 2pc in multi-table scenario
         if(StringUtils.isBlank(options.getTableIdentifier())){
             logger.info("table model verification is skipped in multi-table scenarios.");
-            return false;
+            return true;
         }
         try {
             return UNIQUE_KEYS_TYPE.equals(getSchema(options, readOptions, logger).getKeysType());
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
index d9c3345..296fb2f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
@@ -120,6 +120,7 @@ public class SchemaChangeManager implements Serializable {
         if(StringUtils.isNullOrWhitespaceOnly(ddl)){
             return false;
         }
+        LOG.info("Execute SQL: {}", ddl);
         Map<String, String> param = new HashMap<>();
         param.put("stmt", ddl);
         String requestUrl = String.format(SCHEMA_CHANGE_API,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to