This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new f1bdb72  [Feature](type) Support Doris array type and Flink SQL array, map, row types (#122)
f1bdb72 is described below

commit f1bdb72028f5921214d2cd5a303cb9bc0871562a
Author: wudi <[email protected]>
AuthorDate: Tue Mar 14 16:25:28 2023 +0800

    [Feature](type) Support Doris array type and Flink SQL array, map, row types (#122)
    
    * support Doris array type and Flink SQL map and row types
---
 .../converter/DorisRowConverter.java               | 128 +++++++-
 .../apache/doris/flink/serialization/RowBatch.java | 331 ++++++++++-----------
 .../doris/flink/sink/writer/RowDataSerializer.java |   2 +-
 .../doris/flink/DorisSinkArraySQLExample.java      | 129 ++++++++
 .../apache/doris/flink/DorisSourceSinkExample.java | 126 ++++++--
 .../doris/flink/lookup/LookupJoinCdcExample.java   |  80 +++++
 6 files changed, 598 insertions(+), 198 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
index 435c0f3..919c1f4 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
@@ -16,34 +16,53 @@
 // under the License.
 package org.apache.doris.flink.deserialization.converter;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.doris.flink.serialization.RowBatch;
+import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryArrayData;
+import org.apache.flink.table.data.binary.BinaryMapData;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 public class DorisRowConverter implements Serializable {
 
     private static final long serialVersionUID = 1L;
-    private final DeserializationConverter[] deserializationConverters;
-    private final SerializationConverter[] serializationConverters;
+    private DeserializationConverter[] deserializationConverters; // non-final: the no-arg constructor below leaves it unset
+    private SerializationConverter[] serializationConverters; // non-final: filled in by setExternalConverter when the no-arg constructor is used
+    private static final ObjectMapper objectMapper = new ObjectMapper(); // shared by writeValueAsString to render MAP/ROW values as JSON text
+
+    public DorisRowConverter(){} // converters start unset; e.g. RowDataSerializer chains setExternalConverter(dataTypes)
 
     public DorisRowConverter(RowType rowType) {
         checkNotNull(rowType);
@@ -66,6 +85,16 @@ public class DorisRowConverter implements Serializable {
         }
     }
 
+    public DorisRowConverter setExternalConverter(DataType[] dataTypes) { // builds only the serialization (RowData field -> external value) converters, one per data type; deserializationConverters is not touched
+        checkNotNull(dataTypes);
+        this.serializationConverters = new SerializationConverter[dataTypes.length];
+        for (int i = 0; i < dataTypes.length; i++) {
+            LogicalType logicalType = dataTypes[i].getLogicalType();
+            serializationConverters[i] = createNullableExternalConverter(logicalType);
+        }
+        return this; // returned for chaining: new DorisRowConverter().setExternalConverter(dataTypes)
+    }
+
     /**
      * Convert data retrieved from {@link RowBatch} to  {@link RowData}.
      *
@@ -180,11 +209,12 @@ public class DorisRowConverter implements Serializable {
                 };
             case CHAR:
             case VARCHAR:
-                return val -> StringData.fromString((String) val);
+                return val -> StringData.fromString(val.toString());
             case TIME_WITHOUT_TIME_ZONE:
             case BINARY:
             case VARBINARY:
             case ARRAY:
+                return val -> convertArrayData(((List<?>) val).toArray(), 
type);
             case ROW:
             case MAP:
             case MULTISET:
@@ -234,11 +264,14 @@ public class DorisRowConverter implements Serializable {
                 return (index, val) -> val.getTimestamp(index, 
localP).toTimestamp();
             case TIMESTAMP_WITH_TIME_ZONE:
                 final int zonedP = ((ZonedTimestampType) type).getPrecision();
-            return (index, val) -> val.getTimestamp(index, 
zonedP).toTimestamp();
+                return (index, val) -> val.getTimestamp(index, 
zonedP).toTimestamp();
             case ARRAY:
-            case MULTISET:
+                return (index, val) -> convertArrayData(val.getArray(index), 
type);
             case MAP:
+                return (index, val) -> 
writeValueAsString(convertMapData(val.getMap(index), type));
             case ROW:
+                return (index, val) -> writeValueAsString(convertRowData(val, 
index, type));
+            case MULTISET:
             case STRUCTURED_TYPE:
             case DISTINCT_TYPE:
             case RAW:
@@ -248,4 +281,89 @@ public class DorisRowConverter implements Serializable {
                 throw new UnsupportedOperationException("Unsupported type:" + 
type);
         }
     }
+
+    private ArrayData convertArrayData(Object[] array, LogicalType type) { // Doris -> Flink: converts each element with the converter for the array's element type
+        List<LogicalType> children = type.getChildren();
+        Preconditions.checkState(children.size() > 0, "Failed to obtain the item type of array");
+        DeserializationConverter converter = createNullableInternalConverter(children.get(0));
+        for (int i = 0; i < array.length; i++) {
+            Object result = converter.deserialize(array[i]);
+            array[i] = result; // overwrites in place; the ARRAY internal converter passes a fresh List.toArray() copy
+        }
+        GenericArrayData arrayData = new GenericArrayData(array);
+        return arrayData;
+    }
+
+    private List<Object> convertArrayData(ArrayData array, LogicalType type){ // Flink -> Doris: unwraps ArrayData into a java.util.List for serialization
+        if(array instanceof GenericArrayData){
+            return Arrays.asList(((GenericArrayData)array).toObjectArray());
+        }
+        if(array instanceof BinaryArrayData){
+            LogicalType elementType = ((ArrayType)type).getElementType();
+            List<Object> values = Arrays.asList(((BinaryArrayData) array).toObjectArray(elementType));
+            if(LogicalTypeRoot.DATE.equals(elementType.getTypeRoot())) { // DATE elements are epoch-day ints; null elements stay null instead of NPE on unboxing
+                return values.stream().map(date -> date == null ? null : Date.valueOf(LocalDate.ofEpochDay((Integer)date))).collect(Collectors.toList());
+            }
+            if (LogicalTypeRoot.ARRAY.equals(elementType.getTypeRoot())) { // recurse into nested arrays; a null inner array stays null
+                return values.stream().map(arr -> arr == null ? null : convertArrayData((ArrayData)arr, elementType)).collect(Collectors.toList());
+            }
+            return values;
+        }
+        throw new UnsupportedOperationException("Unsupported array data: " + array.getClass());
+    }
+
+    private Object convertMapData(MapData map, LogicalType type) { // Flink -> Doris: builds a java.util.Map that writeValueAsString renders as JSON
+        Map<Object, Object> result = new HashMap<>();
+        if (map instanceof GenericMapData) {
+            GenericMapData gMap = (GenericMapData)map;
+            for (Object key : ((GenericArrayData)gMap.keyArray()).toObjectArray()) {
+                result.put(key, gMap.get(key));
+            }
+            return result;
+        }
+        if (map instanceof BinaryMapData) {
+            BinaryMapData bMap = (BinaryMapData)map;
+            LogicalType valueType = ((MapType)type).getValueType();
+            Map<?, ?> javaMap = bMap.toJavaMap(((MapType) type).getKeyType(), valueType);
+            for (Map.Entry<?,?> entry : javaMap.entrySet()) {
+                String key = String.valueOf(entry.getKey()); // a null key becomes "null" instead of throwing NPE
+                if (entry.getValue() == null) { result.put(key, null); } else if (LogicalTypeRoot.MAP.equals(valueType.getTypeRoot())) { // null values are emitted as JSON null before any cast/unboxing
+                    result.put(key, convertMapData((MapData)entry.getValue(), valueType));
+                }else if (LogicalTypeRoot.DATE.equals(valueType.getTypeRoot())) { // DATE values are epoch-day ints
+                    result.put(key, Date.valueOf(LocalDate.ofEpochDay((Integer)entry.getValue())).toString());
+                }else if (LogicalTypeRoot.ARRAY.equals(valueType.getTypeRoot())) {
+                    result.put(key, convertArrayData((ArrayData)entry.getValue(), valueType));
+                }else if(entry.getValue() instanceof TimestampData){
+                    result.put(key, ((TimestampData)entry.getValue()).toTimestamp().toString());
+                }else{
+                    result.put(key, entry.getValue().toString());
+                }
+            }
+            return result;
+        }
+        throw new UnsupportedOperationException("Unsupported map data: " + map.getClass());
+    }
+
+    private Object convertRowData(RowData val, int index, LogicalType type) { // Flink -> Doris: maps each field name to its stringified value, rendered later as JSON
+        RowType rowType = (RowType)type;
+        Map<String, Object> value = new java.util.LinkedHashMap<>(); // keep the declared field order in the emitted JSON
+        RowData row = val.getRow(index, rowType.getFieldCount());
+
+        List<RowType.RowField> fields = rowType.getFields();
+        for(int i = 0;i< fields.size(); i++){
+            RowType.RowField rowField = fields.get(i);
+            SerializationConverter converter = createNullableExternalConverter(rowField.getType());
+            Object valTmp = converter.serialize(i, row);
+            value.put(rowField.getName(), valTmp == null ? null : valTmp.toString()); // guard: a null serialized field would otherwise NPE here
+        }
+        return value;
+    }
+
+    private String writeValueAsString(Object object){ // renders the Map built for MAP/ROW columns as JSON text via the shared objectMapper
+        try {
+            return objectMapper.writeValueAsString(object);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e); // rethrown unchecked, keeping the Jackson exception as the cause
+        }
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index 8ab9a55..76e98e0 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -30,6 +30,7 @@ import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.ipc.ArrowStreamReader;
 import org.apache.arrow.vector.types.Types;
 import org.apache.doris.flink.exception.DorisException;
@@ -82,10 +83,11 @@ public class RowBatch {
     private List<FieldVector> fieldVectors;
     private RootAllocator rootAllocator;
     private final Schema schema;
-
-    private final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; // text layout of DATETIME column values
+    private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS"; // DATETIMEV2 values after completeMilliseconds pads them to 6 fractional digits
+    private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(DATETIME_PATTERN);
+    private final DateTimeFormatter dateTimeV2Formatter = DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
     private final DateTimeFormatter dateFormatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd");
-    private final DateTimeFormatter dateTimeV2Formatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
 
     public List<Row> getRowBatch() {
         return rowBatch;
@@ -152,171 +154,12 @@ public class RowBatch {
     public void convertArrowToRowBatch() throws DorisException {
         try {
             for (int col = 0; col < fieldVectors.size(); col++) {
-                FieldVector curFieldVector = fieldVectors.get(col);
-                Types.MinorType mt = curFieldVector.getMinorType();
-
+                FieldVector fieldVector = fieldVectors.get(col);
+                Types.MinorType minorType = fieldVector.getMinorType();
                 final String currentType = schema.get(col).getType();
-                switch (currentType) {
-                    case "NULL_TYPE":
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            addValueToRow(rowIndex, null);
-                        }
-                        break;
-                    case "BOOLEAN":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.BIT),
-                                typeMismatchMessage(currentType, mt));
-                        BitVector bitVector = (BitVector) curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            Object fieldValue = bitVector.isNull(rowIndex) ? 
null : bitVector.get(rowIndex) != 0;
-                            addValueToRow(rowIndex, fieldValue);
-                        }
-                        break;
-                    case "TINYINT":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.TINYINT),
-                                typeMismatchMessage(currentType, mt));
-                        TinyIntVector tinyIntVector = (TinyIntVector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            Object fieldValue = tinyIntVector.isNull(rowIndex) 
? null : tinyIntVector.get(rowIndex);
-                            addValueToRow(rowIndex, fieldValue);
-                        }
-                        break;
-                    case "SMALLINT":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.SMALLINT),
-                                typeMismatchMessage(currentType, mt));
-                        SmallIntVector smallIntVector = (SmallIntVector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            Object fieldValue = 
smallIntVector.isNull(rowIndex) ? null : smallIntVector.get(rowIndex);
-                            addValueToRow(rowIndex, fieldValue);
-                        }
-                        break;
-                    case "INT":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.INT),
-                                typeMismatchMessage(currentType, mt));
-                        IntVector intVector = (IntVector) curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            Object fieldValue = intVector.isNull(rowIndex) ? 
null : intVector.get(rowIndex);
-                            addValueToRow(rowIndex, fieldValue);
-                        }
-                        break;
-                    case "BIGINT":
-
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.BIGINT),
-                                typeMismatchMessage(currentType, mt));
-                        BigIntVector bigIntVector = (BigIntVector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            Object fieldValue = bigIntVector.isNull(rowIndex) 
? null : bigIntVector.get(rowIndex);
-                            addValueToRow(rowIndex, fieldValue);
-                        }
-                        break;
-                    case "FLOAT":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.FLOAT4),
-                                typeMismatchMessage(currentType, mt));
-                        Float4Vector float4Vector = (Float4Vector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            Object fieldValue = float4Vector.isNull(rowIndex) 
? null : float4Vector.get(rowIndex);
-                            addValueToRow(rowIndex, fieldValue);
-                        }
-                        break;
-                    case "TIME":
-                    case "DOUBLE":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.FLOAT8),
-                                typeMismatchMessage(currentType, mt));
-                        Float8Vector float8Vector = (Float8Vector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            Object fieldValue = float8Vector.isNull(rowIndex) 
? null : float8Vector.get(rowIndex);
-                            addValueToRow(rowIndex, fieldValue);
-                        }
-                        break;
-                    case "BINARY":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.VARBINARY),
-                                typeMismatchMessage(currentType, mt));
-                        VarBinaryVector varBinaryVector = (VarBinaryVector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            Object fieldValue = 
varBinaryVector.isNull(rowIndex) ? null : varBinaryVector.get(rowIndex);
-                            addValueToRow(rowIndex, fieldValue);
-                        }
-                        break;
-                    case "DECIMAL":
-                    case "DECIMALV2":
-                    case "DECIMAL32":
-                    case "DECIMAL64":
-                    case "DECIMAL128I":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.DECIMAL),
-                                typeMismatchMessage(currentType, mt));
-                        DecimalVector decimalVector = (DecimalVector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            if (decimalVector.isNull(rowIndex)) {
-                                addValueToRow(rowIndex, null);
-                                continue;
-                            }
-                            BigDecimal value = 
decimalVector.getObject(rowIndex).stripTrailingZeros();
-                            addValueToRow(rowIndex, value);
-                        }
-                        break;
-                    case "DATE":
-                    case "DATEV2":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
-                                typeMismatchMessage(currentType, mt));
-                        VarCharVector date = (VarCharVector) curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            if (date.isNull(rowIndex)) {
-                                addValueToRow(rowIndex, null);
-                                continue;
-                            }
-                            String value = new String(date.get(rowIndex));
-                            LocalDate localDate = LocalDate.parse(value, 
dateFormatter);
-                            addValueToRow(rowIndex, localDate);
-                        }
-                        break;
-                    case "DATETIME":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
-                                typeMismatchMessage(currentType, mt));
-                        VarCharVector timeStampSecVector = (VarCharVector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            if (timeStampSecVector.isNull(rowIndex)) {
-                                addValueToRow(rowIndex, null);
-                                continue;
-                            }
-                            String value = new 
String(timeStampSecVector.get(rowIndex));
-                            LocalDateTime parse = LocalDateTime.parse(value, 
dateTimeFormatter);
-                            addValueToRow(rowIndex, parse);
-                        }
-                        break;
-                    case "DATETIMEV2":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
-                                typeMismatchMessage(currentType, mt));
-                        VarCharVector timeStampV2SecVector = (VarCharVector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            if (timeStampV2SecVector.isNull(rowIndex)) {
-                                addValueToRow(rowIndex, null);
-                                continue;
-                            }
-                            String value = new 
String(timeStampV2SecVector.get(rowIndex));
-                            LocalDateTime parse = LocalDateTime.parse(value, 
dateTimeV2Formatter);
-                            addValueToRow(rowIndex, parse);
-                        }
-                        break;
-                    case "LARGEINT":
-                    case "CHAR":
-                    case "VARCHAR":
-                    case "STRING":
-                    case "JSONB":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
-                                typeMismatchMessage(currentType, mt));
-                        VarCharVector varCharVector = (VarCharVector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            if (varCharVector.isNull(rowIndex)) {
-                                addValueToRow(rowIndex, null);
-                                continue;
-                            }
-                            String value = new 
String(varCharVector.get(rowIndex));
-                            addValueToRow(rowIndex, value);
-                        }
-                        break;
-                    default:
-                        String errMsg = "Unsupported type " + 
schema.get(col).getType();
-                        logger.error(errMsg);
-                        throw new DorisException(errMsg);
+                for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
+                    boolean passed = doConvert(col, rowIndex, minorType, 
currentType, fieldVector);
+                    Preconditions.checkArgument(passed, 
typeMismatchMessage(currentType, minorType));
                 }
             }
         } catch (Exception e) {
@@ -325,6 +168,156 @@ public class RowBatch {
         }
     }
 
+    private boolean doConvert(int col, // schema column index; only used in the unsupported-type error message
+                              int rowIndex,
+                              Types.MinorType minorType,
+                              String currentType,
+                              FieldVector fieldVector) throws DorisException { // returns false on an Arrow/Doris type mismatch, true after appending this column's value for rowIndex
+        switch (currentType) {
+            case "NULL_TYPE":
+                addValueToRow(rowIndex, null); // as the previous per-column loop did: append null so this column still occupies its slot and later columns don't shift
+                break;
+            case "BOOLEAN":
+                if (!minorType.equals(Types.MinorType.BIT)) return false;
+                BitVector bitVector = (BitVector) fieldVector;
+                Object fieldValue = bitVector.isNull(rowIndex) ? null : bitVector.get(rowIndex) != 0;
+                addValueToRow(rowIndex, fieldValue);
+                break;
+            case "TINYINT":
+                if (!minorType.equals(Types.MinorType.TINYINT)) return false;
+                TinyIntVector tinyIntVector = (TinyIntVector) fieldVector;
+                fieldValue = tinyIntVector.isNull(rowIndex) ? null : tinyIntVector.get(rowIndex);
+                addValueToRow(rowIndex, fieldValue);
+                break;
+            case "SMALLINT":
+                if (!minorType.equals(Types.MinorType.SMALLINT)) return false;
+                SmallIntVector smallIntVector = (SmallIntVector) fieldVector;
+                fieldValue = smallIntVector.isNull(rowIndex) ? null : smallIntVector.get(rowIndex);
+                addValueToRow(rowIndex, fieldValue);
+                break;
+            case "INT":
+                if (!minorType.equals(Types.MinorType.INT)) return false;
+                IntVector intVector = (IntVector) fieldVector;
+                fieldValue = intVector.isNull(rowIndex) ? null : intVector.get(rowIndex);
+                addValueToRow(rowIndex, fieldValue);
+                break;
+            case "BIGINT":
+                if (!minorType.equals(Types.MinorType.BIGINT)) return false;
+                BigIntVector bigIntVector = (BigIntVector) fieldVector;
+                fieldValue = bigIntVector.isNull(rowIndex) ? null : bigIntVector.get(rowIndex);
+                addValueToRow(rowIndex, fieldValue);
+                break;
+            case "FLOAT":
+                if (!minorType.equals(Types.MinorType.FLOAT4)) return false;
+                Float4Vector float4Vector = (Float4Vector) fieldVector;
+                fieldValue = float4Vector.isNull(rowIndex) ? null : float4Vector.get(rowIndex);
+                addValueToRow(rowIndex, fieldValue);
+                break;
+            case "TIME":
+            case "DOUBLE":
+                if (!minorType.equals(Types.MinorType.FLOAT8)) return false;
+                Float8Vector float8Vector = (Float8Vector) fieldVector;
+                fieldValue = float8Vector.isNull(rowIndex) ? null : float8Vector.get(rowIndex);
+                addValueToRow(rowIndex, fieldValue);
+                break;
+            case "BINARY":
+                if (!minorType.equals(Types.MinorType.VARBINARY)) return false;
+                VarBinaryVector varBinaryVector = (VarBinaryVector) fieldVector;
+                fieldValue = varBinaryVector.isNull(rowIndex) ? null : varBinaryVector.get(rowIndex);
+                addValueToRow(rowIndex, fieldValue);
+                break;
+            case "DECIMAL":
+            case "DECIMALV2":
+            case "DECIMAL32":
+            case "DECIMAL64":
+            case "DECIMAL128I":
+                if (!minorType.equals(Types.MinorType.DECIMAL)) return false;
+                DecimalVector decimalVector = (DecimalVector) fieldVector;
+                if (decimalVector.isNull(rowIndex)) {
+                    addValueToRow(rowIndex, null);
+                    break;
+                }
+                BigDecimal value = decimalVector.getObject(rowIndex).stripTrailingZeros();
+                addValueToRow(rowIndex, value);
+                break;
+            case "DATE":
+            case "DATEV2":
+                if (!minorType.equals(Types.MinorType.VARCHAR)) return false;
+                VarCharVector date = (VarCharVector) fieldVector;
+                if (date.isNull(rowIndex)) {
+                    addValueToRow(rowIndex, null);
+                    break;
+                }
+                String stringValue = new String(date.get(rowIndex), java.nio.charset.StandardCharsets.UTF_8); // Arrow VARCHAR data is UTF-8; don't depend on the platform default charset
+                LocalDate localDate = LocalDate.parse(stringValue, dateFormatter);
+                addValueToRow(rowIndex, localDate);
+                break;
+            case "DATETIME":
+                if (!minorType.equals(Types.MinorType.VARCHAR)) return false;
+                VarCharVector timeStampSecVector = (VarCharVector) fieldVector;
+                if (timeStampSecVector.isNull(rowIndex)) {
+                    addValueToRow(rowIndex, null);
+                    break;
+                }
+                stringValue = new String(timeStampSecVector.get(rowIndex), java.nio.charset.StandardCharsets.UTF_8);
+                LocalDateTime parse = LocalDateTime.parse(stringValue, dateTimeFormatter);
+                addValueToRow(rowIndex, parse);
+                break;
+            case "DATETIMEV2":
+                if (!minorType.equals(Types.MinorType.VARCHAR)) return false;
+                VarCharVector timeStampV2SecVector = (VarCharVector) fieldVector;
+                if (timeStampV2SecVector.isNull(rowIndex)) {
+                    addValueToRow(rowIndex, null);
+                    break;
+                }
+                stringValue = new String(timeStampV2SecVector.get(rowIndex), java.nio.charset.StandardCharsets.UTF_8);
+                stringValue = completeMilliseconds(stringValue); // pad the fractional part so the text matches dateTimeV2Formatter
+                parse = LocalDateTime.parse(stringValue, dateTimeV2Formatter);
+                addValueToRow(rowIndex, parse);
+                break;
+            case "LARGEINT":
+            case "CHAR":
+            case "VARCHAR":
+            case "STRING":
+            case "JSONB":
+                if (!minorType.equals(Types.MinorType.VARCHAR)) return false;
+                VarCharVector varCharVector = (VarCharVector) fieldVector;
+                if (varCharVector.isNull(rowIndex)) {
+                    addValueToRow(rowIndex, null);
+                    break;
+                }
+                stringValue = new String(varCharVector.get(rowIndex), java.nio.charset.StandardCharsets.UTF_8);
+                addValueToRow(rowIndex, stringValue);
+                break;
+            case "ARRAY":
+                if (!minorType.equals(Types.MinorType.LIST)) return false;
+                ListVector listVector = (ListVector) fieldVector;
+                Object listValue = listVector.isNull(rowIndex) ? null : listVector.getObject(rowIndex);
+                //todo: when the subtype of array is date, conversion is required
+                addValueToRow(rowIndex, listValue);
+                break;
+            default:
+                String errMsg = "Unsupported type " + schema.get(col).getType();
+                logger.error(errMsg);
+                throw new DorisException(errMsg);
+        }
+        return true;
+    }
+
+    private String completeMilliseconds(String stringValue) { // right-pads DATETIMEV2 text with zeros up to the 6 fractional digits of DATETIMEV2_PATTERN
+        if(stringValue.length() == DATETIMEV2_PATTERN.length()){
+            return stringValue;
+        }
+        StringBuilder sb = new StringBuilder(stringValue);
+        if(stringValue.length() == DATETIME_PATTERN.length()){ // no fractional part at all: add the separator first
+            sb.append(".");
+        }
+        while (sb.length() < DATETIMEV2_PATTERN.length()){ // sb.length() avoids building a String on every iteration
+            sb.append(0);
+        }
+        return sb.toString();
+    }
+
     public List<Object> next() {
         if (!hasNext()) {
             String errMsg = "Get row offset:" + offsetInRowBatch + " larger 
than row size: " + readRowCount;
@@ -334,9 +327,9 @@ public class RowBatch {
         return rowBatch.get(offsetInRowBatch++).getCols();
     }
 
-    private String typeMismatchMessage(final String sparkType, final Types.MinorType arrowType) {
+    private String typeMismatchMessage(final String flinkType, final Types.MinorType arrowType) { // error text for a column whose Arrow minor type doesn't match; the first argument is the schema type name (currentType at the call site)
         final String messageTemplate = "FLINK type is %1$s, but arrow type is %2$s.";
-        return String.format(messageTemplate, sparkType, arrowType.name());
+        return String.format(messageTemplate, flinkType, arrowType.name());
     }
 
     public int getReadRowCount() {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
index e1ec13e..78e34aa 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
@@ -55,7 +55,7 @@ public class RowDataSerializer implements 
DorisRecordSerializer<RowData> {
         if (JSON.equals(type)) {
             objectMapper = new ObjectMapper();
         }
-        this.rowConverter = new DorisRowConverter(dataTypes);
+        this.rowConverter = new 
DorisRowConverter().setExternalConverter(dataTypes);
     }
 
     @Override
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkArraySQLExample.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkArraySQLExample.java
new file mode 100644
index 0000000..90b7f4c
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkArraySQLExample.java
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import java.util.UUID;
+
+/**
+ * Example that copies rows containing array columns from one Doris table to another
+ * through the Flink SQL Doris connector, running in BATCH mode.
+ *
+ * <p>Tables registered:
+ * <ul>
+ *   <li>{@code source}: a {@code datagen} table with array columns. It is not read by the
+ *       INSERT below.</li>
+ *   <li>{@code source_doris}: reads Doris table {@code test.array_test_type}.</li>
+ *   <li>{@code sink}: writes Doris table {@code test.array_test_type_sink}.</li>
+ * </ul>
+ * Columns annotated as ARRAY&lt;DATE&gt; / ARRAY&lt;TIMESTAMP&gt; ({@code c_10}..{@code c_13})
+ * are declared as {@code ARRAY<STRING>} on the Doris-backed tables.
+ *
+ * <p>Expects a Doris FE at 127.0.0.1:8030 with both target tables already present.
+ */
+public class DorisSinkArraySQLExample {
+
+    /** Column list shared by the Doris source and sink tables, which have identical schemas. */
+    private static final String DORIS_ARRAY_COLUMNS =
+            "  `id` int,\n" +
+            "  `c_1` array<INT> ,\n" +
+            "  `c_2` array<TINYINT> ,\n" +
+            "  `c_3` array<SMALLINT> ,\n" +
+            "  `c_4` array<INT> ,\n" +
+            "  `c_5` array<BIGINT> ,\n" +
+            "  `c_6` array<STRING> ,\n" +
+            "  `c_7` array<FLOAT> ,\n" +
+            "  `c_8` array<DOUBLE> ,\n" +
+            "  `c_9` array<DECIMAL(4,2)> ,\n" +
+            "  `c_10` array<STRING>  ,\n" + //ARRAY<DATE>
+            "  `c_11` array<STRING>  ,\n" + //ARRAY<DATE>
+            "  `c_12` array<STRING>  ,\n" + //ARRAY<TIMESTAMP>
+            "  `c_13` array<STRING>  ,\n" + //ARRAY<TIMESTAMP>
+            "  `c_14` array<CHAR(10)>  ,\n" +
+            "  `c_15` array<VARCHAR(256)>  ,\n" +
+            "  `c_16` array<STRING>  \n";
+
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Synthetic array input (5 rows). Not referenced by the INSERT below.
+        tEnv.executeSql("CREATE TABLE source (\n" +
+                "  `id` int,\n" +
+                "  `c_1` array<INT> ,\n" +
+                "  `c_2` array<TINYINT> ,\n" +
+                "  `c_3` array<SMALLINT> ,\n" +
+                "  `c_4` array<INT> ,\n" +
+                "  `c_5` array<BIGINT> ,\n" +
+                "  `c_6` array<BIGINT> ,\n" +
+                "  `c_7` array<FLOAT>,\n" +
+                "  `c_8` array<DOUBLE> ,\n" +
+                "  `c_9` array<DECIMAL(4,2)> ,\n" +
+                "  `c_10` array<DATE>  ,\n" +
+                "  `c_11` array<DATE>  ,\n" +
+                "  `c_12` array<TIMESTAMP>  ,\n" +
+                "  `c_13` array<TIMESTAMP>  ,\n" +
+                "  `c_14` array<CHAR(10)>  ,\n" +
+                "  `c_15` array<VARCHAR(256)>  ,\n" +
+                "  `c_16` array<STRING>  \n" +
+                ") WITH (\n" +
+                "  'connector' = 'datagen', \n" +
+                "  'fields.c_7.element.min' = '1', \n" +
+                "  'fields.c_7.element.max' = '10', \n" +
+                "  'fields.c_8.element.min' = '1', \n" +
+                "  'fields.c_8.element.max' = '10', \n" +
+                "  'fields.c_14.element.length' = '10', \n" +
+                "  'fields.c_15.element.length' = '10', \n" +
+                "  'fields.c_16.element.length' = '10', \n" +
+                "  'number-of-rows' = '5'  \n" +
+                ")");
+
+        tEnv.executeSql("CREATE TABLE source_doris (\n" +
+                DORIS_ARRAY_COLUMNS +
+                ") WITH (\n" +
+                "  'connector' = 'doris',\n" +
+                "  'fenodes' = '127.0.0.1:8030',\n" +
+                "  'table.identifier' = 'test.array_test_type',\n" +
+                "  'username' = 'root',\n" +
+                "  'password' = ''\n" +
+                ")");
+
+        // The random UUID suffix gives every run a distinct sink label prefix.
+        tEnv.executeSql("CREATE TABLE sink (\n" +
+                DORIS_ARRAY_COLUMNS +
+                ") WITH (\n" +
+                "  'connector' = 'doris',\n" +
+                "  'fenodes' = '127.0.0.1:8030',\n" +
+                "  'table.identifier' = 'test.array_test_type_sink',\n" +
+                "  'username' = 'root',\n" +
+                "  'password' = '',\n" +
+                "  'sink.label-prefix' = 'doris_label4" + UUID.randomUUID() + "'\n" +
+                ")");
+
+        // executeSql() only submits the INSERT job. await() blocks until it completes so
+        // main() does not return before the rows are written.
+        tEnv.executeSql("INSERT INTO sink select * from source_doris").await();
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
index dae75f0..4622e4e 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
@@ -17,47 +17,127 @@
 
 package org.apache.doris.flink;
 
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.util.UUID;
 
 public class DorisSourceSinkExample {
 
-    public static void main(String[] args) {
-        EnvironmentSettings settings = EnvironmentSettings.newInstance()
-                .inStreamingMode()
-                .build();
-        TableEnvironment tEnv = TableEnvironment.create(settings);
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
         tEnv.executeSql(
                 "CREATE TABLE doris_test (" +
-                        "name STRING," +
-                        "age INT," +
-                        "price DECIMAL(5,2)," +
-                        "sale DOUBLE" +
+                        "  id int,\n" +
+                        "  c_1 boolean,\n" +
+                        "  c_2 tinyint,\n" +
+                        "  c_3 smallint,\n" +
+                        "  c_4 int,\n" +
+                        "  c_5 bigint,\n" +
+                        "  c_6 bigint,\n" +
+                        "  c_7 float,\n" +
+                        "  c_8 double,\n" +
+                        "  c_9 DECIMAL(4,2),\n" +
+                        "  c_10 DECIMAL(4,1),\n" +
+                        "  c_11 date,\n" +
+                        "  c_12 date,\n" +
+                        "  c_13 timestamp,\n" +
+                        "  c_14 timestamp,\n" +
+                        "  c_15 string,\n" +
+                        "  c_16 string,\n" +
+                        "  c_17 string,\n" +
+                        "  c_18 array<int>,\n" +
+                        "  c_19 Map<String,int>\n" +
+                        ") " +
+                        "WITH (\n" +
+                        "  'connector' = 'datagen', \n" +
+                        "  'fields.c_6.max' = '5', \n" +
+                        "  'fields.c_9.max' = '5', \n" +
+                        "  'fields.c_10.max' = '5', \n" +
+                        "  'fields.c_15.length' = '5', \n" +
+                        "  'fields.c_16.length' = '5', \n" +
+                        "  'fields.c_17.length' = '5', \n" +
+                        "  'fields.c_19.key.length' = '5', \n" +
+                        "  'connector' = 'datagen', \n" +
+                        "  'number-of-rows' = '1'  \n" +
+                        ")");
+
+        final Table result = tEnv.sqlQuery("SELECT * from doris_test  ");
+
+        // print the result to the console
+        tEnv.toRetractStream(result, Row.class).print();
+        env.execute();
+
+        tEnv.executeSql(
+                "CREATE TABLE source_doris (" +
+                        "  id int,\n" +
+                        "  c_1 boolean,\n" +
+                        "  c_2 tinyint,\n" +
+                        "  c_3 smallint,\n" +
+                        "  c_4 int,\n" +
+                        "  c_5 bigint,\n" +
+                        "  c_6 string,\n" +
+                        "  c_7 float,\n" +
+                        "  c_8 double,\n" +
+                        "  c_9 DECIMAL(4,2),\n" +
+                        "  c_10 DECIMAL(4,1),\n" +
+                        "  c_11 date,\n" +
+                        "  c_12 date,\n" +
+                        "  c_13 timestamp,\n" +
+                        "  c_14 timestamp,\n" +
+                        "  c_15 string,\n" +
+                        "  c_16 string,\n" +
+                        "  c_17 string,\n" +
+                        "  c_18 array<int>,\n" +
+                        "  c_19 string\n" +
                         ") " +
                         "WITH (\n" +
                         "  'connector' = 'doris',\n" +
-                        "  'fenodes' = 'FE_IP:8030',\n" +
-                        "  'table.identifier' = 'db.table',\n" +
+                        "  'fenodes' = '127.0.0.1:8030',\n" +
+                        "  'table.identifier' = 'test.test_all_type',\n" +
                         "  'username' = 'root',\n" +
-                        "  'password' = ''" +
+                        "  'password' = ''\n" +
                         ")");
+
         tEnv.executeSql(
                 "CREATE TABLE doris_test_sink (" +
-                        "name STRING," +
-                        "age INT," +
-                        "price DECIMAL(5,2)," +
-                        "sale DOUBLE" +
+                        "  id int,\n" +
+                        "  c_1 boolean,\n" +
+                        "  c_2 tinyint,\n" +
+                        "  c_3 smallint,\n" +
+                        "  c_4 int,\n" +
+                        "  c_5 bigint,\n" +
+                        "  c_6 string,\n" +
+                        "  c_7 float,\n" +
+                        "  c_8 double,\n" +
+                        "  c_9 DECIMAL(4,2),\n" +
+                        "  c_10 DECIMAL(4,1),\n" +
+                        "  c_11 date,\n" +
+                        "  c_12 date,\n" +
+                        "  c_13 timestamp,\n" +
+                        "  c_14 timestamp,\n" +
+                        "  c_15 string,\n" +
+                        "  c_16 string,\n" +
+                        "  c_17 string,\n" +
+                        "  c_18 array<int>,\n" +
+                        "  c_19 string\n" +
                         ") " +
                         "WITH (\n" +
                         "  'connector' = 'doris',\n" +
-                        "  'fenodes' = 'FE_IP:8030',\n" +
-                        "  'table.identifier' = 'db.table',\n" +
+                        "  'fenodes' = '127.0.0.1:8030',\n" +
+                        "  'table.identifier' = 'test.test_all_type_sink',\n" +
                         "  'username' = 'root',\n" +
                         "  'password' = '',\n" +
                         "  'sink.properties.format' = 'csv',\n" +
-                        "  'sink.label-prefix' = 'doris_csv_table'\n" +
+                        "  'sink.label-prefix' = 'doris_label4"  + 
UUID.randomUUID() + "'" +
                         ")");
-
-        tEnv.executeSql("INSERT INTO doris_test_sink select 
name,age,price,sale from doris_test");
+//
+        tEnv.executeSql("INSERT INTO doris_test_sink select * from 
source_doris");
     }
 }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/LookupJoinCdcExample.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/LookupJoinCdcExample.java
new file mode 100644
index 0000000..d5a2b74
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/LookupJoinCdcExample.java
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+/**
+ * Example of a processing-time lookup join. Rows captured from MySQL table
+ * {@code test.fact_table} via the {@code mysql-cdc} connector are enriched with the
+ * {@code age} column looked up in Doris table {@code test.dim_table}. The joined rows
+ * are printed as a retract stream.
+ */
+public class LookupJoinCdcExample {
+
+    /** CDC source over MySQL {@code test.fact_table}; {@code process_time} drives the lookup. */
+    private static final String MYSQL_FACT_DDL =
+            "CREATE TABLE mysql_tb (" +
+            "id INT," +
+            "name STRING," +
+            "process_time as proctime()," +
+            "primary key(id) NOT ENFORCED" +
+            ") " +
+            "WITH (\n" +
+            "  'connector' = 'mysql-cdc',\n" +
+            "  'hostname' = '127.0.0.1',\n" +
+            "  'port' = '3306',\n" +
+            "  'username' = 'root',\n" +
+            "  'password' = '123456',\n" +
+            "  'database-name' = 'test',\n" +
+            "  'scan.startup.mode' = 'latest-offset',\n" +
+            "  'server-time-zone' = 'Asia/Shanghai',\n" +
+            "  'table-name' = 'fact_table'  " +
+            ")";
+
+    /** Doris dimension table {@code test.dim_table}, queried per key with a lookup cache. */
+    private static final String DORIS_DIM_DDL =
+            "CREATE TABLE doris_tb (" +
+            "id INT," +
+            "age INT," +
+            "primary key(id) NOT ENFORCED" +
+            ") " +
+            "WITH (\n" +
+            "  'connector' = 'doris',\n" +
+            "  'fenodes' = '127.0.0.1:8030',\n" +
+            "  'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',\n" +
+            "  'table.identifier' = 'test.dim_table',\n" +
+            "  'lookup.cache.max-rows' = '1000'," +
+            "  'lookup.cache.ttl' = '1 hour'," +
+            "  'lookup.jdbc.async' = 'true',\n" +
+            "  'username' = 'root',\n" +
+            "  'password' = ''\n" +
+            ")";
+
+    /** Left lookup join keyed on {@code id}, evaluated as of each row's processing time. */
+    private static final String LOOKUP_JOIN_QUERY =
+            "SELECT a.id, a.name, b.age\n" +
+            "FROM mysql_tb a\n" +
+            "  left join doris_tb FOR SYSTEM_TIME AS OF a.process_time AS b\n" +
+            "  ON a.id = b.id";
+
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        // Checkpoint every 30 seconds.
+        env.enableCheckpointing(30000);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        tEnv.executeSql(MYSQL_FACT_DDL);
+        tEnv.executeSql(DORIS_DIM_DDL);
+
+        final Table enriched = tEnv.sqlQuery(LOOKUP_JOIN_QUERY);
+        tEnv.toRetractStream(enriched, Row.class).print();
+
+        env.execute();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to