This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4a21b135ce77dbea346c6489d88ce52c307c70f5
Author: yongkang.zhong <[email protected]>
AuthorDate: Fri Mar 31 13:40:10 2023 +0800

    [improve](clickhouse jdbc) support clickhouse jdbc 4.x version (#18258)
    
    In the 4.x series of the ClickHouse JDBC driver (0.4.x), some UInt types are
    returned as dedicated Java types, so this change adapts Doris's ClickHouse
    JDBC external support to handle them:
    ```
    com.clickhouse.data.value.UnsignedByte;
    com.clickhouse.data.value.UnsignedInteger;
    com.clickhouse.data.value.UnsignedLong;
    com.clickhouse.data.value.UnsignedShort;
    ```
---
 be/src/vec/exec/vjdbc_connector.cpp                |  8 +-
 .../org/apache/doris/external/jdbc/JdbcClient.java |  3 +-
 fe/java-udf/pom.xml                                |  6 ++
 .../java/org/apache/doris/udf/JdbcExecutor.java    | 99 ++++++++++++++++++++++
 4 files changed, 112 insertions(+), 4 deletions(-)

diff --git a/be/src/vec/exec/vjdbc_connector.cpp 
b/be/src/vec/exec/vjdbc_connector.cpp
index 830f4b491f..572be9e733 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -261,7 +261,9 @@ Status JdbcConnector::_check_type(SlotDescriptor* 
slot_desc, const std::string&
     case TYPE_SMALLINT:
     case TYPE_INT: {
         if (type_str != "java.lang.Short" && type_str != "java.lang.Integer" &&
-            type_str != "java.math.BigDecimal" && type_str != 
"java.lang.Byte") {
+            type_str != "java.math.BigDecimal" && type_str != "java.lang.Byte" 
&&
+            type_str != "com.clickhouse.data.value.UnsignedByte" &&
+            type_str != "com.clickhouse.data.value.UnsignedShort") {
             return Status::InternalError(error_msg);
         }
         break;
@@ -269,7 +271,9 @@ Status JdbcConnector::_check_type(SlotDescriptor* 
slot_desc, const std::string&
     case TYPE_BIGINT:
     case TYPE_LARGEINT: {
         if (type_str != "java.lang.Long" && type_str != "java.math.BigDecimal" 
&&
-            type_str != "java.math.BigInteger") {
+            type_str != "java.math.BigInteger" &&
+            type_str != "com.clickhouse.data.value.UnsignedInteger" &&
+            type_str != "com.clickhouse.data.value.UnsignedLong") {
             return Status::InternalError(error_msg);
         }
         break;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java 
b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java
index b152d324bc..c29b18dae0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java
@@ -578,7 +578,7 @@ public class JdbcClient {
                 || ckType.startsWith("FixedString")) {
             return ScalarType.createStringType();
         } else if (ckType.startsWith("DateTime")) {
-            return ScalarType.createDatetimeV2Type(0);
+            return ScalarType.createDatetimeV2Type(6);
         } else if (ckType.startsWith("Array")) {
             String cktype = ckType.substring(6, ckType.length() - 1);
             fieldSchema.setDataTypeName(cktype);
@@ -616,7 +616,6 @@ public class JdbcClient {
             default:
                 return Type.UNSUPPORTED;
         }
-        // Todo(zyk): Wait the JDBC external table support the array type then 
supported clickhouse array type
     }
 
     public Type oracleTypeToDoris(JdbcFieldSchema fieldSchema) {
diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml
index 163fa5c0ae..e28cfb8470 100644
--- a/fe/java-udf/pom.xml
+++ b/fe/java-udf/pom.xml
@@ -111,6 +111,12 @@ under the License.
             <version>${junit.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
+            <version>0.4.2</version>
+            <classifier>all</classifier>
+        </dependency>
     </dependencies>
     <build>
         <finalName>java-udf</finalName>
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java 
b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index 4b848ef1a0..0d284265fb 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -22,6 +22,10 @@ import org.apache.doris.thrift.TJdbcOperation;
 import org.apache.doris.thrift.TOdbcTableType;
 
 import com.alibaba.druid.pool.DruidDataSource;
+import com.clickhouse.data.value.UnsignedByte;
+import com.clickhouse.data.value.UnsignedInteger;
+import com.clickhouse.data.value.UnsignedLong;
+import com.clickhouse.data.value.UnsignedShort;
 import com.google.common.base.Preconditions;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TDeserializer;
@@ -350,6 +354,23 @@ public class JdbcExecutor {
         }
     }
 
+    private void bytePutToByte(Object[] column, boolean isNullable, int 
numRows, long nullMapAddr,
+            long columnAddr, int startRowForNullable) {
+        if (isNullable) {
+            for (int i = startRowForNullable; i < numRows; i++) {
+                if (column[i] == null) {
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putByte(columnAddr + i, (Byte) column[i]);
+                }
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                UdfUtils.UNSAFE.putByte(columnAddr + i, (Byte) column[i]);
+            }
+        }
+    }
+
     public void copyBatchTinyIntResult(Object columnObj, boolean isNullable, 
int numRows, long nullMapAddr,
             long columnAddr) {
         Object[] column = (Object[]) columnObj;
@@ -366,6 +387,8 @@ public class JdbcExecutor {
             integerPutToByte(column, isNullable, numRows, nullMapAddr, 
columnAddr, firstNotNullIndex);
         } else if (column[firstNotNullIndex] instanceof Short) {
             shortPutToByte(column, isNullable, numRows, nullMapAddr, 
columnAddr, firstNotNullIndex);
+        } else if (column[firstNotNullIndex] instanceof Byte) {
+            bytePutToByte(column, isNullable, numRows, nullMapAddr, 
columnAddr, firstNotNullIndex);
         }
     }
 
@@ -420,6 +443,23 @@ public class JdbcExecutor {
         }
     }
 
+    public void clickHouseUInt8ToInt(Object[] column, boolean isNullable, int 
numRows, long nullMapAddr,
+            long columnAddr, int startRowForNullable) {
+        if (isNullable) {
+            for (int i = startRowForNullable; i < numRows; i++) {
+                if (column[i] == null) {
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putShort(columnAddr + (i * 2L), (short) 
((UnsignedByte) column[i]).intValue());
+                }
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                UdfUtils.UNSAFE.putShort(columnAddr + (i * 2L), (short) 
((UnsignedByte) column[i]).intValue());
+            }
+        }
+    }
+
     public void copyBatchSmallIntResult(Object columnObj, boolean isNullable, 
int numRows, long nullMapAddr,
             long columnAddr) {
         Object[] column = (Object[]) columnObj;
@@ -436,6 +476,8 @@ public class JdbcExecutor {
             integerPutToShort(column, isNullable, numRows, nullMapAddr, 
columnAddr, firstNotNullIndex);
         } else if (column[firstNotNullIndex] instanceof Short) {
             shortPutToShort(column, isNullable, numRows, nullMapAddr, 
columnAddr, firstNotNullIndex);
+        } else if (column[firstNotNullIndex] instanceof 
com.clickhouse.data.value.UnsignedByte) {
+            clickHouseUInt8ToInt(column, isNullable, numRows, nullMapAddr, 
columnAddr, firstNotNullIndex);
         }
     }
 
@@ -474,6 +516,23 @@ public class JdbcExecutor {
         }
     }
 
+    public void clickHouseUInt16ToInt(Object[] column, boolean isNullable, int 
numRows, long nullMapAddr,
+            long columnAddr, int startRowForNullable) {
+        if (isNullable) {
+            for (int i = startRowForNullable; i < numRows; i++) {
+                if (column[i] == null) {
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), 
((UnsignedShort) column[i]).intValue());
+                }
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), ((UnsignedShort) 
column[i]).intValue());
+            }
+        }
+    }
+
     public void copyBatchIntResult(Object columnObj, boolean isNullable, int 
numRows, long nullMapAddr,
             long columnAddr) {
         Object[] column = (Object[]) columnObj;
@@ -488,6 +547,8 @@ public class JdbcExecutor {
             bigDecimalPutToInt(column, isNullable, numRows, nullMapAddr, 
columnAddr, firstNotNullIndex);
         } else if (column[firstNotNullIndex] instanceof Integer) {
             integerPutToInt(column, isNullable, numRows, nullMapAddr, 
columnAddr, firstNotNullIndex);
+        } else if (column[firstNotNullIndex] instanceof 
com.clickhouse.data.value.UnsignedShort) {
+            clickHouseUInt16ToInt(column, isNullable, numRows, nullMapAddr, 
columnAddr, firstNotNullIndex);
         }
     }
 
