This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c5f67399 [Bug] [Connector] Clickhouse sink Int8/UInt8/Int16/UInt16/Int32 datatypes support to handle Byte/Short value (#1748) (#1749)
c5f67399 is described below

commit c5f67399901655c390fc5cefa0a2eaa8e3461c17
Author: xpleaf <[email protected]>
AuthorDate: Wed Apr 27 15:05:12 2022 +0800

    [Bug] [Connector] Clickhouse sink Int8/UInt8/Int16/UInt16/Int32 datatypes support to handle Byte/Short value (#1748) (#1749)
---
 .../apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala    | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
index 0719ebb3..202979a4 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
@@ -234,7 +234,15 @@ class Clickhouse extends SparkBatchSink {
             statement.setTimestamp(index + 1, 
Timestamp.valueOf(value.toString))
         }
       case "Int8" | "UInt8" | "Int16" | "UInt16" | "Int32" =>
-        statement.setInt(index + 1, item.getAs[Int](fieldIndex))
+        val value = item.get(fieldIndex)
+        value match {
+          case byte: Byte =>
+            statement.setByte(index + 1, byte.byteValue())
+          case short: Short =>
+            statement.setShort(index + 1, short.shortValue())
+          case _ =>
+            statement.setInt(index + 1, value.asInstanceOf[Int])
+        }
       case "UInt32" | "UInt64" | "Int64" =>
         statement.setLong(index + 1, item.getAs[Long](fieldIndex))
       case "Float32" =>

Reply via email to