This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 19d10815b [cdc] add set type for mysql cdc action (#1065)
19d10815b is described below
commit 19d10815bcb726b8efcd7f14db3d9e4f4032b119
Author: JunZhang <[email protected]>
AuthorDate: Fri May 5 15:39:59 2023 +0800
[cdc] add set type for mysql cdc action (#1065)
---
.../java/org/apache/paimon/utils/TypeUtils.java | 22 ++++++++++++++++++++++
.../org/apache/paimon/data/DataFormatTestUtil.java | 12 ++++++++++++
.../cdc/mysql/MySqlSyncTableActionITCase.java | 8 ++++++--
.../src/test/resources/mysql/setup.sql | 7 +++++--
4 files changed, 45 insertions(+), 4 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index b4ed5bded..b853d4881 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -20,7 +20,9 @@ package org.apache.paimon.utils;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
@@ -29,6 +31,7 @@ import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.VarCharType;
import java.math.BigDecimal;
import java.time.DateTimeException;
@@ -115,6 +118,25 @@ public class TypeUtils {
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) type;
return toTimestamp(str, timestampType.getPrecision());
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) type;
+ DataType elementType = arrayType.getElementType();
+ if (elementType instanceof VarCharType) {
+ if (s.startsWith("[")) {
+ s = s.substring(1);
+ }
+ if (s.endsWith("]")) {
+ s = s.substring(0, s.length() - 1);
+ }
+ String[] ss = s.split(",");
+ BinaryString[] binaryStrings = new BinaryString[ss.length];
+ for (int i = 0; i < ss.length; i++) {
+ binaryStrings[i] = BinaryString.fromString(ss[i]);
+ }
+ return new GenericArray(binaryStrings);
+ } else {
+ throw new UnsupportedOperationException("Unsupported type
" + type);
+ }
default:
throw new UnsupportedOperationException("Unsupported type " +
type);
}
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java b/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
index 11a08583e..b8b5f9f6d 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
@@ -18,6 +18,7 @@
package org.apache.paimon.data;
import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.StringUtils;
@@ -43,6 +44,17 @@ public class DataFormatTestUtil {
Object field = fieldGetter.getFieldOrNull(row);
if (field instanceof byte[]) {
build.append(Arrays.toString((byte[]) field));
+ } else if (field instanceof InternalArray) {
+ InternalArray internalArray = (InternalArray) field;
+ ArrayType arrayType = (ArrayType) type.getTypeAt(i);
+ InternalArray.ElementGetter elementGetter =
+
InternalArray.createElementGetter(arrayType.getElementType());
+ String[] result = new String[internalArray.size()];
+ for (int j = 0; j < internalArray.size(); j++) {
+ Object object =
elementGetter.getElementOrNull(internalArray, j);
+ result[j] = null == object ? null : object.toString();
+ }
+ build.append(Arrays.toString(result));
} else {
build.append(field);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index db8a93c21..9a5b21f44 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -487,7 +487,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
DataTypes.STRING(), // _multipoint
DataTypes.STRING(), // _multiline
DataTypes.STRING(), // _multipolygon
- DataTypes.STRING() // _geometrycollection
+ DataTypes.STRING(), // _geometrycollection
+ DataTypes.ARRAY(DataTypes.STRING()) // _set
},
new String[] {
"_id",
@@ -564,6 +565,7 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
"_multiline",
"_multipolygon",
"_geometrycollection",
+ "_set",
});
FileStoreTable table = getFileStoreTable();
List<String> expected =
@@ -609,7 +611,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
+
"{\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, "
+
"{\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0},
"
+
"{\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0},
"
- +
"{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}"
+ +
"{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0},
"
+ + "[a, b]"
+ "]",
"+I["
+ "2, 2.2, "
@@ -642,6 +645,7 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
+ "NULL, "
+ "NULL, "
+ "NULL, "
+ + "NULL, "
+ "NULL"
+ "]");
waitForResult(expected, table, rowType, Arrays.asList("pt", "_id"));
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index a3ee9a113..dc4a6cbe7 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -145,6 +145,7 @@ CREATE TABLE all_types_table (
_multiline MULTILINESTRING,
_multipolygon MULTIPOLYGON,
_geometrycollection GEOMETRYCOLLECTION,
+ _set SET('a', 'b', 'c', 'd'),
PRIMARY KEY (_id)
);
@@ -201,8 +202,9 @@ INSERT INTO all_types_table VALUES (
ST_GeomFromText('POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))'),
ST_GeomFromText('MULTIPOINT((1 1),(2 2))'),
ST_GeomFromText('MultiLineString((1 1,2 2,3 3),(4 4,5 5))'),
- ST_GeomFromText('MULTIPOLYGON(((0 0,10 0,10 10,0 10,0 0)),((5 5,7 5,7 7,5
7, 5 5)))'),
- ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30),
LINESTRING(15 15, 20 20))')
+ ST_GeomFromText('MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5,
7 7, 5 7, 5 5)))'),
+ ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30),
LINESTRING(15 15, 20 20))'),
+ 'a,b'
), (
2, 2.2,
NULL, NULL, NULL, NULL, NULL, NULL,
@@ -234,6 +236,7 @@ INSERT INTO all_types_table VALUES (
NULL,
NULL,
NULL,
+ NULL,
NULL
);