@@ -525,6 +586,23 @@ public class JdbcExecutor {
         }
     }
 
+    private void clickHouseUInt32ToLong(Object[] column, boolean isNullable, 
int numRows, long nullMapAddr,
+            long columnAddr, int startRowForNullable) {
+        if (isNullable) {
+            for (int i = startRowForNullable; i < numRows; i++) {
+                if (column[i] == null) {
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), 
((UnsignedInteger) column[i]).longValue());
+                }
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), 
((UnsignedInteger) column[i]).longValue());
+            }
+        }
+    }
+
     public void copyBatchBigIntResult(Object columnObj, boolean isNullable, 
int numRows, long nullMapAddr,
             long columnAddr) {
         Object[] column = (Object[]) columnObj;
@@ -539,6 +617,8 @@ public class JdbcExecutor {
             bigDecimalPutToLong(column, isNullable, numRows, nullMapAddr, 
columnAddr, firstNotNullIndex);
         } else if (column[firstNotNullIndex] instanceof Long) {
             longPutToLong(column, isNullable, numRows, nullMapAddr, 
columnAddr, firstNotNullIndex);
+        } else if (column[firstNotNullIndex] instanceof 
com.clickhouse.data.value.UnsignedInteger) {
+            clickHouseUInt32ToLong(column, isNullable, numRows, nullMapAddr, 
columnAddr, firstNotNullIndex);
         }
     }
 
