This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new f1bdb72 [Feature](type) Support Doris array type and
Flink SQL array, map, row types (#122)
f1bdb72 is described below
commit f1bdb72028f5921214d2cd5a303cb9bc0871562a
Author: wudi <[email protected]>
AuthorDate: Tue Mar 14 16:25:28 2023 +0800
[Feature](type) Support Doris array type and Flink SQL array, map, row types
(#122)
* Support Doris array type and Flink SQL map, row types
---
.../converter/DorisRowConverter.java | 128 +++++++-
.../apache/doris/flink/serialization/RowBatch.java | 331 ++++++++++-----------
.../doris/flink/sink/writer/RowDataSerializer.java | 2 +-
.../doris/flink/DorisSinkArraySQLExample.java | 129 ++++++++
.../apache/doris/flink/DorisSourceSinkExample.java | 126 ++++++--
.../doris/flink/lookup/LookupJoinCdcExample.java | 80 +++++
6 files changed, 598 insertions(+), 198 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
index 435c0f3..919c1f4 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
@@ -16,34 +16,53 @@
// under the License.
package org.apache.doris.flink.deserialization.converter;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.serialization.RowBatch;
+import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryArrayData;
+import org.apache.flink.table.data.binary.BinaryMapData;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Date;
import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
public class DorisRowConverter implements Serializable {
private static final long serialVersionUID = 1L;
- private final DeserializationConverter[] deserializationConverters;
- private final SerializationConverter[] serializationConverters;
+ private DeserializationConverter[] deserializationConverters;
+ private SerializationConverter[] serializationConverters;
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ public DorisRowConverter(){}
public DorisRowConverter(RowType rowType) {
checkNotNull(rowType);
@@ -66,6 +85,16 @@ public class DorisRowConverter implements Serializable {
}
}
+ public DorisRowConverter setExternalConverter(DataType[] dataTypes) {
+ checkNotNull(dataTypes);
+ this.serializationConverters = new
SerializationConverter[dataTypes.length];
+ for (int i = 0; i < dataTypes.length; i++) {
+ LogicalType logicalType = dataTypes[i].getLogicalType();
+ serializationConverters[i] =
createNullableExternalConverter(logicalType);
+ }
+ return this;
+ }
+
/**
* Convert data retrieved from {@link RowBatch} to {@link RowData}.
*
@@ -180,11 +209,12 @@ public class DorisRowConverter implements Serializable {
};
case CHAR:
case VARCHAR:
- return val -> StringData.fromString((String) val);
+ return val -> StringData.fromString(val.toString());
case TIME_WITHOUT_TIME_ZONE:
case BINARY:
case VARBINARY:
case ARRAY:
+ return val -> convertArrayData(((List<?>) val).toArray(),
type);
case ROW:
case MAP:
case MULTISET:
@@ -234,11 +264,14 @@ public class DorisRowConverter implements Serializable {
return (index, val) -> val.getTimestamp(index,
localP).toTimestamp();
case TIMESTAMP_WITH_TIME_ZONE:
final int zonedP = ((ZonedTimestampType) type).getPrecision();
- return (index, val) -> val.getTimestamp(index,
zonedP).toTimestamp();
+ return (index, val) -> val.getTimestamp(index,
zonedP).toTimestamp();
case ARRAY:
- case MULTISET:
+ return (index, val) -> convertArrayData(val.getArray(index),
type);
case MAP:
+ return (index, val) ->
writeValueAsString(convertMapData(val.getMap(index), type));
case ROW:
+ return (index, val) -> writeValueAsString(convertRowData(val,
index, type));
+ case MULTISET:
case STRUCTURED_TYPE:
case DISTINCT_TYPE:
case RAW:
@@ -248,4 +281,89 @@ public class DorisRowConverter implements Serializable {
throw new UnsupportedOperationException("Unsupported type:" +
type);
}
}
+
+ private ArrayData convertArrayData(Object[] array, LogicalType type) {
+ List<LogicalType> children = type.getChildren();
+ Preconditions.checkState(children.size() > 0, "Failed to obtain the
item type of array");
+ DeserializationConverter converter =
createNullableInternalConverter(children.get(0));
+ for (int i = 0; i < array.length; i++) {
+ Object result = converter.deserialize(array[i]);
+ array[i] = result;
+ }
+ GenericArrayData arrayData = new GenericArrayData(array);
+ return arrayData;
+ }
+
+ private List<Object> convertArrayData(ArrayData array, LogicalType type){
+ if(array instanceof GenericArrayData){
+ return Arrays.asList(((GenericArrayData)array).toObjectArray());
+ }
+ if(array instanceof BinaryArrayData){
+ LogicalType elementType = ((ArrayType)type).getElementType();
+ List<Object> values = Arrays.asList(((BinaryArrayData)
array).toObjectArray(elementType));
+ if(LogicalTypeRoot.DATE.equals(elementType.getTypeRoot())) {
+ return values.stream().map(date ->
Date.valueOf(LocalDate.ofEpochDay((Integer)date))).collect(Collectors.toList());
+ }
+ if (LogicalTypeRoot.ARRAY.equals(elementType.getTypeRoot())) {
+ return values.stream().map(arr ->
convertArrayData((ArrayData)arr, elementType)).collect(Collectors.toList());
+ }
+ return values;
+ }
+ throw new UnsupportedOperationException("Unsupported array data: " +
array.getClass());
+ }
+
+ private Object convertMapData(MapData map, LogicalType type) {
+ Map<Object, Object> result = new HashMap<>();
+ if (map instanceof GenericMapData) {
+ GenericMapData gMap = (GenericMapData)map;
+ for (Object key :
((GenericArrayData)gMap.keyArray()).toObjectArray()) {
+ result.put(key, gMap.get(key));
+ }
+ return result;
+ }
+ if (map instanceof BinaryMapData) {
+ BinaryMapData bMap = (BinaryMapData)map;
+ LogicalType valueType = ((MapType)type).getValueType();
+ Map<?, ?> javaMap = bMap.toJavaMap(((MapType) type).getKeyType(),
valueType);
+ for (Map.Entry<?,?> entry : javaMap.entrySet()) {
+ String key = entry.getKey().toString();
+ if (LogicalTypeRoot.MAP.equals(valueType.getTypeRoot())) {
+ result.put(key, convertMapData((MapData)entry.getValue(),
valueType));
+ }else if
(LogicalTypeRoot.DATE.equals(valueType.getTypeRoot())) {
+ result.put(key,
Date.valueOf(LocalDate.ofEpochDay((Integer)entry.getValue())).toString());
+ }else if
(LogicalTypeRoot.ARRAY.equals(valueType.getTypeRoot())) {
+ result.put(key,
convertArrayData((ArrayData)entry.getValue(), valueType));
+ }else if(entry.getValue() instanceof TimestampData){
+ result.put(key,
((TimestampData)entry.getValue()).toTimestamp().toString());
+ }else{
+ result.put(key, entry.getValue().toString());
+ }
+ }
+ return result;
+ }
+ throw new UnsupportedOperationException("Unsupported map data: " +
map.getClass());
+ }
+
+ private Object convertRowData(RowData val, int index, LogicalType type) {
+ RowType rowType = (RowType)type;
+ Map<String, Object> value = new HashMap<>();
+ RowData row = val.getRow(index, rowType.getFieldCount());
+
+ List<RowType.RowField> fields = rowType.getFields();
+ for(int i = 0;i< fields.size(); i++){
+ RowType.RowField rowField = fields.get(i);
+ SerializationConverter converter =
createNullableExternalConverter(rowField.getType());
+ Object valTmp = converter.serialize(i, row);
+ value.put(rowField.getName(), valTmp.toString());
+ }
+ return value;
+ }
+
+ private String writeValueAsString(Object object){
+ try {
+ return objectMapper.writeValueAsString(object);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index 8ab9a55..76e98e0 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -30,6 +30,7 @@ import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
import org.apache.doris.flink.exception.DorisException;
@@ -82,10 +83,11 @@ public class RowBatch {
private List<FieldVector> fieldVectors;
private RootAllocator rootAllocator;
private final Schema schema;
-
- private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ private final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
+ private final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS";
+ private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern(DATETIME_PATTERN);
+ private final DateTimeFormatter dateTimeV2Formatter =
DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
private final DateTimeFormatter dateFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd");
- private final DateTimeFormatter dateTimeV2Formatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
public List<Row> getRowBatch() {
return rowBatch;
@@ -152,171 +154,12 @@ public class RowBatch {
public void convertArrowToRowBatch() throws DorisException {
try {
for (int col = 0; col < fieldVectors.size(); col++) {
- FieldVector curFieldVector = fieldVectors.get(col);
- Types.MinorType mt = curFieldVector.getMinorType();
-
+ FieldVector fieldVector = fieldVectors.get(col);
+ Types.MinorType minorType = fieldVector.getMinorType();
final String currentType = schema.get(col).getType();
- switch (currentType) {
- case "NULL_TYPE":
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- addValueToRow(rowIndex, null);
- }
- break;
- case "BOOLEAN":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.BIT),
- typeMismatchMessage(currentType, mt));
- BitVector bitVector = (BitVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- Object fieldValue = bitVector.isNull(rowIndex) ?
null : bitVector.get(rowIndex) != 0;
- addValueToRow(rowIndex, fieldValue);
- }
- break;
- case "TINYINT":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.TINYINT),
- typeMismatchMessage(currentType, mt));
- TinyIntVector tinyIntVector = (TinyIntVector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- Object fieldValue = tinyIntVector.isNull(rowIndex)
? null : tinyIntVector.get(rowIndex);
- addValueToRow(rowIndex, fieldValue);
- }
- break;
- case "SMALLINT":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.SMALLINT),
- typeMismatchMessage(currentType, mt));
- SmallIntVector smallIntVector = (SmallIntVector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- Object fieldValue =
smallIntVector.isNull(rowIndex) ? null : smallIntVector.get(rowIndex);
- addValueToRow(rowIndex, fieldValue);
- }
- break;
- case "INT":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.INT),
- typeMismatchMessage(currentType, mt));
- IntVector intVector = (IntVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- Object fieldValue = intVector.isNull(rowIndex) ?
null : intVector.get(rowIndex);
- addValueToRow(rowIndex, fieldValue);
- }
- break;
- case "BIGINT":
-
-
Preconditions.checkArgument(mt.equals(Types.MinorType.BIGINT),
- typeMismatchMessage(currentType, mt));
- BigIntVector bigIntVector = (BigIntVector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- Object fieldValue = bigIntVector.isNull(rowIndex)
? null : bigIntVector.get(rowIndex);
- addValueToRow(rowIndex, fieldValue);
- }
- break;
- case "FLOAT":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.FLOAT4),
- typeMismatchMessage(currentType, mt));
- Float4Vector float4Vector = (Float4Vector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- Object fieldValue = float4Vector.isNull(rowIndex)
? null : float4Vector.get(rowIndex);
- addValueToRow(rowIndex, fieldValue);
- }
- break;
- case "TIME":
- case "DOUBLE":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.FLOAT8),
- typeMismatchMessage(currentType, mt));
- Float8Vector float8Vector = (Float8Vector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- Object fieldValue = float8Vector.isNull(rowIndex)
? null : float8Vector.get(rowIndex);
- addValueToRow(rowIndex, fieldValue);
- }
- break;
- case "BINARY":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.VARBINARY),
- typeMismatchMessage(currentType, mt));
- VarBinaryVector varBinaryVector = (VarBinaryVector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- Object fieldValue =
varBinaryVector.isNull(rowIndex) ? null : varBinaryVector.get(rowIndex);
- addValueToRow(rowIndex, fieldValue);
- }
- break;
- case "DECIMAL":
- case "DECIMALV2":
- case "DECIMAL32":
- case "DECIMAL64":
- case "DECIMAL128I":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.DECIMAL),
- typeMismatchMessage(currentType, mt));
- DecimalVector decimalVector = (DecimalVector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- if (decimalVector.isNull(rowIndex)) {
- addValueToRow(rowIndex, null);
- continue;
- }
- BigDecimal value =
decimalVector.getObject(rowIndex).stripTrailingZeros();
- addValueToRow(rowIndex, value);
- }
- break;
- case "DATE":
- case "DATEV2":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
- typeMismatchMessage(currentType, mt));
- VarCharVector date = (VarCharVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- if (date.isNull(rowIndex)) {
- addValueToRow(rowIndex, null);
- continue;
- }
- String value = new String(date.get(rowIndex));
- LocalDate localDate = LocalDate.parse(value,
dateFormatter);
- addValueToRow(rowIndex, localDate);
- }
- break;
- case "DATETIME":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
- typeMismatchMessage(currentType, mt));
- VarCharVector timeStampSecVector = (VarCharVector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- if (timeStampSecVector.isNull(rowIndex)) {
- addValueToRow(rowIndex, null);
- continue;
- }
- String value = new
String(timeStampSecVector.get(rowIndex));
- LocalDateTime parse = LocalDateTime.parse(value,
dateTimeFormatter);
- addValueToRow(rowIndex, parse);
- }
- break;
- case "DATETIMEV2":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
- typeMismatchMessage(currentType, mt));
- VarCharVector timeStampV2SecVector = (VarCharVector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- if (timeStampV2SecVector.isNull(rowIndex)) {
- addValueToRow(rowIndex, null);
- continue;
- }
- String value = new
String(timeStampV2SecVector.get(rowIndex));
- LocalDateTime parse = LocalDateTime.parse(value,
dateTimeV2Formatter);
- addValueToRow(rowIndex, parse);
- }
- break;
- case "LARGEINT":
- case "CHAR":
- case "VARCHAR":
- case "STRING":
- case "JSONB":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
- typeMismatchMessage(currentType, mt));
- VarCharVector varCharVector = (VarCharVector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- if (varCharVector.isNull(rowIndex)) {
- addValueToRow(rowIndex, null);
- continue;
- }
- String value = new
String(varCharVector.get(rowIndex));
- addValueToRow(rowIndex, value);
- }
- break;
- default:
- String errMsg = "Unsupported type " +
schema.get(col).getType();
- logger.error(errMsg);
- throw new DorisException(errMsg);
+ for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
+ boolean passed = doConvert(col, rowIndex, minorType,
currentType, fieldVector);
+ Preconditions.checkArgument(passed,
typeMismatchMessage(currentType, minorType));
}
}
} catch (Exception e) {
@@ -325,6 +168,156 @@ public class RowBatch {
}
}
+ private boolean doConvert(int col,
+ int rowIndex,
+ Types.MinorType minorType,
+ String currentType,
+ FieldVector fieldVector) throws DorisException {
+ switch (currentType) {
+ case "NULL_TYPE":
+ break;
+ case "BOOLEAN":
+ if (!minorType.equals(Types.MinorType.BIT)) return false;
+ BitVector bitVector = (BitVector) fieldVector;
+ Object fieldValue = bitVector.isNull(rowIndex) ? null :
bitVector.get(rowIndex) != 0;
+ addValueToRow(rowIndex, fieldValue);
+ break;
+ case "TINYINT":
+ if (!minorType.equals(Types.MinorType.TINYINT)) return false;
+ TinyIntVector tinyIntVector = (TinyIntVector) fieldVector;
+ fieldValue = tinyIntVector.isNull(rowIndex) ? null :
tinyIntVector.get(rowIndex);
+ addValueToRow(rowIndex, fieldValue);
+ break;
+ case "SMALLINT":
+ if (!minorType.equals(Types.MinorType.SMALLINT)) return false;
+ SmallIntVector smallIntVector = (SmallIntVector) fieldVector;
+ fieldValue = smallIntVector.isNull(rowIndex) ? null :
smallIntVector.get(rowIndex);
+ addValueToRow(rowIndex, fieldValue);
+ break;
+ case "INT":
+ if (!minorType.equals(Types.MinorType.INT)) return false;
+ IntVector intVector = (IntVector) fieldVector;
+ fieldValue = intVector.isNull(rowIndex) ? null :
intVector.get(rowIndex);
+ addValueToRow(rowIndex, fieldValue);
+ break;
+ case "BIGINT":
+ if (!minorType.equals(Types.MinorType.BIGINT)) return false;
+ BigIntVector bigIntVector = (BigIntVector) fieldVector;
+ fieldValue = bigIntVector.isNull(rowIndex) ? null :
bigIntVector.get(rowIndex);
+ addValueToRow(rowIndex, fieldValue);
+ break;
+ case "FLOAT":
+ if (!minorType.equals(Types.MinorType.FLOAT4)) return false;
+ Float4Vector float4Vector = (Float4Vector) fieldVector;
+ fieldValue = float4Vector.isNull(rowIndex) ? null :
float4Vector.get(rowIndex);
+ addValueToRow(rowIndex, fieldValue);
+ break;
+ case "TIME":
+ case "DOUBLE":
+ if (!minorType.equals(Types.MinorType.FLOAT8)) return false;
+ Float8Vector float8Vector = (Float8Vector) fieldVector;
+ fieldValue = float8Vector.isNull(rowIndex) ? null :
float8Vector.get(rowIndex);
+ addValueToRow(rowIndex, fieldValue);
+ break;
+ case "BINARY":
+ if (!minorType.equals(Types.MinorType.VARBINARY)) return false;
+
+ VarBinaryVector varBinaryVector = (VarBinaryVector)
fieldVector;
+ fieldValue = varBinaryVector.isNull(rowIndex) ? null :
varBinaryVector.get(rowIndex);
+ addValueToRow(rowIndex, fieldValue);
+ break;
+ case "DECIMAL":
+ case "DECIMALV2":
+ case "DECIMAL32":
+ case "DECIMAL64":
+ case "DECIMAL128I":
+ if (!minorType.equals(Types.MinorType.DECIMAL)) return false;
+ DecimalVector decimalVector = (DecimalVector) fieldVector;
+ if (decimalVector.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
+ break;
+ }
+ BigDecimal value =
decimalVector.getObject(rowIndex).stripTrailingZeros();
+ addValueToRow(rowIndex, value);
+ break;
+ case "DATE":
+ case "DATEV2":
+ if (!minorType.equals(Types.MinorType.VARCHAR)) return false;
+ VarCharVector date = (VarCharVector) fieldVector;
+ if (date.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
+ break;
+ }
+ String stringValue = new String(date.get(rowIndex));
+ LocalDate localDate = LocalDate.parse(stringValue,
dateFormatter);
+ addValueToRow(rowIndex, localDate);
+ break;
+ case "DATETIME":
+ if (!minorType.equals(Types.MinorType.VARCHAR)) return false;
+ VarCharVector timeStampSecVector = (VarCharVector) fieldVector;
+ if (timeStampSecVector.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
+ break;
+ }
+ stringValue = new String(timeStampSecVector.get(rowIndex));
+ LocalDateTime parse = LocalDateTime.parse(stringValue,
dateTimeFormatter);
+ addValueToRow(rowIndex, parse);
+ break;
+ case "DATETIMEV2":
+ if (!minorType.equals(Types.MinorType.VARCHAR)) return false;
+ VarCharVector timeStampV2SecVector = (VarCharVector)
fieldVector;
+ if (timeStampV2SecVector.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
+ break;
+ }
+ stringValue = new String(timeStampV2SecVector.get(rowIndex));
+ stringValue = completeMilliseconds(stringValue);
+ parse = LocalDateTime.parse(stringValue, dateTimeV2Formatter);
+ addValueToRow(rowIndex, parse);
+ break;
+ case "LARGEINT":
+ case "CHAR":
+ case "VARCHAR":
+ case "STRING":
+ case "JSONB":
+ if (!minorType.equals(Types.MinorType.VARCHAR)) return false;
+ VarCharVector varCharVector = (VarCharVector) fieldVector;
+ if (varCharVector.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
+ break;
+ }
+ stringValue = new String(varCharVector.get(rowIndex));
+ addValueToRow(rowIndex, stringValue);
+ break;
+ case "ARRAY":
+ if (!minorType.equals(Types.MinorType.LIST)) return false;
+ ListVector listVector = (ListVector) fieldVector;
+ Object listValue = listVector.isNull(rowIndex) ? null :
listVector.getObject(rowIndex);
+ //todo: when the subtype of array is date, conversion is
required
+ addValueToRow(rowIndex, listValue);
+ break;
+ default:
+ String errMsg = "Unsupported type " +
schema.get(col).getType();
+ logger.error(errMsg);
+ throw new DorisException(errMsg);
+ }
+ return true;
+ }
+
+ private String completeMilliseconds(String stringValue) {
+ if(stringValue.length() == DATETIMEV2_PATTERN.length()){
+ return stringValue;
+ }
+ StringBuilder sb = new StringBuilder(stringValue);
+ if(stringValue.length() == DATETIME_PATTERN.length()){
+ sb.append(".");
+ }
+ while (sb.toString().length() < DATETIMEV2_PATTERN.length()){
+ sb.append(0);
+ }
+ return sb.toString();
+ }
+
public List<Object> next() {
if (!hasNext()) {
String errMsg = "Get row offset:" + offsetInRowBatch + " larger
than row size: " + readRowCount;
@@ -334,9 +327,9 @@ public class RowBatch {
return rowBatch.get(offsetInRowBatch++).getCols();
}
- private String typeMismatchMessage(final String sparkType, final
Types.MinorType arrowType) {
+ private String typeMismatchMessage(final String flinkType, final
Types.MinorType arrowType) {
final String messageTemplate = "FLINK type is %1$s, but arrow type is
%2$s.";
- return String.format(messageTemplate, sparkType, arrowType.name());
+ return String.format(messageTemplate, flinkType, arrowType.name());
}
public int getReadRowCount() {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
index e1ec13e..78e34aa 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
@@ -55,7 +55,7 @@ public class RowDataSerializer implements
DorisRecordSerializer<RowData> {
if (JSON.equals(type)) {
objectMapper = new ObjectMapper();
}
- this.rowConverter = new DorisRowConverter(dataTypes);
+ this.rowConverter = new
DorisRowConverter().setExternalConverter(dataTypes);
}
@Override
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkArraySQLExample.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkArraySQLExample.java
new file mode 100644
index 0000000..90b7f4c
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkArraySQLExample.java
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import java.util.UUID;
+
+public class DorisSinkArraySQLExample {
+
+ public static void main(String[] args) throws Exception{
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+ tEnv.executeSql("CREATE TABLE source (\n" +
+ " `id` int,\n" +
+ " `c_1` array<INT> ,\n" +
+ " `c_2` array<TINYINT> ,\n" +
+ " `c_3` array<SMALLINT> ,\n" +
+ " `c_4` array<INT> ,\n" +
+ " `c_5` array<BIGINT> ,\n" +
+ " `c_6` array<BIGINT> ,\n" +
+ " `c_7` array<FLOAT>,\n" +
+ " `c_8` array<DOUBLE> ,\n" +
+ " `c_9` array<DECIMAL(4,2)> ,\n" +
+ " `c_10` array<DATE> ,\n" +
+ " `c_11` array<DATE> ,\n" +
+ " `c_12` array<TIMESTAMP> ,\n" +
+ " `c_13` array<TIMESTAMP> ,\n" +
+ " `c_14` array<CHAR(10)> ,\n" +
+ " `c_15` array<VARCHAR(256)> ,\n" +
+ " `c_16` array<STRING> \n" +
+ ") WITH (\n" +
+ " 'connector' = 'datagen', \n" +
+ " 'fields.c_7.element.min' = '1', \n" +
+ " 'fields.c_7.element.max' = '10', \n" +
+ " 'fields.c_8.element.min' = '1', \n" +
+ " 'fields.c_8.element.max' = '10', \n" +
+ " 'fields.c_14.element.length' = '10', \n" +
+ " 'fields.c_15.element.length' = '10', \n" +
+ " 'fields.c_16.element.length' = '10', \n" +
+ " 'number-of-rows' = '5' \n" +
+ ");");
+
+
+ tEnv.executeSql("CREATE TABLE source_doris (" +
+ " `id` int,\n" +
+ " `c_1` array<INT> ,\n" +
+ " `c_2` array<TINYINT> ,\n" +
+ " `c_3` array<SMALLINT> ,\n" +
+ " `c_4` array<INT> ,\n" +
+ " `c_5` array<BIGINT> ,\n" +
+ " `c_6` array<STRING> ,\n" +
+ " `c_7` array<FLOAT> ,\n" +
+ " `c_8` array<DOUBLE> ,\n" +
+ " `c_9` array<DECIMAL(4,2)> ,\n" +
+ " `c_10` array<STRING> ,\n" + //ARRAY<DATE>
+ " `c_11` array<STRING> ,\n" + //ARRAY<DATE>
+ " `c_12` array<STRING> ,\n" + //ARRAY<TIMESTAMP>
+ " `c_13` array<STRING> ,\n" + //ARRAY<TIMESTAMP>
+ " `c_14` array<CHAR(10)> ,\n" +
+ " `c_15` array<VARCHAR(256)> ,\n" +
+ " `c_16` array<STRING> \n" +
+ ") WITH (" +
+ " 'connector' = 'doris',\n" +
+ " 'fenodes' = '127.0.0.1:8030',\n" +
+ " 'table.identifier' = 'test.array_test_type',\n" +
+ " 'username' = 'root',\n" +
+ " 'password' = ''\n" +
+ ")");
+
+
+
+ // define a dynamic aggregating query
+// final Table result = tEnv.sqlQuery("SELECT * from source_doris ");
+//
+// // print the result to the console
+// tEnv.toRetractStream(result, Row.class).print();
+// env.execute();
+
+ tEnv.executeSql(
+ "CREATE TABLE sink (" +
+ " `id` int,\n" +
+ " `c_1` array<INT> ,\n" +
+ " `c_2` array<TINYINT> ,\n" +
+ " `c_3` array<SMALLINT> ,\n" +
+ " `c_4` array<INT> ,\n" +
+ " `c_5` array<BIGINT> ,\n" +
+ " `c_6` array<STRING> ,\n" +
+ " `c_7` array<FLOAT> ,\n" +
+ " `c_8` array<DOUBLE> ,\n" +
+ " `c_9` array<DECIMAL(4,2)> ,\n" +
+ " `c_10` array<STRING> ,\n" + //ARRAY<DATE>
+ " `c_11` array<STRING> ,\n" + //ARRAY<DATE>
+ " `c_12` array<STRING> ,\n" + //ARRAY<TIMESTAMP>
+ " `c_13` array<STRING> ,\n" + //ARRAY<TIMESTAMP>
+ " `c_14` array<CHAR(10)> ,\n" +
+ " `c_15` array<VARCHAR(256)> ,\n" +
+ " `c_16` array<STRING> \n" +
+ ") " +
+ "WITH (\n" +
+ " 'connector' = 'doris',\n" +
+ " 'fenodes' = '127.0.0.1:8030',\n" +
+ " 'table.identifier' =
'test.array_test_type_sink',\n" +
+ " 'username' = 'root',\n" +
+ " 'password' = '',\n" +
+ " 'sink.label-prefix' = 'doris_label4" +
UUID.randomUUID() + "'" +
+ ")");
+ tEnv.executeSql("INSERT INTO sink select * from source_doris");
+
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
index dae75f0..4622e4e 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
@@ -17,47 +17,127 @@
package org.apache.doris.flink;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.util.UUID;
public class DorisSourceSinkExample {
- public static void main(String[] args) {
- EnvironmentSettings settings = EnvironmentSettings.newInstance()
- .inStreamingMode()
- .build();
- TableEnvironment tEnv = TableEnvironment.create(settings);
+ public static void main(String[] args) throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(
"CREATE TABLE doris_test (" +
- "name STRING," +
- "age INT," +
- "price DECIMAL(5,2)," +
- "sale DOUBLE" +
+ " id int,\n" +
+ " c_1 boolean,\n" +
+ " c_2 tinyint,\n" +
+ " c_3 smallint,\n" +
+ " c_4 int,\n" +
+ " c_5 bigint,\n" +
+ " c_6 bigint,\n" +
+ " c_7 float,\n" +
+ " c_8 double,\n" +
+ " c_9 DECIMAL(4,2),\n" +
+ " c_10 DECIMAL(4,1),\n" +
+ " c_11 date,\n" +
+ " c_12 date,\n" +
+ " c_13 timestamp,\n" +
+ " c_14 timestamp,\n" +
+ " c_15 string,\n" +
+ " c_16 string,\n" +
+ " c_17 string,\n" +
+ " c_18 array<int>,\n" +
+ " c_19 Map<String,int>\n" +
+ ") " +
+ "WITH (\n" +
+ " 'connector' = 'datagen', \n" +
+ " 'fields.c_6.max' = '5', \n" +
+ " 'fields.c_9.max' = '5', \n" +
+ " 'fields.c_10.max' = '5', \n" +
+ " 'fields.c_15.length' = '5', \n" +
+ " 'fields.c_16.length' = '5', \n" +
+ " 'fields.c_17.length' = '5', \n" +
+ " 'fields.c_19.key.length' = '5', \n" +
+ " 'number-of-rows' = '1' \n" +
+ ")");
+
+ final Table result = tEnv.sqlQuery("SELECT * from doris_test ");
+
+ // print the result to the console
+ tEnv.toRetractStream(result, Row.class).print();
+ env.execute();
+
+ tEnv.executeSql(
+ "CREATE TABLE source_doris (" +
+ " id int,\n" +
+ " c_1 boolean,\n" +
+ " c_2 tinyint,\n" +
+ " c_3 smallint,\n" +
+ " c_4 int,\n" +
+ " c_5 bigint,\n" +
+ " c_6 string,\n" +
+ " c_7 float,\n" +
+ " c_8 double,\n" +
+ " c_9 DECIMAL(4,2),\n" +
+ " c_10 DECIMAL(4,1),\n" +
+ " c_11 date,\n" +
+ " c_12 date,\n" +
+ " c_13 timestamp,\n" +
+ " c_14 timestamp,\n" +
+ " c_15 string,\n" +
+ " c_16 string,\n" +
+ " c_17 string,\n" +
+ " c_18 array<int>,\n" +
+ " c_19 string\n" +
") " +
"WITH (\n" +
" 'connector' = 'doris',\n" +
- " 'fenodes' = 'FE_IP:8030',\n" +
- " 'table.identifier' = 'db.table',\n" +
+ " 'fenodes' = '127.0.0.1:8030',\n" +
+ " 'table.identifier' = 'test.test_all_type',\n" +
" 'username' = 'root',\n" +
- " 'password' = ''" +
+ " 'password' = ''\n" +
")");
+
tEnv.executeSql(
"CREATE TABLE doris_test_sink (" +
- "name STRING," +
- "age INT," +
- "price DECIMAL(5,2)," +
- "sale DOUBLE" +
+ " id int,\n" +
+ " c_1 boolean,\n" +
+ " c_2 tinyint,\n" +
+ " c_3 smallint,\n" +
+ " c_4 int,\n" +
+ " c_5 bigint,\n" +
+ " c_6 string,\n" +
+ " c_7 float,\n" +
+ " c_8 double,\n" +
+ " c_9 DECIMAL(4,2),\n" +
+ " c_10 DECIMAL(4,1),\n" +
+ " c_11 date,\n" +
+ " c_12 date,\n" +
+ " c_13 timestamp,\n" +
+ " c_14 timestamp,\n" +
+ " c_15 string,\n" +
+ " c_16 string,\n" +
+ " c_17 string,\n" +
+ " c_18 array<int>,\n" +
+ " c_19 string\n" +
") " +
"WITH (\n" +
" 'connector' = 'doris',\n" +
- " 'fenodes' = 'FE_IP:8030',\n" +
- " 'table.identifier' = 'db.table',\n" +
+ " 'fenodes' = '127.0.0.1:8030',\n" +
+ " 'table.identifier' = 'test.test_all_type_sink',\n" +
" 'username' = 'root',\n" +
" 'password' = '',\n" +
" 'sink.properties.format' = 'csv',\n" +
- " 'sink.label-prefix' = 'doris_csv_table'\n" +
+ " 'sink.label-prefix' = 'doris_label4" + UUID.randomUUID() + "'" +
")");
-
- tEnv.executeSql("INSERT INTO doris_test_sink select name,age,price,sale from doris_test");
+//
+ tEnv.executeSql("INSERT INTO doris_test_sink select * from source_doris");
}
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/LookupJoinCdcExample.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/LookupJoinCdcExample.java
new file mode 100644
index 0000000..d5a2b74
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/LookupJoinCdcExample.java
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+public class LookupJoinCdcExample {
+ public static void main(String[] args) throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+// env.disableOperatorChaining();
+ env.enableCheckpointing(30000);
+ final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ tEnv.executeSql(
+ "CREATE TABLE mysql_tb (" +
+ "id INT," +
+ "name STRING," +
+ "process_time as proctime()," +
+ "primary key(id) NOT ENFORCED" +
+ ") " +
+ "WITH (\n" +
+ " 'connector' = 'mysql-cdc',\n" +
+ " 'hostname' = '127.0.0.1',\n" +
+ " 'port' = '3306',\n" +
+ " 'username' = 'root',\n" +
+ " 'password' = '123456',\n" +
+ " 'database-name' = 'test',\n" +
+ " 'scan.startup.mode' = 'latest-offset',\n" +
+ " 'server-time-zone' = 'Asia/Shanghai',\n" +
+ " 'table-name' = 'fact_table' " +
+ ")");
+
+ tEnv.executeSql(
+ "CREATE TABLE doris_tb (" +
+ "id INT," +
+ "age INT," +
+ "primary key(id) NOT ENFORCED" +
+ ") " +
+ "WITH (\n" +
+ " 'connector' = 'doris',\n" +
+ " 'fenodes' = '127.0.0.1:8030',\n" +
+ " 'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',\n" +
+ " 'table.identifier' = 'test.dim_table',\n" +
+ " 'lookup.cache.max-rows' = '1000'," +
+ " 'lookup.cache.ttl' = '1 hour'," +
+ " 'lookup.jdbc.async' = 'true',\n" +
+ " 'username' = 'root',\n" +
+ " 'password' = ''\n" +
+ ")");
+
+ Table table = tEnv.sqlQuery("SELECT a.id, a.name, b.age\n" +
+ "FROM mysql_tb a\n" +
+ " left join doris_tb FOR SYSTEM_TIME AS OF a.process_time AS b\n" +
+ " ON a.id = b.id");
+
+ tEnv.toRetractStream(table, Row.class).print();
+
+ env.execute();
+
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]