This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 19d10815b [cdc] add set type for mysql cdc action (#1065)
19d10815b is described below

commit 19d10815bcb726b8efcd7f14db3d9e4f4032b119
Author: JunZhang <[email protected]>
AuthorDate: Fri May 5 15:39:59 2023 +0800

    [cdc] add set type for mysql cdc action (#1065)
---
 .../java/org/apache/paimon/utils/TypeUtils.java    | 22 ++++++++++++++++++++++
 .../org/apache/paimon/data/DataFormatTestUtil.java | 12 ++++++++++++
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |  8 ++++++--
 .../src/test/resources/mysql/setup.sql             |  7 +++++--
 4 files changed, 45 insertions(+), 4 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index b4ed5bded..b853d4881 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -20,7 +20,9 @@ package org.apache.paimon.utils;
 
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeChecks;
@@ -29,6 +31,7 @@ import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.LocalZonedTimestampType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.VarCharType;
 
 import java.math.BigDecimal;
 import java.time.DateTimeException;
@@ -115,6 +118,25 @@ public class TypeUtils {
             case TIMESTAMP_WITHOUT_TIME_ZONE:
                 TimestampType timestampType = (TimestampType) type;
                 return toTimestamp(str, timestampType.getPrecision());
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) type;
+                DataType elementType = arrayType.getElementType();
+                if (elementType instanceof VarCharType) {
+                    if (s.startsWith("[")) {
+                        s = s.substring(1);
+                    }
+                    if (s.endsWith("]")) {
+                        s = s.substring(0, s.length() - 1);
+                    }
+                    String[] ss = s.split(",");
+                    BinaryString[] binaryStrings = new BinaryString[ss.length];
+                    for (int i = 0; i < ss.length; i++) {
+                        binaryStrings[i] = BinaryString.fromString(ss[i]);
+                    }
+                    return new GenericArray(binaryStrings);
+                } else {
+                    throw new UnsupportedOperationException("Unsupported type " + type);
+                }
             default:
                 throw new UnsupportedOperationException("Unsupported type " + type);
         }
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java b/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
index 11a08583e..b8b5f9f6d 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
@@ -18,6 +18,7 @@
 package org.apache.paimon.data;
 
 import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.StringUtils;
 
@@ -43,6 +44,17 @@ public class DataFormatTestUtil {
                 Object field = fieldGetter.getFieldOrNull(row);
                 if (field instanceof byte[]) {
                     build.append(Arrays.toString((byte[]) field));
+                } else if (field instanceof InternalArray) {
+                    InternalArray internalArray = (InternalArray) field;
+                    ArrayType arrayType = (ArrayType) type.getTypeAt(i);
+                    InternalArray.ElementGetter elementGetter =
+                            InternalArray.createElementGetter(arrayType.getElementType());
+                    String[] result = new String[internalArray.size()];
+                    for (int j = 0; j < internalArray.size(); j++) {
+                        Object object = elementGetter.getElementOrNull(internalArray, j);
+                        result[j] = null == object ? null : object.toString();
+                    }
+                    build.append(Arrays.toString(result));
                 } else {
                     build.append(field);
                 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index db8a93c21..9a5b21f44 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -487,7 +487,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                             DataTypes.STRING(), // _multipoint
                             DataTypes.STRING(), // _multiline
                             DataTypes.STRING(), // _multipolygon
-                            DataTypes.STRING() // _geometrycollection
+                            DataTypes.STRING(), // _geometrycollection
+                            DataTypes.ARRAY(DataTypes.STRING()) // _set
                         },
                         new String[] {
                             "_id",
@@ -564,6 +565,7 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                             "_multiline",
                             "_multipolygon",
                             "_geometrycollection",
+                            "_set",
                         });
         FileStoreTable table = getFileStoreTable();
         List<String> expected =
@@ -609,7 +611,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                                 + "{\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, "
                                 + "{\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}, "
                                 + "{\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}, "
-                                + "{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}"
+                                + "{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}, "
+                                + "[a, b]"
                                 + "]",
                         "+I["
                                 + "2, 2.2, "
@@ -642,6 +645,7 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                                 + "NULL, "
                                 + "NULL, "
                                 + "NULL, "
+                                + "NULL, "
                                 + "NULL"
                                 + "]");
         waitForResult(expected, table, rowType, Arrays.asList("pt", "_id"));
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index a3ee9a113..dc4a6cbe7 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -145,6 +145,7 @@ CREATE TABLE all_types_table (
     _multiline  MULTILINESTRING,
     _multipolygon  MULTIPOLYGON,
     _geometrycollection GEOMETRYCOLLECTION,
+    _set SET('a', 'b', 'c', 'd'),
     PRIMARY KEY (_id)
 );
 
@@ -201,8 +202,9 @@ INSERT INTO all_types_table VALUES (
     ST_GeomFromText('POLYGON((1 1, 2 1, 2 2,  1 2, 1 1))'),
     ST_GeomFromText('MULTIPOINT((1 1),(2 2))'),
     ST_GeomFromText('MultiLineString((1 1,2 2,3 3),(4 4,5 5))'),
-    ST_GeomFromText('MULTIPOLYGON(((0 0,10 0,10 10,0 10,0 0)),((5 5,7 5,7 7,5 7, 5 5)))'),
-    ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))')
+    ST_GeomFromText('MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5)))'),
+    ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))'),
+    'a,b'
 ), (
     2, 2.2,
     NULL, NULL, NULL, NULL, NULL, NULL,
@@ -234,6 +236,7 @@ INSERT INTO all_types_table VALUES (
     NULL,
     NULL,
     NULL,
+    NULL,
     NULL
 );
 

Reply via email to