This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b43244e [Bug#851][plugin-spark-output-clickhouse] DateTime/Date
Column support Timestamp (#1433)
b43244e is described below
commit b43244e65d73d3373cbc07b1218abfdae1925a39
Author: Kyle.Hu <[email protected]>
AuthorDate: Tue Mar 8 16:55:08 2022 +0800
[Bug#851][plugin-spark-output-clickhouse] DateTime/Date Column support
Timestamp (#1433)
---
.../apache/seatunnel/spark/sink/Clickhouse.scala | 21 ++++++++++++++++-----
1 file changed, 16 insertions(+), 5 deletions(-)
diff --git
a/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
b/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
index c1d6445..cfac518 100644
---
a/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
+++
b/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
@@ -18,9 +18,9 @@ package org.apache.seatunnel.spark.sink
import net.jpountz.xxhash.{XXHash64, XXHashFactory}
import org.apache.commons.lang3.StringUtils
-
import java.math.{BigDecimal, BigInteger}
-import java.sql.PreparedStatement
+import java.sql.{Date, PreparedStatement, Timestamp}
+
import java.text.SimpleDateFormat
import java.util
import java.util.{Objects, Properties}
@@ -308,6 +308,8 @@ class Clickhouse extends SparkBatchSink {
fieldType match {
case "DateTime" | "Date" | "String" =>
statement.setString(index + 1,
Clickhouse.renderStringDefault(fieldType))
+ case Clickhouse.datetime64Pattern(_) =>
+ statement.setString(index + 1,
Clickhouse.renderStringDefault(fieldType))
case "Int8" | "UInt8" | "Int16" | "Int32" | "UInt32" | "UInt16" =>
statement.setInt(index + 1, 0)
case "UInt64" | "Int64" =>
@@ -331,6 +333,7 @@ class Clickhouse extends SparkBatchSink {
case "String" =>
statement.setNull(index + 1, java.sql.Types.VARCHAR)
case "DateTime" => statement.setNull(index + 1, java.sql.Types.DATE)
+ case Clickhouse.datetime64Pattern(_) => statement.setNull(index + 1,
java.sql.Types.TIMESTAMP)
case "Date" => statement.setNull(index + 1, java.sql.Types.TIME)
case "Int8" | "UInt8" | "Int16" | "Int32" | "UInt32" | "UInt16" =>
statement.setNull(index + 1, java.sql.Types.INTEGER)
@@ -348,8 +351,12 @@ class Clickhouse extends SparkBatchSink {
item: Row,
statement: PreparedStatement): Unit = {
fieldType match {
- case "DateTime" | "Date" | "String" =>
+ case "String" =>
statement.setString(index + 1, item.getAs[String](fieldIndex))
+ case "Date" =>
+ statement.setDate(index + 1, item.getAs[Date](fieldIndex))
+ case "DateTime" | Clickhouse.datetime64Pattern(_) =>
+ statement.setTimestamp(index + 1, item.getAs[Timestamp](fieldIndex))
case "Int8" | "UInt8" | "Int16" | "UInt16" | "Int32" =>
statement.setInt(index + 1, item.getAs[Int](fieldIndex))
case "UInt32" | "UInt64" | "Int64" =>
@@ -395,7 +402,7 @@ class Clickhouse extends SparkBatchSink {
renderDefaultStatement(i, fieldType, statement)
} else {
fieldType match {
- case "String" | "DateTime" | "Date" | Clickhouse.arrayPattern(_) =>
+ case "String" | "DateTime" | Clickhouse.datetime64Pattern(_) |
"Date" | Clickhouse.arrayPattern(_) =>
renderBaseTypeStatement(i, fieldIndex, fieldType, item,
statement)
case Clickhouse.floatPattern(_) | Clickhouse.intPattern(_) |
Clickhouse.uintPattern(_) =>
renderBaseTypeStatement(i, fieldIndex, fieldType, item,
statement)
@@ -447,6 +454,7 @@ object Clickhouse {
val uintPattern: Regex = "(UInt.*)".r
val floatPattern: Regex = "(Float.*)".r
val decimalPattern: Regex = "(Decimal.*)".r
+ val datetime64Pattern: Regex = "(DateTime64\\(.*\\))".r
/**
* Seatunnel support this clickhouse data type or not.
@@ -458,7 +466,7 @@ object Clickhouse {
dataType match {
case "Date" | "DateTime" | "String" =>
true
- case nullablePattern(_) | floatPattern(_) | intPattern(_) |
uintPattern(_) =>
+ case nullablePattern(_) | floatPattern(_) | intPattern(_) |
uintPattern(_) | datetime64Pattern(_) =>
true
case arrayPattern(_) =>
true
@@ -476,6 +484,9 @@ object Clickhouse {
case "DateTime" =>
val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss")
dateFormat.format(System.currentTimeMillis())
+ case datetime64Pattern(_) =>
+ val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss.SSS")
+ dateFormat.format(System.currentTimeMillis())
case "Date" =>
val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
dateFormat.format(System.currentTimeMillis())