This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4a21b135ce77dbea346c6489d88ce52c307c70f5 Author: yongkang.zhong <[email protected]> AuthorDate: Fri Mar 31 13:40:10 2023 +0800 [improve](clickhouse jdbc) support clickhouse jdbc 4.x version (#18258) In clickhouse's 4.x version of jdbc, some UInt types use special Java types, so I adapted Doris's ClickHouse JDBC External ``` com.clickhouse.data.value.UnsignedByte; com.clickhouse.data.value.UnsignedInteger; com.clickhouse.data.value.UnsignedLong; com.clickhouse.data.value.UnsignedShort; ``` --- be/src/vec/exec/vjdbc_connector.cpp | 8 +- .../org/apache/doris/external/jdbc/JdbcClient.java | 3 +- fe/java-udf/pom.xml | 6 ++ .../java/org/apache/doris/udf/JdbcExecutor.java | 99 ++++++++++++++++++++++ 4 files changed, 112 insertions(+), 4 deletions(-) diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index 830f4b491f..572be9e733 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -261,7 +261,9 @@ Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string& case TYPE_SMALLINT: case TYPE_INT: { if (type_str != "java.lang.Short" && type_str != "java.lang.Integer" && - type_str != "java.math.BigDecimal" && type_str != "java.lang.Byte") { + type_str != "java.math.BigDecimal" && type_str != "java.lang.Byte" && + type_str != "com.clickhouse.data.value.UnsignedByte" && + type_str != "com.clickhouse.data.value.UnsignedShort") { return Status::InternalError(error_msg); } break; @@ -269,7 +271,9 @@ Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string& case TYPE_BIGINT: case TYPE_LARGEINT: { if (type_str != "java.lang.Long" && type_str != "java.math.BigDecimal" && - type_str != "java.math.BigInteger") { + type_str != "java.math.BigInteger" && + type_str != "com.clickhouse.data.value.UnsignedInteger" && + type_str != "com.clickhouse.data.value.UnsignedLong") { return Status::InternalError(error_msg); } break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java index b152d324bc..c29b18dae0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java @@ -578,7 +578,7 @@ public class JdbcClient { || ckType.startsWith("FixedString")) { return ScalarType.createStringType(); } else if (ckType.startsWith("DateTime")) { - return ScalarType.createDatetimeV2Type(0); + return ScalarType.createDatetimeV2Type(6); } else if (ckType.startsWith("Array")) { String cktype = ckType.substring(6, ckType.length() - 1); fieldSchema.setDataTypeName(cktype); @@ -616,7 +616,6 @@ public class JdbcClient { default: return Type.UNSUPPORTED; } - // Todo(zyk): Wait the JDBC external table support the array type then supported clickhouse array type } public Type oracleTypeToDoris(JdbcFieldSchema fieldSchema) { diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml index 163fa5c0ae..e28cfb8470 100644 --- a/fe/java-udf/pom.xml +++ b/fe/java-udf/pom.xml @@ -111,6 +111,12 @@ under the License. <version>${junit.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>com.clickhouse</groupId> + <artifactId>clickhouse-jdbc</artifactId> + <version>0.4.2</version> + <classifier>all</classifier> + </dependency> </dependencies> <build> <finalName>java-udf</finalName> diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java index 4b848ef1a0..0d284265fb 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java @@ -22,6 +22,10 @@ import org.apache.doris.thrift.TJdbcOperation; import org.apache.doris.thrift.TOdbcTableType; import com.alibaba.druid.pool.DruidDataSource; +import com.clickhouse.data.value.UnsignedByte; +import com.clickhouse.data.value.UnsignedInteger; +import com.clickhouse.data.value.UnsignedLong; +import com.clickhouse.data.value.UnsignedShort; import com.google.common.base.Preconditions; import org.apache.log4j.Logger; import org.apache.thrift.TDeserializer; @@ -350,6 +354,23 @@ public class JdbcExecutor { } } + private void bytePutToByte(Object[] column, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr, int startRowForNullable) { + if (isNullable) { + for (int i = startRowForNullable; i < numRows; i++) { + if (column[i] == null) { + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + UdfUtils.UNSAFE.putByte(columnAddr + i, (Byte) column[i]); + } + } + } else { + for (int i = 0; i < numRows; i++) { + UdfUtils.UNSAFE.putByte(columnAddr + i, (Byte) column[i]); + } + } + } + public void copyBatchTinyIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, long columnAddr) { Object[] column = (Object[]) columnObj; @@ -366,6 +387,8 @@ public class JdbcExecutor { integerPutToByte(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); } else if (column[firstNotNullIndex] instanceof Short) { shortPutToByte(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); + } else if (column[firstNotNullIndex] instanceof Byte) { + bytePutToByte(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); } } @@ -420,6 +443,23 @@ public class JdbcExecutor { } } + public void clickHouseUInt8ToInt(Object[] column, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr, int startRowForNullable) { + if (isNullable) { + for (int i = startRowForNullable; i < numRows; i++) { + if (column[i] == null) { + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + UdfUtils.UNSAFE.putShort(columnAddr + (i * 2L), (short) ((UnsignedByte) column[i]).intValue()); + } + } + } else { + for (int i = 0; i < numRows; i++) { + UdfUtils.UNSAFE.putShort(columnAddr + (i * 2L), (short) ((UnsignedByte) column[i]).intValue()); + } + } + } + public void copyBatchSmallIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, long columnAddr) { Object[] column = (Object[]) columnObj; @@ -436,6 +476,8 @@ public class JdbcExecutor { integerPutToShort(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); } else if (column[firstNotNullIndex] instanceof Short) { shortPutToShort(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); + } else if (column[firstNotNullIndex] instanceof com.clickhouse.data.value.UnsignedByte) { + clickHouseUInt8ToInt(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); } } @@ -474,6 +516,23 @@ public class JdbcExecutor { } } + public void clickHouseUInt16ToInt(Object[] column, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr, int startRowForNullable) { + if (isNullable) { + for (int i = startRowForNullable; i < numRows; i++) { + if (column[i] == null) { + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), ((UnsignedShort) column[i]).intValue()); + } + } + } else { + for (int i = 0; i < numRows; i++) { + UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), ((UnsignedShort) column[i]).intValue()); + } + } + } + public void copyBatchIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, long columnAddr) { Object[] column = (Object[]) columnObj; @@ -488,6 +547,8 @@ public class JdbcExecutor { bigDecimalPutToInt(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); } else if (column[firstNotNullIndex] instanceof Integer) { integerPutToInt(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); + } else if (column[firstNotNullIndex] instanceof com.clickhouse.data.value.UnsignedShort) { + clickHouseUInt16ToInt(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); } } @@ -525,6 +586,23 @@ public class JdbcExecutor { } } + private void clickHouseUInt32ToLong(Object[] column, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr, int startRowForNullable) { + if (isNullable) { + for (int i = startRowForNullable; i < numRows; i++) { + if (column[i] == null) { + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), ((UnsignedInteger) column[i]).longValue()); + } + } + } else { + for (int i = 0; i < numRows; i++) { + UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), ((UnsignedInteger) column[i]).longValue()); + } + } + } + public void copyBatchBigIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, long columnAddr) { Object[] column = (Object[]) columnObj; @@ -539,6 +617,8 @@ public class JdbcExecutor { bigDecimalPutToLong(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); } else if (column[firstNotNullIndex] instanceof Long) { longPutToLong(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); + } else if (column[firstNotNullIndex] instanceof com.clickhouse.data.value.UnsignedInteger) { + clickHouseUInt32ToLong(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); } } @@ -591,6 +671,23 @@ public class JdbcExecutor { } } + private void clickHouseUInt64ToLong(Object[] column, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr, int startRowForNullable) { + if (isNullable) { + for (int i = startRowForNullable; i < numRows; i++) { + if (column[i] == null) { + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + UdfUtils.UNSAFE.putLong(columnAddr + (i * 16L), ((UnsignedLong) column[i]).longValue()); + } + } + } else { + for (int i = 0; i < numRows; i++) { + UdfUtils.UNSAFE.putLong(columnAddr + (i * 16L), ((UnsignedLong) column[i]).longValue()); + } + } + } + public void copyBatchLargeIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, long columnAddr) { Object[] column = (Object[]) columnObj; @@ -605,6 +702,8 @@ public class JdbcExecutor { bigDecimalPutToBigInteger(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); } else if (column[firstNotNullIndex] instanceof BigInteger) { bigIntegerPutToByte(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); + } else if (column[firstNotNullIndex] instanceof com.clickhouse.data.value.UnsignedLong) { + clickHouseUInt64ToLong(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
