This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f0a1f5013 [Connector-V2] [Clickhouse] Fix Clickhouse Type Mapping and 
Spark Map reconvert Bug (#2767)
f0a1f5013 is described below

commit f0a1f5013a9c68331f5c6b28c34d4b207ccbe41e
Author: Hisoka <[email protected]>
AuthorDate: Fri Sep 23 17:04:29 2022 +0800

    [Connector-V2] [Clickhouse] Fix Clickhouse Type Mapping and Spark Map 
reconvert Bug (#2767)
---
 docs/en/connector-v2/sink/Clickhouse.md              |  5 +++++
 .../clickhouse/sink/client/ClickhouseSinkWriter.java | 20 +++++++++++---------
 .../sink/inject/DateTimeInjectFunction.java          |  9 ++++++++-
 ...imeInjectFunction.java => MapInjectFunction.java} | 15 +++++++--------
 .../seatunnel/clickhouse/util/TypeConvertUtil.java   | 16 +++++++++++++++-
 .../src/main/resources/examples/spark.batch.conf     |  2 +-
 .../common/serialization/InternalRowConverter.java   |  4 ++--
 7 files changed, 49 insertions(+), 22 deletions(-)

diff --git a/docs/en/connector-v2/sink/Clickhouse.md 
b/docs/en/connector-v2/sink/Clickhouse.md
index 02ac2246e..32ee3b5f8 100644
--- a/docs/en/connector-v2/sink/Clickhouse.md
+++ b/docs/en/connector-v2/sink/Clickhouse.md
@@ -29,6 +29,7 @@ Write data to Clickhouse can also be done using JDBC
 | table          | string | yes      | -             |
 | username       | string | yes      | -             |
 | password       | string | yes      | -             |
+| fields         | array  | no       | -             |
 | clickhouse.*   | string | no       |               |
 | bulk_size      | string | no       | 20000         |
 | split_mode     | string | no       | false         |
@@ -55,6 +56,10 @@ The table name
 
 `ClickHouse` user password
 
+### fields [array]
+
+The data fields that need to be written to `ClickHouse`. If not configured, 
they will be automatically adapted according to the sink table `schema`.
+
 ### clickhouse [string]
 
 In addition to the above mandatory parameters that must be specified by 
`clickhouse-jdbc` , users can also specify multiple optional parameters, which 
cover all the 
[parameters](https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-client#configuration)
 provided by `clickhouse-jdbc` .
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
index 6e2a4ceb1..cc38ca0c0 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -31,6 +31,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DoubleIn
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.FloatInjectFunction;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.IntInjectFunction;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.LongInjectFunction;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.MapInjectFunction;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.StringInjectFunction;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
@@ -189,15 +190,16 @@ public class ClickhouseSinkWriter implements 
SinkWriter<SeaTunnelRow, CKCommitIn
     private Map<String, ClickhouseFieldInjectFunction> 
initFieldInjectFunctionMap() {
         Map<String, ClickhouseFieldInjectFunction> result = new 
HashMap<>(Common.COLLECTION_SIZE);
         List<ClickhouseFieldInjectFunction> clickhouseFieldInjectFunctions = 
Lists.newArrayList(
-                new ArrayInjectFunction(),
-                new BigDecimalInjectFunction(),
-                new DateInjectFunction(),
-                new DateTimeInjectFunction(),
-                new LongInjectFunction(),
-                new DoubleInjectFunction(),
-                new FloatInjectFunction(),
-                new IntInjectFunction(),
-                new StringInjectFunction()
+            new ArrayInjectFunction(),
+            new MapInjectFunction(),
+            new BigDecimalInjectFunction(),
+            new DateInjectFunction(),
+            new DateTimeInjectFunction(),
+            new LongInjectFunction(),
+            new DoubleInjectFunction(),
+            new FloatInjectFunction(),
+            new IntInjectFunction(),
+            new StringInjectFunction()
         );
         ClickhouseFieldInjectFunction defaultFunction = new 
StringInjectFunction();
         // get field type
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
index b85c56afb..99a6f6bdb 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
@@ -20,12 +20,19 @@ package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.regex.Pattern;
 
 public class DateTimeInjectFunction implements ClickhouseFieldInjectFunction {
+
+    private static final Pattern PATTERN = Pattern.compile("(DateTime.*)");
+
     @Override
     public void injectFields(PreparedStatement statement, int index, Object 
value) throws SQLException {
         if (value instanceof Timestamp) {
             statement.setTimestamp(index, (Timestamp) value);
+        } else if (value instanceof LocalDateTime) {
+            statement.setObject(index, value);
         } else {
             statement.setTimestamp(index, Timestamp.valueOf(value.toString()));
         }
@@ -33,6 +40,6 @@ public class DateTimeInjectFunction implements 
ClickhouseFieldInjectFunction {
 
     @Override
     public boolean isCurrentFieldType(String fieldType) {
-        return "DateTime".equals(fieldType);
+        return PATTERN.matcher(fieldType).matches();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/MapInjectFunction.java
similarity index 76%
copy from 
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
copy to 
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/MapInjectFunction.java
index b85c56afb..8ee82064a 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/MapInjectFunction.java
@@ -19,20 +19,19 @@ package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
 
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.sql.Timestamp;
+import java.util.regex.Pattern;
+
+public class MapInjectFunction implements ClickhouseFieldInjectFunction {
+
+    private static final Pattern PATTERN = Pattern.compile("(Map.*)");
 
-public class DateTimeInjectFunction implements ClickhouseFieldInjectFunction {
     @Override
     public void injectFields(PreparedStatement statement, int index, Object 
value) throws SQLException {
-        if (value instanceof Timestamp) {
-            statement.setTimestamp(index, (Timestamp) value);
-        } else {
-            statement.setTimestamp(index, Timestamp.valueOf(value.toString()));
-        }
+        statement.setObject(index, value);
     }
 
     @Override
     public boolean isCurrentFieldType(String fieldType) {
-        return "DateTime".equals(fieldType);
+        return PATTERN.matcher(fieldType).matches();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
index 55234ab24..323df2c3b 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
 
+import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
@@ -27,9 +28,12 @@ import com.clickhouse.client.ClickHouseColumn;
 import com.clickhouse.client.ClickHouseValue;
 
 import java.math.BigDecimal;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.Map;
+import java.util.UUID;
 
 public class TypeConvertUtil {
 
@@ -58,7 +62,15 @@ public class TypeConvertUtil {
         } else if (Double.class.equals(type)) {
             return BasicType.DOUBLE_TYPE;
         } else if (Map.class.equals(type)) {
-            return new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE);
+            return new MapType<>(convert(column.getNestedColumns().get(0)), 
convert(column.getNestedColumns().get(1)));
+        } else if (UUID.class.equals(type)) {
+            return BasicType.STRING_TYPE;
+        } else if (Inet4Address.class.equals(type)) {
+            return BasicType.STRING_TYPE;
+        } else if (Inet6Address.class.equals(type)) {
+            return BasicType.STRING_TYPE;
+        } else if (Object.class.equals(type)) {
+            return BasicType.STRING_TYPE;
         } else {
             // TODO support pojo
             throw new IllegalArgumentException("not supported data type: " + 
column.getDataType());
@@ -91,6 +103,8 @@ public class TypeConvertUtil {
             return record.asDouble();
         } else if (dataType instanceof MapType) {
             return record.asMap();
+        } else if (dataType instanceof ArrayType) {
+            return record.asObject();
         } else {
             // TODO support pojo
             throw new IllegalArgumentException("not supported data type: " + 
dataType);
diff --git 
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
 
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index 00b2338c2..e04880dd2 100644
--- 
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++ 
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -71,7 +71,7 @@ transform {
 
   # you can also use other transform plugins, such as sql
   sql {
-    sql = "select 
c_array,c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes,c_date,c_timestamp
 from fake"
+    sql = "select 
c_map,c_array,c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes,c_date,c_timestamp
 from fake"
     result_table_name = "sql"
   }
 
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
index 70545bcd1..23fb88aaa 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
@@ -119,7 +119,7 @@ public final class InternalRowConverter extends 
RowConverter<InternalRow> {
         return new SpecificInternalRow(values);
     }
 
-    private static Object convertMap(Map<?, ?> mapData, MapType<?, ?> mapType) 
{
+    private static ArrayBasedMapData convertMap(Map<?, ?> mapData, MapType<?, 
?> mapType) {
         if (mapData == null || mapData.size() == 0) {
             return ArrayBasedMapData.apply(new Object[]{}, new Object[]{});
         }
@@ -132,7 +132,7 @@ public final class InternalRowConverter extends 
RowConverter<InternalRow> {
         return ArrayBasedMapData.apply(keys, values);
     }
 
-    private static Object reconvertMap(MapData mapData, MapType<?, ?> mapType) 
{
+    private static Map<Object, Object> reconvertMap(MapData mapData, 
MapType<?, ?> mapType) {
         if (mapData == null || mapData.numElements() == 0) {
             return Collections.emptyMap();
         }

Reply via email to