This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 535059e25d [INLONG-11616][SDK] Use self-defined Field and RowData
conversion utils (#11617)
535059e25d is described below
commit 535059e25d7f3d5630f2f592c557f80a8917e8a4
Author: vernedeng <[email protected]>
AuthorDate: Thu Dec 19 22:02:25 2024 +0800
[INLONG-11616][SDK] Use self-defined Field and RowData conversion utils
(#11617)
---
.../sdk/transform/decode/RowDataSourceData.java | 6 +-
.../sdk/transform/decode/RowDataSourceDecoder.java | 8 +-
.../sdk/transform/encode/RowDataSinkEncoder.java | 23 +-
.../sdk/transform/utils/FieldToRowDataUtils.java | 20 +-
.../sdk/transform/utils/RowToFieldDataUtils.java | 258 +++++++++++++++++++++
.../formats/base/FieldToRowDataConverters.java | 13 +-
6 files changed, 291 insertions(+), 37 deletions(-)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
index 3e6ee9fc39..fbfd423497 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
@@ -17,7 +17,7 @@
package org.apache.inlong.sdk.transform.decode;
-import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;
+import org.apache.inlong.sdk.transform.utils.RowToFieldDataUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.table.data.RowData;
@@ -29,12 +29,12 @@ public class RowDataSourceData implements SourceData {
private final RowData rowData;
private final Map<String, Integer> fieldPositionMap;
- private final RowDataToFieldConverters.RowFieldConverter[] converters;
+ private final RowToFieldDataUtils.RowFieldConverter[] converters;
public RowDataSourceData(
RowData rowData,
Map<String, Integer> fieldPositionMap,
- RowDataToFieldConverters.RowFieldConverter[] converters) {
+ RowToFieldDataUtils.RowFieldConverter[] converters) {
this.rowData = rowData;
this.fieldPositionMap = fieldPositionMap;
this.converters = converters;
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
index fdd6c4ce08..a82fc2ac6b 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
@@ -20,7 +20,7 @@ package org.apache.inlong.sdk.transform.decode;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.RowDataSourceInfo;
import org.apache.inlong.sdk.transform.process.Context;
-import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;
+import org.apache.inlong.sdk.transform.utils.RowToFieldDataUtils;
import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;
import org.apache.flink.table.data.RowData;
@@ -32,16 +32,16 @@ import java.util.Map;
public class RowDataSourceDecoder extends SourceDecoder<RowData> {
private final Map<String, Integer> fieldPositionMap;
- private final RowDataToFieldConverters.RowFieldConverter[]
rowFieldConverters;
+ private final RowToFieldDataUtils.RowFieldConverter[] rowFieldConverters;
public RowDataSourceDecoder(RowDataSourceInfo sourceInfo) {
super(sourceInfo.getFields());
List<FieldInfo> fields = sourceInfo.getFields();
this.fieldPositionMap = parseFieldPositionMap(fields);
- rowFieldConverters = new
RowDataToFieldConverters.RowFieldConverter[fields.size()];
+ rowFieldConverters = new
RowToFieldDataUtils.RowFieldConverter[fields.size()];
for (int i = 0; i < rowFieldConverters.length; i++) {
- rowFieldConverters[i] =
RowDataToFieldConverters.createNullableRowFieldConverter(
+ rowFieldConverters[i] =
RowToFieldDataUtils.createNullableRowFieldConverter(
TableFormatForRowDataUtils.deriveLogicalType(fields.get(i).getFormatInfo()));
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
index f2203cb3cd..1cab9f6fe3 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
@@ -17,43 +17,28 @@
package org.apache.inlong.sdk.transform.encode;
-import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
import org.apache.inlong.sdk.transform.process.Context;
-import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sdk.transform.utils.FieldToRowDataUtils;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
public class RowDataSinkEncoder extends SinkEncoder<RowData> {
- private final FieldToRowDataConverters.FieldToRowDataConverter[]
fieldToRowDataConverters;
- private final Map<String, Integer> fieldPositionMap;
+ private final FieldToRowDataUtils.FieldToRowDataConverter[]
fieldToRowDataConverters;
public RowDataSinkEncoder(RowDataSinkInfo sinkInfo) {
super(sinkInfo.getFields());
- this.fieldPositionMap = parseFieldPositionMap(fields);
- fieldToRowDataConverters = new
FieldToRowDataConverters.FieldToRowDataConverter[fields.size()];
+ fieldToRowDataConverters = new
FieldToRowDataUtils.FieldToRowDataConverter[fields.size()];
for (int i = 0; i < fields.size(); i++) {
- fieldToRowDataConverters[i] =
FieldToRowDataConverters.createConverter(
+ fieldToRowDataConverters[i] = FieldToRowDataUtils.createConverter(
TableFormatUtils.deriveLogicalType(fields.get(i).getFormatInfo()));
}
}
- private Map<String, Integer> parseFieldPositionMap(List<FieldInfo> fields)
{
- Map<String, Integer> map = new HashMap<>();
- for (int i = 0; i < fields.size(); i++) {
- map.put(fields.get(i).getName(), i);
- }
- return map;
- }
-
@Override
public RowData encode(SinkData sinkData, Context context) {
GenericRowData rowData = new
GenericRowData(fieldToRowDataConverters.length);
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/FieldToRowDataConverters.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
similarity index 89%
copy from
inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/FieldToRowDataConverters.java
copy to
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
index 8d6a5473f7..e27c0da21f 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/FieldToRowDataConverters.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.formats.base;
+package org.apache.inlong.sdk.transform.utils;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
@@ -36,10 +36,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
-/**
- * Converters to convert field object to RowData field.
- */
-public class FieldToRowDataConverters implements Serializable {
+public class FieldToRowDataUtils {
private static final long serialVersionUID = 1L;
@@ -70,17 +67,24 @@ public class FieldToRowDataConverters implements
Serializable {
case NULL:
return (obj) -> null;
case BOOLEAN:
+ return obj -> Boolean.parseBoolean(obj.toString());
case TINYINT:
+ return obj -> Byte.parseByte(obj.toString());
case SMALLINT:
- case INTEGER:
+ return obj -> Short.parseShort(obj.toString());
case INTERVAL_YEAR_MONTH:
- case BIGINT:
+ case INTEGER:
+ return obj -> Integer.parseInt(obj.toString());
case INTERVAL_DAY_TIME:
+ case BIGINT:
+ return obj -> Long.parseLong(obj.toString());
case FLOAT:
+ return obj -> Float.parseFloat(obj.toString());
case DOUBLE:
+ return obj -> Double.parseDouble(obj.toString());
case BINARY:
case VARBINARY:
- return (obj) -> obj;
+ return obj -> obj.toString().getBytes();
case CHAR:
case VARCHAR:
return (obj -> StringData.fromString((String) obj));
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/RowToFieldDataUtils.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/RowToFieldDataUtils.java
new file mode 100644
index 0000000000..878c87ced0
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/RowToFieldDataUtils.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.utils;
+
+import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Arrays;
+
+public class RowToFieldDataUtils {
+
+ private static final long serialVersionUID = 1L;
+
+    /**
+     * Runtime converter that reads one field of a Flink Table & SQL internal {@link RowData}
+     * and returns it as a corresponding plain Java {@link Object}.
+     *
+     * <p>The interface has a single abstract method and is used as a lambda / method-reference
+     * target throughout this class, hence the {@link FunctionalInterface} annotation.
+     */
+    @FunctionalInterface
+    public interface RowFieldConverter extends Serializable {
+
+        /**
+         * Converts the field at {@code pos} of {@code row}.
+         *
+         * @param row the row to read from
+         * @param pos the position of the field inside {@code row}
+         * @return the converted value
+         */
+        Object convert(RowData row, int pos);
+    }
+
+    /**
+     * Runtime converter that reads one element of a Flink internal {@link ArrayData} and returns
+     * it as a plain Java {@link Object}. Array counterpart of {@link RowFieldConverter}.
+     */
+    @FunctionalInterface
+    private interface ArrayElementConverter extends Serializable {
+
+        /**
+         * Converts the element at {@code pos} of {@code array}.
+         *
+         * @param array the array to read from
+         * @param pos the position of the element inside {@code array}
+         * @return the converted value
+         */
+        Object convert(ArrayData array, int pos);
+    }
+
+    /**
+     * Builds a null-tolerant converter for fields of the given logical type.
+     *
+     * <p>The returned converter yields {@code null} when the row reports the field as null and
+     * otherwise delegates to the type-specific converter.
+     *
+     * @param fieldType logical type of the field to convert
+     * @return a converter that checks {@code isNullAt} before converting
+     * @throws UnsupportedOperationException if no converter exists for {@code fieldType}
+     */
+    public static RowFieldConverter createNullableRowFieldConverter(LogicalType fieldType) {
+        final RowFieldConverter delegate = createRowFieldConverter(fieldType);
+        return (row, pos) -> row.isNullAt(pos) ? null : delegate.convert(row, pos);
+    }
+
+    /**
+     * Creates the converter that reads a field of the given logical type from a {@link RowData}.
+     *
+     * <p>The returned converter does not check for nulls itself; callers wrap it through
+     * {@link #createNullableRowFieldConverter(LogicalType)}.
+     *
+     * @param fieldType logical type of the field to convert
+     * @return the converter matching the type root of {@code fieldType}
+     * @throws UnsupportedOperationException for MAP, MULTISET, RAW and every type root that is
+     *         not handled explicitly
+     */
+    private static RowFieldConverter createRowFieldConverter(LogicalType fieldType) {
+        switch (fieldType.getTypeRoot()) {
+            case NULL:
+                return (row, pos) -> null;
+            case BOOLEAN:
+                return RowData::getBoolean;
+            case TINYINT:
+                return RowData::getByte;
+            case SMALLINT:
+                return RowData::getShort;
+            case INTEGER:
+            case INTERVAL_YEAR_MONTH:
+                return RowData::getInt;
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                return RowData::getLong;
+            case FLOAT:
+                return RowData::getFloat;
+            case DOUBLE:
+                return RowData::getDouble;
+            case CHAR:
+            case VARCHAR:
+                return (row, pos) -> row.getString(pos).toString();
+            case BINARY:
+            case VARBINARY:
+                return RowData::getBinary;
+            case DATE:
+                // Flink's internal representation of DATE is an int (days since epoch), so the
+                // field has to be read with getInt; getLong fails on rows holding an Integer.
+                return (row, pos) -> convertDate(row.getInt(pos));
+            case TIME_WITHOUT_TIME_ZONE:
+                return (row, pos) -> convertTime(row.getInt(pos));
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                final int timestampPrecision = ((TimestampType) fieldType).getPrecision();
+                return (row, pos) -> convertTimestamp(
+                        row.getTimestamp(pos, timestampPrecision));
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final int zonedTimestampPrecision =
+                        ((LocalZonedTimestampType) fieldType).getPrecision();
+                return (row, pos) -> convertTimestamp(
+                        row.getTimestamp(pos, zonedTimestampPrecision));
+            case DECIMAL:
+                return createDecimalRowFieldConverter((DecimalType) fieldType);
+            case ARRAY:
+                return createArrayRowFieldConverter((ArrayType) fieldType);
+            case ROW:
+                return createRowRowFieldConverter((RowType) fieldType);
+            case MAP:
+            case MULTISET:
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + fieldType);
+        }
+    }
+
+    /**
+     * Builds a null-tolerant converter for array elements of the given logical type.
+     *
+     * <p>The returned converter yields {@code null} when the array reports the element as null
+     * and otherwise delegates to the type-specific element converter.
+     *
+     * @param fieldType logical type of the array elements
+     * @return a converter that checks {@code isNullAt} before converting
+     */
+    private static ArrayElementConverter createNullableArrayElementConverter(
+            LogicalType fieldType) {
+        final ArrayElementConverter delegate = createArrayElementConverter(fieldType);
+        return (array, pos) -> array.isNullAt(pos) ? null : delegate.convert(array, pos);
+    }
+
+    /**
+     * Creates the converter that reads an element of the given logical type from an
+     * {@link ArrayData}.
+     *
+     * <p>The returned converter does not check for nulls itself; callers wrap it through
+     * {@link #createNullableArrayElementConverter(LogicalType)}.
+     *
+     * @param fieldType logical type of the array elements
+     * @return the converter matching the type root of {@code fieldType}
+     * @throws UnsupportedOperationException for ARRAY, ROW, MAP, MULTISET, RAW and every type
+     *         root that is not handled explicitly
+     */
+    private static ArrayElementConverter createArrayElementConverter(LogicalType fieldType) {
+        switch (fieldType.getTypeRoot()) {
+            case NULL:
+                return (array, pos) -> null;
+            case BOOLEAN:
+                return ArrayData::getBoolean;
+            case TINYINT:
+                return ArrayData::getByte;
+            case SMALLINT:
+                return ArrayData::getShort;
+            case INTEGER:
+            case INTERVAL_YEAR_MONTH:
+                return ArrayData::getInt;
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                return ArrayData::getLong;
+            case FLOAT:
+                return ArrayData::getFloat;
+            case DOUBLE:
+                return ArrayData::getDouble;
+            case CHAR:
+            case VARCHAR:
+                return (array, pos) -> array.getString(pos).toString();
+            case BINARY:
+            case VARBINARY:
+                return ArrayData::getBinary;
+            case DATE:
+                // Flink's internal representation of DATE is an int (days since epoch), so the
+                // element has to be read with getInt; getLong fails on arrays holding Integers.
+                return (array, pos) -> convertDate(array.getInt(pos));
+            case TIME_WITHOUT_TIME_ZONE:
+                return (array, pos) -> convertTime(array.getInt(pos));
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                final int timestampPrecision = ((TimestampType) fieldType).getPrecision();
+                return (array, pos) -> convertTimestamp(
+                        array.getTimestamp(pos, timestampPrecision));
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final int localZonedTimestampPrecision =
+                        ((LocalZonedTimestampType) fieldType).getPrecision();
+                return (array, pos) -> convertTimestamp(
+                        array.getTimestamp(pos, localZonedTimestampPrecision));
+            case DECIMAL:
+                return createDecimalArrayElementConverter((DecimalType) fieldType);
+            // we don't support ARRAY and ROW in an ARRAY, see
+            // CsvRowSchemaConverter#validateNestedField
+            case ARRAY:
+            case ROW:
+            case MAP:
+            case MULTISET:
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + fieldType);
+        }
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Field/Element Converters
+    // ------------------------------------------------------------------------------------------
+
+    /**
+     * Creates a converter that reads a DECIMAL field from a row and returns it as a
+     * {@code BigDecimal}.
+     *
+     * @param decimalType the decimal type supplying precision and scale for the read
+     * @return the decimal field converter
+     */
+    private static RowFieldConverter createDecimalRowFieldConverter(DecimalType decimalType) {
+        final int scale = decimalType.getScale();
+        final int precision = decimalType.getPrecision();
+        return (row, pos) -> row.getDecimal(pos, precision, scale).toBigDecimal();
+    }
+
+    /**
+     * Creates a converter that reads a DECIMAL element from an array and returns it as a
+     * {@code BigDecimal}.
+     *
+     * @param decimalType the decimal type supplying precision and scale for the read
+     * @return the decimal element converter
+     */
+    private static ArrayElementConverter createDecimalArrayElementConverter(
+            DecimalType decimalType) {
+        final int scale = decimalType.getScale();
+        final int precision = decimalType.getPrecision();
+        return (array, pos) -> array.getDecimal(pos, precision, scale).toBigDecimal();
+    }
+
+    /**
+     * Converts a day count relative to the epoch day into a {@link Date}.
+     *
+     * @param days number of days since the epoch day
+     * @return the SQL date for that day
+     */
+    private static Date convertDate(long days) {
+        return Date.valueOf(LocalDate.ofEpochDay(days));
+    }
+
+ private static Time convertTime(int millisecond) {
+ LocalTime time = LocalTime.ofNanoOfDay(millisecond * 1000_000L);
+ return Time.valueOf(time);
+ }
+
+    /**
+     * Converts a Flink internal {@link TimestampData} into a {@link Timestamp}.
+     *
+     * @param timestamp the internal timestamp; must not be {@code null}
+     * @return the result of {@code timestamp.toTimestamp()}
+     */
+    private static Timestamp convertTimestamp(final TimestampData timestamp) {
+        return timestamp.toTimestamp();
+    }
+
+    /**
+     * Creates a converter that reads an ARRAY field from a row and returns it as an
+     * {@code Object[]} holding the converted elements; null elements stay {@code null}.
+     *
+     * @param type the array type whose element type selects the element converter
+     * @return the array field converter
+     * @throws UnsupportedOperationException if the element type has no element converter
+     */
+    private static RowFieldConverter createArrayRowFieldConverter(ArrayType type) {
+        final ArrayElementConverter perElement =
+                createNullableArrayElementConverter(type.getElementType());
+        return (row, pos) -> {
+            final ArrayData source = row.getArray(pos);
+            final Object[] converted = new Object[source.size()];
+            for (int idx = 0; idx < converted.length; idx++) {
+                converted[idx] = perElement.convert(source, idx);
+            }
+            return converted;
+        };
+    }
+
+    /**
+     * Creates a converter that reads a nested ROW field and returns it as a {@link Row} whose
+     * fields are the converted values of the nested row; null nested fields stay {@code null}.
+     *
+     * @param type the row type describing the nested fields
+     * @return the nested-row field converter
+     * @throws UnsupportedOperationException if any nested field type has no converter
+     */
+    private static RowFieldConverter createRowRowFieldConverter(RowType type) {
+        // The nested converters must come from this class: the factory in
+        // RowDataToFieldConverters returns that class's own RowFieldConverter type, which
+        // cannot be stored in a RowToFieldDataUtils.RowFieldConverter[] (ArrayStoreException).
+        final RowFieldConverter[] fieldConverters =
+                type.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .map(RowToFieldDataUtils::createNullableRowFieldConverter)
+                        .toArray(RowFieldConverter[]::new);
+        final int rowArity = type.getFieldCount();
+        return (row, pos) -> {
+            final RowData value = row.getRow(pos, rowArity);
+            final Row result = new Row(rowArity);
+            for (int i = 0; i < rowArity; i++) {
+                result.setField(i, fieldConverters[i].convert(value, i));
+            }
+            return result;
+        };
+    }
+}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/FieldToRowDataConverters.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/FieldToRowDataConverters.java
index 8d6a5473f7..adeee86fa6 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/FieldToRowDataConverters.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/FieldToRowDataConverters.java
@@ -70,17 +70,24 @@ public class FieldToRowDataConverters implements
Serializable {
case NULL:
return (obj) -> null;
case BOOLEAN:
+ return obj -> Boolean.parseBoolean(obj.toString());
case TINYINT:
+ return obj -> Byte.parseByte(obj.toString());
case SMALLINT:
- case INTEGER:
+ return obj -> Short.parseShort(obj.toString());
case INTERVAL_YEAR_MONTH:
- case BIGINT:
+ case INTEGER:
+ return obj -> Integer.parseInt(obj.toString());
case INTERVAL_DAY_TIME:
+ case BIGINT:
+ return obj -> Long.parseLong(obj.toString());
case FLOAT:
+ return obj -> Float.parseFloat(obj.toString());
case DOUBLE:
+ return obj -> Double.parseDouble(obj.toString());
case BINARY:
case VARBINARY:
- return (obj) -> obj;
+ return obj -> obj.toString().getBytes();
case CHAR:
case VARCHAR:
return (obj -> StringData.fromString((String) obj));