This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 021af147cc [Feature] [Postgre CDC]support array type (#8560)
021af147cc is described below
commit 021af147cce6c90593406096e9ce2e0d96921d40
Author: litiliu <[email protected]>
AuthorDate: Tue Jan 21 11:04:10 2025 +0800
[Feature] [Postgre CDC]support array type (#8560)
Co-authored-by: litiliu <[email protected]>
---
...TunnelRowDebeziumDeserializationConverters.java | 41 +++++++++++++++++
...elRowDebeziumDeserializationConvertersTest.java | 52 ++++++++++++++++++++++
2 files changed, 93 insertions(+)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
index 89b9c50c30..227d2b7eee 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.cdc.debezium.row;
+import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -48,6 +51,7 @@ import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.Arrays;
+import java.util.List;
import java.util.Optional;
/** Deserialization schema from Debezium object to {@link SeaTunnelRow} */
@@ -173,12 +177,49 @@ public class
SeaTunnelRowDebeziumDeserializationConverters implements Serializab
return createRowConverter(
(SeaTunnelRowType) type, serverTimeZone,
userDefinedConverterFactory);
case ARRAY:
+ return createArrayConverter(type);
case MAP:
default:
throw new UnsupportedOperationException("Unsupported type: " +
type);
}
}
+ @VisibleForTesting
+ protected static DebeziumDeserializationConverter createArrayConverter(
+ SeaTunnelDataType<?> type) {
+ SeaTunnelDataType elementType = ((ArrayType) type).getElementType();
+ switch (elementType.getSqlType()) {
+ case BOOLEAN:
+ return (dbzObj, schema) ->
+ convertListToArray((List<Boolean>) dbzObj,
Boolean.class);
+ case SMALLINT:
+ return (dbzObj, schema) -> convertListToArray((List<Short>)
dbzObj, Short.class);
+ case INT:
+ return (dbzObj, schema) ->
+ convertListToArray((List<Integer>) dbzObj,
Integer.class);
+ case BIGINT:
+ return (dbzObj, schema) -> convertListToArray((List<Long>)
dbzObj, Long.class);
+ case FLOAT:
+ return (dbzObj, schema) -> convertListToArray((List<Float>)
dbzObj, Float.class);
+ case DOUBLE:
+ return (dbzObj, schema) -> convertListToArray((List<Double>)
dbzObj, Double.class);
+ case STRING:
+ return (dbzObj, schema) -> convertListToArray((List<String>)
dbzObj, String.class);
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported SQL type: " + elementType.getSqlType());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> T[] convertListToArray(List<T> list, Class<T> clazz) {
+ T[] array = (T[]) java.lang.reflect.Array.newInstance(clazz,
list.size());
+ for (int i = 0; i < list.size(); i++) {
+ array[i] = list.get(i);
+ }
+ return array;
+ }
+
private static DebeziumDeserializationConverter convertToBoolean() {
return new DebeziumDeserializationConverter() {
private static final long serialVersionUID = 1L;
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java
index 74e832d6e0..14098cecc9 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java
@@ -17,10 +17,12 @@
package org.apache.seatunnel.connectors.cdc.debezium.row;
+import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverter;
import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory;
import org.apache.seatunnel.connectors.cdc.debezium.MetadataConverter;
@@ -34,6 +36,7 @@ import org.junit.jupiter.api.Test;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
public class SeaTunnelRowDebeziumDeserializationConvertersTest {
@@ -75,4 +78,53 @@ public class
SeaTunnelRowDebeziumDeserializationConvertersTest {
Assertions.assertEquals(row.getField(0), 1);
Assertions.assertNull(row.getField(1));
}
+
+ @Test
+ void testArrayConverter() throws Exception {
+ DebeziumDeserializationConverter converter;
+ // bool array converter
+ converter =
+
SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter(
+ ArrayType.BOOLEAN_ARRAY_TYPE);
+ Boolean[] booleans = new Boolean[] {false, true};
+ Assertions.assertTrue(
+ Arrays.equals(
+ booleans, (Boolean[])
(converter.convert(Arrays.asList(booleans), null))));
+ // smallInt array converter
+ converter =
+
SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter(
+ ArrayType.SHORT_ARRAY_TYPE);
+ Short[] shorts = new Short[] {(short) 1, (short) 2};
+ Assertions.assertTrue(
+ Arrays.equals(shorts, (Short[])
(converter.convert(Arrays.asList(shorts), null))));
+ // int array converter
+ converter =
+
SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter(
+ ArrayType.INT_ARRAY_TYPE);
+ Integer[] ints = new Integer[] {1, 2};
+ Assertions.assertTrue(
+ Arrays.equals(ints, (Integer[])
(converter.convert(Arrays.asList(ints), null))));
+ // long array converter
+ converter =
+
SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter(
+ ArrayType.LONG_ARRAY_TYPE);
+ Long[] longs = new Long[] {1L, 2L};
+ Assertions.assertTrue(
+ Arrays.equals(longs, (Long[])
(converter.convert(Arrays.asList(longs), null))));
+ // float array converter
+ converter =
+
SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter(
+ ArrayType.FLOAT_ARRAY_TYPE);
+ Float[] floats = new Float[] {1.0f, 2.0f};
+ Assertions.assertTrue(
+ Arrays.equals(floats, (Float[])
(converter.convert(Arrays.asList(floats), null))));
+ // double array converter
+ converter =
+
SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter(
+ ArrayType.DOUBLE_ARRAY_TYPE);
+ Double[] doubles = new Double[] {1.0, 2.0};
+ Assertions.assertTrue(
+ Arrays.equals(
+ doubles, (Double[])
(converter.convert(Arrays.asList(doubles), null))));
+ }
}