This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b43244e  [Bug#851][plugin-spark-output-clickhouse] DateTime/Date Column support Timestamp (#1433)
b43244e is described below

commit b43244e65d73d3373cbc07b1218abfdae1925a39
Author: Kyle.Hu <[email protected]>
AuthorDate: Tue Mar 8 16:55:08 2022 +0800

    [Bug#851][plugin-spark-output-clickhouse] DateTime/Date Column support Timestamp (#1433)
---
 .../apache/seatunnel/spark/sink/Clickhouse.scala    | 21 ++++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

diff --git a/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala b/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
index c1d6445..cfac518 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
@@ -18,9 +18,9 @@ package org.apache.seatunnel.spark.sink
 
 import net.jpountz.xxhash.{XXHash64, XXHashFactory}
 import org.apache.commons.lang3.StringUtils
-
 import java.math.{BigDecimal, BigInteger}
-import java.sql.PreparedStatement
+import java.sql.{Date, PreparedStatement, Timestamp}
+
 import java.text.SimpleDateFormat
 import java.util
 import java.util.{Objects, Properties}
@@ -308,6 +308,8 @@ class Clickhouse extends SparkBatchSink {
     fieldType match {
       case "DateTime" | "Date" | "String" =>
         statement.setString(index + 1, 
Clickhouse.renderStringDefault(fieldType))
+      case Clickhouse.datetime64Pattern(_) =>
+        statement.setString(index + 1, 
Clickhouse.renderStringDefault(fieldType))
       case "Int8" | "UInt8" | "Int16" | "Int32" | "UInt32" | "UInt16" =>
         statement.setInt(index + 1, 0)
       case "UInt64" | "Int64" =>
@@ -331,6 +333,7 @@ class Clickhouse extends SparkBatchSink {
       case "String" =>
         statement.setNull(index + 1, java.sql.Types.VARCHAR)
       case "DateTime" => statement.setNull(index + 1, java.sql.Types.DATE)
+      case Clickhouse.datetime64Pattern(_) => statement.setNull(index + 1, 
java.sql.Types.TIMESTAMP)
       case "Date" => statement.setNull(index + 1, java.sql.Types.TIME)
       case "Int8" | "UInt8" | "Int16" | "Int32" | "UInt32" | "UInt16" =>
         statement.setNull(index + 1, java.sql.Types.INTEGER)
@@ -348,8 +351,12 @@ class Clickhouse extends SparkBatchSink {
                                        item: Row,
                                        statement: PreparedStatement): Unit = {
     fieldType match {
-      case "DateTime" | "Date" | "String" =>
+      case "String" =>
         statement.setString(index + 1, item.getAs[String](fieldIndex))
+      case "Date" =>
+        statement.setDate(index + 1, item.getAs[Date](fieldIndex))
+      case "DateTime" | Clickhouse.datetime64Pattern(_) =>
+        statement.setTimestamp(index + 1, item.getAs[Timestamp](fieldIndex))
       case "Int8" | "UInt8" | "Int16" | "UInt16" | "Int32" =>
         statement.setInt(index + 1, item.getAs[Int](fieldIndex))
       case "UInt32" | "UInt64" | "Int64" =>
@@ -395,7 +402,7 @@ class Clickhouse extends SparkBatchSink {
           renderDefaultStatement(i, fieldType, statement)
         } else {
           fieldType match {
-            case "String" | "DateTime" | "Date" | Clickhouse.arrayPattern(_) =>
+            case "String" | "DateTime" | Clickhouse.datetime64Pattern(_) | 
"Date" | Clickhouse.arrayPattern(_) =>
               renderBaseTypeStatement(i, fieldIndex, fieldType, item, 
statement)
             case Clickhouse.floatPattern(_) | Clickhouse.intPattern(_) | 
Clickhouse.uintPattern(_) =>
               renderBaseTypeStatement(i, fieldIndex, fieldType, item, 
statement)
@@ -447,6 +454,7 @@ object Clickhouse {
   val uintPattern: Regex = "(UInt.*)".r
   val floatPattern: Regex = "(Float.*)".r
   val decimalPattern: Regex = "(Decimal.*)".r
+  val datetime64Pattern: Regex = "(DateTime64\\(.*\\))".r
 
   /**
    * Seatunnel support this clickhouse data type or not.
@@ -458,7 +466,7 @@ object Clickhouse {
     dataType match {
       case "Date" | "DateTime" | "String" =>
         true
-      case nullablePattern(_) | floatPattern(_) | intPattern(_) | 
uintPattern(_) =>
+      case nullablePattern(_) | floatPattern(_) | intPattern(_) | 
uintPattern(_) | datetime64Pattern(_) =>
         true
       case arrayPattern(_) =>
         true
@@ -476,6 +484,9 @@ object Clickhouse {
       case "DateTime" =>
         val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss")
         dateFormat.format(System.currentTimeMillis())
+      case datetime64Pattern(_) =>
+        val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss.SSS")
+        dateFormat.format(System.currentTimeMillis())
       case "Date" =>
         val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
         dateFormat.format(System.currentTimeMillis())

Reply via email to