This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f0a1f5013 [Connector-V2] [Clickhouse] Fix Clickhouse Type Mapping and
Spark Map reconvert Bug (#2767)
f0a1f5013 is described below
commit f0a1f5013a9c68331f5c6b28c34d4b207ccbe41e
Author: Hisoka <[email protected]>
AuthorDate: Fri Sep 23 17:04:29 2022 +0800
[Connector-V2] [Clickhouse] Fix Clickhouse Type Mapping and Spark Map
reconvert Bug (#2767)
---
docs/en/connector-v2/sink/Clickhouse.md | 5 +++++
.../clickhouse/sink/client/ClickhouseSinkWriter.java | 20 +++++++++++---------
.../sink/inject/DateTimeInjectFunction.java | 9 ++++++++-
...imeInjectFunction.java => MapInjectFunction.java} | 15 +++++++--------
.../seatunnel/clickhouse/util/TypeConvertUtil.java | 16 +++++++++++++++-
.../src/main/resources/examples/spark.batch.conf | 2 +-
.../common/serialization/InternalRowConverter.java | 4 ++--
7 files changed, 49 insertions(+), 22 deletions(-)
diff --git a/docs/en/connector-v2/sink/Clickhouse.md
b/docs/en/connector-v2/sink/Clickhouse.md
index 02ac2246e..32ee3b5f8 100644
--- a/docs/en/connector-v2/sink/Clickhouse.md
+++ b/docs/en/connector-v2/sink/Clickhouse.md
@@ -29,6 +29,7 @@ Write data to Clickhouse can also be done using JDBC
| table | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
+| fields | string | yes | - |
| clickhouse.* | string | no | |
| bulk_size | string | no | 20000 |
| split_mode | string | no | false |
@@ -55,6 +56,10 @@ The table name
`ClickHouse` user password
+### fields [array]
+
+The data field that needs to be output to `ClickHouse` , if not configured, it
will be automatically adapted according to the sink table `schema` .
+
### clickhouse [string]
In addition to the above mandatory parameters that must be specified by
`clickhouse-jdbc` , users can also specify multiple optional parameters, which
cover all the
[parameters](https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-client#configuration)
provided by `clickhouse-jdbc` .
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
index 6e2a4ceb1..cc38ca0c0 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -31,6 +31,7 @@ import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DoubleIn
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.FloatInjectFunction;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.IntInjectFunction;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.LongInjectFunction;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.MapInjectFunction;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.StringInjectFunction;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
@@ -189,15 +190,16 @@ public class ClickhouseSinkWriter implements
SinkWriter<SeaTunnelRow, CKCommitIn
private Map<String, ClickhouseFieldInjectFunction>
initFieldInjectFunctionMap() {
Map<String, ClickhouseFieldInjectFunction> result = new
HashMap<>(Common.COLLECTION_SIZE);
List<ClickhouseFieldInjectFunction> clickhouseFieldInjectFunctions =
Lists.newArrayList(
- new ArrayInjectFunction(),
- new BigDecimalInjectFunction(),
- new DateInjectFunction(),
- new DateTimeInjectFunction(),
- new LongInjectFunction(),
- new DoubleInjectFunction(),
- new FloatInjectFunction(),
- new IntInjectFunction(),
- new StringInjectFunction()
+ new ArrayInjectFunction(),
+ new MapInjectFunction(),
+ new BigDecimalInjectFunction(),
+ new DateInjectFunction(),
+ new DateTimeInjectFunction(),
+ new LongInjectFunction(),
+ new DoubleInjectFunction(),
+ new FloatInjectFunction(),
+ new IntInjectFunction(),
+ new StringInjectFunction()
);
ClickhouseFieldInjectFunction defaultFunction = new
StringInjectFunction();
// get field type
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
index b85c56afb..99a6f6bdb 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
@@ -20,12 +20,19 @@ package
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.regex.Pattern;
public class DateTimeInjectFunction implements ClickhouseFieldInjectFunction {
+
+ private static final Pattern PATTERN = Pattern.compile("(DateTime.*)");
+
@Override
public void injectFields(PreparedStatement statement, int index, Object
value) throws SQLException {
if (value instanceof Timestamp) {
statement.setTimestamp(index, (Timestamp) value);
+ } else if (value instanceof LocalDateTime) {
+ statement.setObject(index, value);
} else {
statement.setTimestamp(index, Timestamp.valueOf(value.toString()));
}
@@ -33,6 +40,6 @@ public class DateTimeInjectFunction implements
ClickhouseFieldInjectFunction {
@Override
public boolean isCurrentFieldType(String fieldType) {
- return "DateTime".equals(fieldType);
+ return PATTERN.matcher(fieldType).matches();
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/MapInjectFunction.java
similarity index 76%
copy from
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
copy to
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/MapInjectFunction.java
index b85c56afb..8ee82064a 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/MapInjectFunction.java
@@ -19,20 +19,19 @@ package
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.sql.Timestamp;
+import java.util.regex.Pattern;
+
+public class MapInjectFunction implements ClickhouseFieldInjectFunction {
+
+ private static final Pattern PATTERN = Pattern.compile("(Map.*)");
-public class DateTimeInjectFunction implements ClickhouseFieldInjectFunction {
@Override
public void injectFields(PreparedStatement statement, int index, Object
value) throws SQLException {
- if (value instanceof Timestamp) {
- statement.setTimestamp(index, (Timestamp) value);
- } else {
- statement.setTimestamp(index, Timestamp.valueOf(value.toString()));
- }
+ statement.setObject(index, value);
}
@Override
public boolean isCurrentFieldType(String fieldType) {
- return "DateTime".equals(fieldType);
+ return PATTERN.matcher(fieldType).matches();
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
index 55234ab24..323df2c3b 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
+import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
@@ -27,9 +28,12 @@ import com.clickhouse.client.ClickHouseColumn;
import com.clickhouse.client.ClickHouseValue;
import java.math.BigDecimal;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Map;
+import java.util.UUID;
public class TypeConvertUtil {
@@ -58,7 +62,15 @@ public class TypeConvertUtil {
} else if (Double.class.equals(type)) {
return BasicType.DOUBLE_TYPE;
} else if (Map.class.equals(type)) {
- return new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE);
+ return new MapType<>(convert(column.getNestedColumns().get(0)),
convert(column.getNestedColumns().get(1)));
+ } else if (UUID.class.equals(type)) {
+ return BasicType.STRING_TYPE;
+ } else if (Inet4Address.class.equals(type)) {
+ return BasicType.STRING_TYPE;
+ } else if (Inet6Address.class.equals(type)) {
+ return BasicType.STRING_TYPE;
+ } else if (Object.class.equals(type)) {
+ return BasicType.STRING_TYPE;
} else {
// TODO support pojo
throw new IllegalArgumentException("not supported data type: " +
column.getDataType());
@@ -91,6 +103,8 @@ public class TypeConvertUtil {
return record.asDouble();
} else if (dataType instanceof MapType) {
return record.asMap();
+ } else if (dataType instanceof ArrayType) {
+ return record.asObject();
} else {
// TODO support pojo
throw new IllegalArgumentException("not supported data type: " +
dataType);
diff --git
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index 00b2338c2..e04880dd2 100644
---
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -71,7 +71,7 @@ transform {
# you can also use other transform plugins, such as sql
sql {
- sql = "select
c_array,c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes,c_date,c_timestamp
from fake"
+ sql = "select
c_map,c_array,c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes,c_date,c_timestamp
from fake"
result_table_name = "sql"
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
index 70545bcd1..23fb88aaa 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
@@ -119,7 +119,7 @@ public final class InternalRowConverter extends
RowConverter<InternalRow> {
return new SpecificInternalRow(values);
}
- private static Object convertMap(Map<?, ?> mapData, MapType<?, ?> mapType)
{
+ private static ArrayBasedMapData convertMap(Map<?, ?> mapData, MapType<?,
?> mapType) {
if (mapData == null || mapData.size() == 0) {
return ArrayBasedMapData.apply(new Object[]{}, new Object[]{});
}
@@ -132,7 +132,7 @@ public final class InternalRowConverter extends
RowConverter<InternalRow> {
return ArrayBasedMapData.apply(keys, values);
}
- private static Object reconvertMap(MapData mapData, MapType<?, ?> mapType)
{
+ private static Map<Object, Object> reconvertMap(MapData mapData,
MapType<?, ?> mapType) {
if (mapData == null || mapData.numElements() == 0) {
return Collections.emptyMap();
}