@@ -591,6 +671,23 @@ public class JdbcExecutor {
         }
     }
 
+    private void clickHouseUInt64ToLong(Object[] column, boolean isNullable, 
int numRows, long nullMapAddr,
+            long columnAddr, int startRowForNullable) {
+        if (isNullable) {
+            for (int i = startRowForNullable; i < numRows; i++) {
+                if (column[i] == null) {
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putLong(columnAddr + (i * 16L), 
((UnsignedLong) column[i]).longValue());
+                }
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                UdfUtils.UNSAFE.putLong(columnAddr + (i * 16L), 
((UnsignedLong) column[i]).longValue());
+            }
+        }
+    }
+
     public void copyBatchLargeIntResult(Object columnObj, boolean isNullable, 
int numRows, long nullMapAddr,
             long columnAddr) {
         Object[] column = (Object[]) columnObj;
@@ -605,6 +702,8 @@ public class JdbcExecutor {
             bigDecimalPutToBigInteger(column, isNullable, numRows, 
nullMapAddr, columnAddr, firstNotNullIndex);
         } else if (column[firstNotNullIndex] instanceof BigInteger) {
             bigIntegerPutToByte(column, isNullable, numRows, nullMapAddr, 
columnAddr, firstNotNullIndex);
+        } else if (column[firstNotNullIndex] instanceof 
com.clickhouse.data.value.UnsignedLong) {
+            clickHouseUInt64ToLong(column, isNullable, numRows, nullMapAddr, 
columnAddr, firstNotNullIndex);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to