This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e0bbbb8510 [INLONG-10568][Sort] Change the starrocks connector UNKNOWN
datatype handle method (#10570)
e0bbbb8510 is described below
commit e0bbbb85107f2c4fb48918df7f5004f76be807e0
Author: XiaoYou201 <[email protected]>
AuthorDate: Sun Jul 7 12:17:18 2024 +0800
[INLONG-10568][Sort] Change the starrocks connector UNKNOWN datatype handle
method (#10570)
---
.../sink/table/StarRocksDynamicTableSink.java | 1 -
.../table/sink/table/StarRocksSinkOP.java | 42 +++
.../sink/table/StarRocksTableRowTransformer.java | 320 +++++++++++++++++++++
3 files changed, 362 insertions(+), 1 deletion(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java
index c5df06a3ab..418e99c4df 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java
@@ -17,7 +17,6 @@
package org.apache.inlong.sort.starrocks.table.sink.table;
-import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;
import
com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionBase;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.api.common.typeinfo.TypeInformation;
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksSinkOP.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksSinkOP.java
new file mode 100644
index 0000000000..d37d7b0209
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksSinkOP.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink.table;
+
+import org.apache.flink.types.RowKind;
+
+/**
+ * StarRocks sink operator.
+ * copy from {@link com.starrocks.connector.flink.row.sink.StarRocksSinkOP}
+ * not modified
+ */
+public enum StarRocksSinkOP {
+
+ UPSERT, DELETE;
+
+ public static final String COLUMN_KEY = "__op";
+
+ static StarRocksSinkOP parse(RowKind kind) {
+ if (RowKind.INSERT.equals(kind) || RowKind.UPDATE_AFTER.equals(kind)) {
+ return UPSERT;
+ }
+ if (RowKind.DELETE.equals(kind) || RowKind.UPDATE_BEFORE.equals(kind))
{
+ return DELETE;
+ }
+ throw new RuntimeException("Unsupported row kind.");
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksTableRowTransformer.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksTableRowTransformer.java
new file mode 100644
index 0000000000..6b4586a630
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksTableRowTransformer.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink.table;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.JSONSerializer;
+import com.alibaba.fastjson.serializer.ObjectSerializer;
+import com.alibaba.fastjson.serializer.SerializeConfig;
+import com.alibaba.fastjson.serializer.SerializeWriter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
+import com.starrocks.connector.flink.table.StarRocksDataType;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryArrayData;
+import org.apache.flink.table.data.binary.BinaryMapData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Type;
+import java.sql.Date;
+import java.text.SimpleDateFormat;
+import java.time.LocalDate;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * copy from {@link
com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer}
+ */
+public class StarRocksTableRowTransformer implements
StarRocksIRowTransformer<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private TypeInformation<RowData> rowDataTypeInfo;
+ private Function<RowData, RowData> valueTransform;
+ private String[] columnNames;
+ private DataType[] columnDataTypes;
+ private Map<String, StarRocksDataType> columns;
+ private final SimpleDateFormat dateFormatter = new
SimpleDateFormat("yyyy-MM-dd");
+
+ public StarRocksTableRowTransformer(TypeInformation<RowData>
rowDataTypeInfo) {
+ this.rowDataTypeInfo = rowDataTypeInfo;
+ }
+
+ @Override
+ public void setStarRocksColumns(Map<String, StarRocksDataType> columns) {
+ this.columns = columns;
+ }
+
+ @Override
+ public void setTableSchema(TableSchema ts) {
+ this.columnNames = ts.getFieldNames();
+ this.columnDataTypes = ts.getFieldDataTypes();
+ }
+
+ @Override
+ public void setRuntimeContext(RuntimeContext runtimeCtx) {
+ final TypeSerializer<RowData> typeSerializer =
+
rowDataTypeInfo.createSerializer(runtimeCtx.getExecutionConfig());
+ valueTransform =
+ runtimeCtx.getExecutionConfig().isObjectReuseEnabled() ?
typeSerializer::copy : Function.identity();
+ SerializeConfig.getGlobalInstance().put(BinaryStringData.class, new
BinaryStringDataSerializer());
+ SerializeConfig.getGlobalInstance().put(DecimalData.class, new
DecimalDataSerializer());
+ SerializeConfig.getGlobalInstance().put(TimestampData.class, new
TimestampDataSerializer());
+ }
+
+ @Override
+ public Object[] transform(RowData record, boolean supportUpsertDelete) {
+ RowData transformRecord = valueTransform.apply(record);
+ Object[] values = new Object[columnDataTypes.length +
(supportUpsertDelete ? 1 : 0)];
+ int idx = 0;
+ for (DataType dataType : columnDataTypes) {
+ values[idx] = typeConvertion(dataType.getLogicalType(),
transformRecord, idx);
+ idx++;
+ }
+ if (supportUpsertDelete) {
+ // set `__op` column
+ values[idx] = StarRocksSinkOP.parse(record.getRowKind()).ordinal();
+ }
+ return values;
+ }
+
+ private Object typeConvertion(LogicalType type, RowData record, int pos) {
+ if (record.isNullAt(pos)) {
+ return null;
+ }
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ return record.getBoolean(pos) ? 1L : 0L;
+ case TINYINT:
+ return record.getByte(pos);
+ case SMALLINT:
+ return record.getShort(pos);
+ case INTEGER:
+ return record.getInt(pos);
+ case BIGINT:
+ return record.getLong(pos);
+ case FLOAT:
+ return record.getFloat(pos);
+ case DOUBLE:
+ return record.getDouble(pos);
+ case CHAR:
+ case VARCHAR:
+ String sValue = record.getString(pos).toString();
+ if (columns == null) {
+ return sValue;
+ }
+ StarRocksDataType starRocksDataType =
+ columns.getOrDefault(columnNames[pos],
StarRocksDataType.UNKNOWN);
+ if (starRocksDataType == StarRocksDataType.UNKNOWN) {
+ return sValue;
+ }
+ if ((starRocksDataType == StarRocksDataType.JSON)
+ && (sValue.charAt(0) == '{' || sValue.charAt(0) ==
'[')) {
+ // The json string need to be converted to a json object,
and to the json string
+ // again via JSON.toJSONString in
StarRocksJsonSerializer#serialize. Otherwise,
+ // the final json string in stream load will not be
correct. For example, the received
+ // string is "{"a": 1, "b": 2}", and if input it to
JSON.toJSONString directly, the
+ // result will be "{\"a\": 1, \"b\": 2}" which will not be
recognized as a json in
+ // StarRocks
+ return JSON.parse(sValue);
+ }
+ return sValue;
+ case DATE:
+ return
dateFormatter.format(Date.valueOf(LocalDate.ofEpochDay(record.getInt(pos))));
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ final int timestampPrecision = ((TimestampType)
type).getPrecision();
+ return record.getTimestamp(pos,
timestampPrecision).toLocalDateTime().toString();
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ int localZonedTimestampPrecision = ((LocalZonedTimestampType)
type).getPrecision();
+ return record.getTimestamp(pos,
localZonedTimestampPrecision).toLocalDateTime().toString();
+ case DECIMAL: // for both largeint and decimal
+ final int decimalPrecision = ((DecimalType)
type).getPrecision();
+ final int decimalScale = ((DecimalType) type).getScale();
+ return record.getDecimal(pos, decimalPrecision,
decimalScale).toBigDecimal();
+ case BINARY:
+ final byte[] bts = record.getBinary(pos);
+ long value = 0;
+ for (int i = 0; i < bts.length; i++) {
+ value += (bts[bts.length - i - 1] & 0xffL) << (8 * i);
+ }
+ return value;
+ case ARRAY:
+ return convertNestedArray(record.getArray(pos), type);
+ case MAP:
+ return convertNestedMap(record.getMap(pos), type);
+ case ROW:
+ RowType rType = (RowType) type;
+ Map<String, Object> m = new HashMap<>();
+ RowData row = record.getRow(pos, rType.getFieldCount());
+ rType.getFields().parallelStream().forEach(
+ f -> m.put(f.getName(), typeConvertion(f.getType(),
row, rType.getFieldIndex(f.getName()))));
+ if (columns == null) {
+ return m;
+ }
+ StarRocksDataType rStarRocksDataType =
+ columns.getOrDefault(columnNames[pos],
StarRocksDataType.UNKNOWN);
+ if (rStarRocksDataType == StarRocksDataType.STRING) {
+ return JSON.toJSONString(m);
+ }
+ return m;
+ default:
+ throw new UnsupportedOperationException("Unsupported type:" +
type);
+ }
+ }
+
+ private List<Object> convertNestedArray(ArrayData arrData, LogicalType
type) {
+ if (arrData instanceof GenericArrayData) {
+ return Lists.newArrayList(((GenericArrayData)
arrData).toObjectArray());
+ }
+ if (arrData instanceof BinaryArrayData) {
+ LogicalType lt = ((ArrayType) type).getElementType();
+ List<Object> data = Lists.newArrayList(((BinaryArrayData)
arrData).toObjectArray(lt));
+ if (LogicalTypeRoot.ROW.equals(lt.getTypeRoot())) {
+ RowType rType = (RowType) lt;
+ // parse nested row data
+ return data.parallelStream().map(row -> {
+ Map<String, Object> m = Maps.newHashMap();
+ rType.getFields().parallelStream().forEach(f ->
m.put(f.getName(),
+ typeConvertion(f.getType(), (RowData) row,
rType.getFieldIndex(f.getName()))));
+ return JSON.toJSONString(m);
+ }).collect(Collectors.toList());
+ }
+ if (LogicalTypeRoot.MAP.equals(lt.getTypeRoot())) {
+ // traversal of the nested map
+ return data.parallelStream().map(m ->
convertNestedMap((MapData) m, lt)).collect(Collectors.toList());
+ }
+ if (LogicalTypeRoot.DATE.equals(lt.getTypeRoot())) {
+ return data.parallelStream()
+ .map(date ->
dateFormatter.format(Date.valueOf(LocalDate.ofEpochDay((Integer) date))))
+ .collect(Collectors.toList());
+ }
+ if (LogicalTypeRoot.ARRAY.equals(lt.getTypeRoot())) {
+ // traversal of the nested array
+ return data.parallelStream().map(arr ->
convertNestedArray((ArrayData) arr, lt))
+ .collect(Collectors.toList());
+ }
+ return data;
+ }
+ throw new UnsupportedOperationException(String.format("Unsupported
array data: %s", arrData.getClass()));
+ }
+
+ private Map<Object, Object> convertNestedMap(MapData mapData, LogicalType
type) {
+ if (mapData instanceof GenericMapData) {
+ HashMap<Object, Object> m = Maps.newHashMap();
+ for (Object k : ((GenericArrayData) ((GenericMapData)
mapData).keyArray()).toObjectArray()) {
+ m.put(k, ((GenericMapData) mapData).get(k));
+ }
+ return m;
+ }
+ if (mapData instanceof BinaryMapData) {
+ Map<Object, Object> result = Maps.newHashMap();
+ LogicalType valType = ((MapType) type).getValueType();
+ Map<?, ?> javaMap = ((BinaryMapData) mapData).toJavaMap(((MapType)
type).getKeyType(), valType);
+ for (Map.Entry<?, ?> en : javaMap.entrySet()) {
+ if (LogicalTypeRoot.MAP.equals(valType.getTypeRoot())) {
+ // traversal of the nested map
+ result.put(en.getKey().toString(),
convertNestedMap((MapData) en.getValue(), valType));
+ continue;
+ }
+ if (LogicalTypeRoot.DATE.equals(valType.getTypeRoot())) {
+ result.put(en.getKey().toString(),
+
dateFormatter.format(Date.valueOf(LocalDate.ofEpochDay((Integer)
en.getValue()))));
+ continue;
+ }
+ if (LogicalTypeRoot.ARRAY.equals(valType.getTypeRoot())) {
+ result.put(en.getKey().toString(),
convertNestedArray((ArrayData) en.getValue(), valType));
+ continue;
+ }
+ result.put(en.getKey().toString(), en.getValue());
+ }
+ return result;
+ }
+ throw new UnsupportedOperationException(String.format("Unsupported map
data: %s", mapData.getClass()));
+ }
+
+}
+final class BinaryStringDataSerializer implements ObjectSerializer,
Serializable {
+
+ private static final long serialVersionUID = 1L;
+ @Override
+ public void write(JSONSerializer serializer, Object object, Object
fieldName, Type fieldType, int features)
+ throws IOException {
+ SerializeWriter out = serializer.getWriter();
+ if (null == object) {
+ serializer.getWriter().writeNull();
+ return;
+ }
+ BinaryStringData strData = (BinaryStringData) object;
+ out.writeString(strData.toString());
+ }
+}
+
+final class DecimalDataSerializer implements ObjectSerializer, Serializable {
+
+ private static final long serialVersionUID = 1L;
+ @Override
+ public void write(JSONSerializer serializer, Object object, Object
fieldName, Type fieldType, int features)
+ throws IOException {
+ SerializeWriter out = serializer.getWriter();
+ if (null == object) {
+ serializer.getWriter().writeNull();
+ return;
+ }
+ out.writeString(((DecimalData) object).toBigDecimal().toPlainString());
+ }
+}
+
+final class TimestampDataSerializer implements ObjectSerializer, Serializable {
+
+ private static final long serialVersionUID = 1L;
+ @Override
+ public void write(JSONSerializer serializer, Object object, Object
fieldName, Type fieldType, int features)
+ throws IOException {
+ SerializeWriter out = serializer.getWriter();
+ if (null == object) {
+ serializer.getWriter().writeNull();
+ return;
+ }
+ out.writeString(((TimestampData) object).toLocalDateTime().toString());
+ }
+}