This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 535059e25d [INLONG-11616][SDK] Use self-defined Field and RowData 
conversion utils (#11617)
535059e25d is described below

commit 535059e25d7f3d5630f2f592c557f80a8917e8a4
Author: vernedeng <[email protected]>
AuthorDate: Thu Dec 19 22:02:25 2024 +0800

    [INLONG-11616][SDK] Use self-defined Field and RowData conversion utils 
(#11617)
---
 .../sdk/transform/decode/RowDataSourceData.java    |   6 +-
 .../sdk/transform/decode/RowDataSourceDecoder.java |   8 +-
 .../sdk/transform/encode/RowDataSinkEncoder.java   |  23 +-
 .../sdk/transform/utils/FieldToRowDataUtils.java   |  20 +-
 .../sdk/transform/utils/RowToFieldDataUtils.java   | 258 +++++++++++++++++++++
 .../formats/base/FieldToRowDataConverters.java     |  13 +-
 6 files changed, 291 insertions(+), 37 deletions(-)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
index 3e6ee9fc39..fbfd423497 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.sdk.transform.decode;
 
-import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;
+import org.apache.inlong.sdk.transform.utils.RowToFieldDataUtils;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.table.data.RowData;
@@ -29,12 +29,12 @@ public class RowDataSourceData implements SourceData {
 
     private final RowData rowData;
     private final Map<String, Integer> fieldPositionMap;
-    private final RowDataToFieldConverters.RowFieldConverter[] converters;
+    private final RowToFieldDataUtils.RowFieldConverter[] converters;
 
     public RowDataSourceData(
             RowData rowData,
             Map<String, Integer> fieldPositionMap,
-            RowDataToFieldConverters.RowFieldConverter[] converters) {
+            RowToFieldDataUtils.RowFieldConverter[] converters) {
         this.rowData = rowData;
         this.fieldPositionMap = fieldPositionMap;
         this.converters = converters;
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
index fdd6c4ce08..a82fc2ac6b 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
@@ -20,7 +20,7 @@ package org.apache.inlong.sdk.transform.decode;
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
 import org.apache.inlong.sdk.transform.pojo.RowDataSourceInfo;
 import org.apache.inlong.sdk.transform.process.Context;
-import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;
+import org.apache.inlong.sdk.transform.utils.RowToFieldDataUtils;
 import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;
 
 import org.apache.flink.table.data.RowData;
@@ -32,16 +32,16 @@ import java.util.Map;
 public class RowDataSourceDecoder extends SourceDecoder<RowData> {
 
     private final Map<String, Integer> fieldPositionMap;
-    private final RowDataToFieldConverters.RowFieldConverter[] 
rowFieldConverters;
+    private final RowToFieldDataUtils.RowFieldConverter[] rowFieldConverters;
 
     public RowDataSourceDecoder(RowDataSourceInfo sourceInfo) {
         super(sourceInfo.getFields());
         List<FieldInfo> fields = sourceInfo.getFields();
         this.fieldPositionMap = parseFieldPositionMap(fields);
 
-        rowFieldConverters = new 
RowDataToFieldConverters.RowFieldConverter[fields.size()];
+        rowFieldConverters = new 
RowToFieldDataUtils.RowFieldConverter[fields.size()];
         for (int i = 0; i < rowFieldConverters.length; i++) {
-            rowFieldConverters[i] = 
RowDataToFieldConverters.createNullableRowFieldConverter(
+            rowFieldConverters[i] = 
RowToFieldDataUtils.createNullableRowFieldConverter(
                     
TableFormatForRowDataUtils.deriveLogicalType(fields.get(i).getFormatInfo()));
         }
     }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
index f2203cb3cd..1cab9f6fe3 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
@@ -17,43 +17,28 @@
 
 package org.apache.inlong.sdk.transform.encode;
 
-import org.apache.inlong.sdk.transform.pojo.FieldInfo;
 import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
 import org.apache.inlong.sdk.transform.process.Context;
-import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sdk.transform.utils.FieldToRowDataUtils;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
 
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 public class RowDataSinkEncoder extends SinkEncoder<RowData> {
 
-    private final FieldToRowDataConverters.FieldToRowDataConverter[] 
fieldToRowDataConverters;
-    private final Map<String, Integer> fieldPositionMap;
+    private final FieldToRowDataUtils.FieldToRowDataConverter[] 
fieldToRowDataConverters;
 
     public RowDataSinkEncoder(RowDataSinkInfo sinkInfo) {
         super(sinkInfo.getFields());
-        this.fieldPositionMap = parseFieldPositionMap(fields);
 
-        fieldToRowDataConverters = new 
FieldToRowDataConverters.FieldToRowDataConverter[fields.size()];
+        fieldToRowDataConverters = new 
FieldToRowDataUtils.FieldToRowDataConverter[fields.size()];
         for (int i = 0; i < fields.size(); i++) {
-            fieldToRowDataConverters[i] = 
FieldToRowDataConverters.createConverter(
+            fieldToRowDataConverters[i] = FieldToRowDataUtils.createConverter(
                     
TableFormatUtils.deriveLogicalType(fields.get(i).getFormatInfo()));
         }
     }
 
-    private Map<String, Integer> parseFieldPositionMap(List<FieldInfo> fields) 
{
-        Map<String, Integer> map = new HashMap<>();
-        for (int i = 0; i < fields.size(); i++) {
-            map.put(fields.get(i).getName(), i);
-        }
-        return map;
-    }
-
     @Override
     public RowData encode(SinkData sinkData, Context context) {
         GenericRowData rowData = new 
GenericRowData(fieldToRowDataConverters.length);
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/FieldToRowDataConverters.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
similarity index 89%
copy from 
inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/FieldToRowDataConverters.java
copy to 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
index 8d6a5473f7..e27c0da21f 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/FieldToRowDataConverters.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.base;
+package org.apache.inlong.sdk.transform.utils;
 
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
@@ -36,10 +36,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
-/**
- * Converters to convert field object to RowData field.
- */
-public class FieldToRowDataConverters implements Serializable {
+public class FieldToRowDataUtils {
 
     private static final long serialVersionUID = 1L;
 
@@ -70,17 +67,24 @@ public class FieldToRowDataConverters implements 
Serializable {
             case NULL:
                 return (obj) -> null;
             case BOOLEAN:
+                return obj -> Boolean.parseBoolean(obj.toString());
             case TINYINT:
+                return obj -> Byte.parseByte(obj.toString());
             case SMALLINT:
-            case INTEGER:
+                return obj -> Short.parseShort(obj.toString());
             case INTERVAL_YEAR_MONTH:
-            case BIGINT:
+            case INTEGER:
+                return obj -> Integer.parseInt(obj.toString());
             case INTERVAL_DAY_TIME:
+            case BIGINT:
+                return obj -> Long.parseLong(obj.toString());
             case FLOAT:
+                return obj -> Float.parseFloat(obj.toString());
             case DOUBLE:
+                return obj -> Double.parseDouble(obj.toString());
             case BINARY:
             case VARBINARY:
-                return (obj) -> obj;
+                return obj -> obj.toString().getBytes();
             case CHAR:
             case VARCHAR:
                 return (obj -> StringData.fromString((String) obj));
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/RowToFieldDataUtils.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/RowToFieldDataUtils.java
new file mode 100644
index 0000000000..878c87ced0
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/RowToFieldDataUtils.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.utils;
+
+import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Arrays;
+
+public class RowToFieldDataUtils {
+
+    // Stateless holder of static factories; the converters it returns are Serializable.
+
+    /**
+     * Runtime converter that converts objects of Flink Table & SQL internal data structures to
+     * corresponding {@link Object}s.
+     */
+    public interface RowFieldConverter extends Serializable {
+
+        Object convert(RowData row, int pos);
+    }
+
+    private interface ArrayElementConverter extends Serializable {
+
+        Object convert(ArrayData array, int pos);
+    }
+
+    public static RowFieldConverter createNullableRowFieldConverter(LogicalType fieldType) {
+        final RowFieldConverter fieldConverter = createRowFieldConverter(fieldType);
+        return (row, pos) -> {
+            if (row.isNullAt(pos)) {
+                return null;
+            }
+            return fieldConverter.convert(row, pos);
+        };
+    }
+
+    private static RowFieldConverter createRowFieldConverter(LogicalType fieldType) {
+        switch (fieldType.getTypeRoot()) {
+            case NULL:
+                return (row, pos) -> null;
+            case BOOLEAN:
+                return RowData::getBoolean;
+            case TINYINT:
+                return RowData::getByte;
+            case SMALLINT:
+                return RowData::getShort;
+            case INTEGER:
+            case INTERVAL_YEAR_MONTH:
+                return RowData::getInt;
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                return RowData::getLong;
+            case FLOAT:
+                return RowData::getFloat;
+            case DOUBLE:
+                return RowData::getDouble;
+            case CHAR:
+            case VARCHAR:
+                return (row, pos) -> row.getString(pos).toString();
+            case BINARY:
+            case VARBINARY:
+                return RowData::getBinary;
+            case DATE:
+                return (row, pos) -> convertDate(row.getInt(pos)); // DATE is int epoch days
+            case TIME_WITHOUT_TIME_ZONE:
+                return (row, pos) -> convertTime(row.getInt(pos));
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                final int timestampPrecision = ((TimestampType) fieldType).getPrecision();
+                return (row, pos) -> convertTimestamp(
+                        row.getTimestamp(pos, timestampPrecision));
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final int zonedTimestampPrecision =
+                        ((LocalZonedTimestampType) fieldType).getPrecision();
+                return (row, pos) -> convertTimestamp(
+                        row.getTimestamp(pos, zonedTimestampPrecision));
+            case DECIMAL:
+                return createDecimalRowFieldConverter((DecimalType) fieldType);
+            case ARRAY:
+                return createArrayRowFieldConverter((ArrayType) fieldType);
+            case ROW:
+                return createRowRowFieldConverter((RowType) fieldType);
+            case MAP:
+            case MULTISET:
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + fieldType);
+        }
+    }
+
+    private static ArrayElementConverter createNullableArrayElementConverter(
+            LogicalType fieldType) {
+        final ArrayElementConverter elementConverter = createArrayElementConverter(fieldType);
+        return (array, pos) -> {
+            if (array.isNullAt(pos)) {
+                return null;
+            }
+            return elementConverter.convert(array, pos);
+        };
+    }
+
+    private static ArrayElementConverter createArrayElementConverter(LogicalType fieldType) {
+        switch (fieldType.getTypeRoot()) {
+            case NULL:
+                return (array, pos) -> null;
+            case BOOLEAN:
+                return ArrayData::getBoolean;
+            case TINYINT:
+                return ArrayData::getByte;
+            case SMALLINT:
+                return ArrayData::getShort;
+            case INTEGER:
+            case INTERVAL_YEAR_MONTH:
+                return ArrayData::getInt;
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                return ArrayData::getLong;
+            case FLOAT:
+                return ArrayData::getFloat;
+            case DOUBLE:
+                return ArrayData::getDouble;
+            case CHAR:
+            case VARCHAR:
+                return (array, pos) -> array.getString(pos).toString();
+            case BINARY:
+            case VARBINARY:
+                return ArrayData::getBinary;
+            case DATE:
+                return (array, pos) -> convertDate(array.getInt(pos)); // DATE is int epoch days
+            case TIME_WITHOUT_TIME_ZONE:
+                return (array, pos) -> convertTime(array.getInt(pos));
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                final int timestampPrecision = ((TimestampType) fieldType).getPrecision();
+                return (array, pos) -> convertTimestamp(
+                        array.getTimestamp(pos, timestampPrecision));
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final int localZonedTimestampPrecision =
+                        ((LocalZonedTimestampType) fieldType).getPrecision();
+                return (array, pos) -> convertTimestamp(
+                        array.getTimestamp(pos, localZonedTimestampPrecision));
+            case DECIMAL:
+                return createDecimalArrayElementConverter((DecimalType) fieldType);
+            // nested ARRAY/ROW and other complex element types are not supported inside
+            // an ARRAY; they fall through to the UnsupportedOperationException below
+            case ARRAY:
+            case ROW:
+            case MAP:
+            case MULTISET:
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + fieldType);
+        }
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Field/Element Converters
+    // ------------------------------------------------------------------------------------------
+
+    private static RowFieldConverter createDecimalRowFieldConverter(DecimalType decimalType) {
+        final int precision = decimalType.getPrecision();
+        final int scale = decimalType.getScale();
+        return (row, pos) -> {
+            DecimalData decimal = row.getDecimal(pos, precision, scale);
+            return decimal.toBigDecimal();
+        };
+    }
+
+    private static ArrayElementConverter createDecimalArrayElementConverter(
+            DecimalType decimalType) {
+        final int precision = decimalType.getPrecision();
+        final int scale = decimalType.getScale();
+        return (array, pos) -> {
+            DecimalData decimal = array.getDecimal(pos, precision, scale);
+            return decimal.toBigDecimal();
+        };
+    }
+
+    private static Date convertDate(long days) {
+        LocalDate localDate = LocalDate.ofEpochDay(days);
+        return Date.valueOf(localDate);
+    }
+
+    private static Time convertTime(int millisecond) {
+        LocalTime time = LocalTime.ofNanoOfDay(millisecond * 1000_000L);
+        return Time.valueOf(time); // NOTE: Time.valueOf keeps only hour/minute/second
+    }
+
+    private static Timestamp convertTimestamp(TimestampData timestamp) {
+        return timestamp.toTimestamp();
+    }
+
+    private static RowFieldConverter createArrayRowFieldConverter(ArrayType type) {
+        LogicalType elementType = type.getElementType();
+        final ArrayElementConverter elementConverter =
+                createNullableArrayElementConverter(elementType);
+        return (row, pos) -> {
+            ArrayData arrayData = row.getArray(pos);
+            int numElements = arrayData.size();
+            Object[] result = new Object[numElements];
+            for (int i = 0; i < numElements; i++) {
+                result[i] = elementConverter.convert(arrayData, i);
+            }
+            return result;
+        };
+    }
+
+    private static RowFieldConverter createRowRowFieldConverter(RowType type) {
+        LogicalType[] fieldTypes =
+                type.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .toArray(LogicalType[]::new);
+        final RowFieldConverter[] fieldConverters =
+                Arrays.stream(fieldTypes)
+                        .map(RowToFieldDataUtils::createNullableRowFieldConverter) // same type as array
+                        .toArray(RowFieldConverter[]::new);
+        final int rowArity = type.getFieldCount();
+        return (row, pos) -> {
+            final RowData value = row.getRow(pos, rowArity);
+            Row result = new Row(rowArity);
+            for (int i = 0; i < rowArity; i++) {
+                result.setField(i, fieldConverters[i].convert(value, i));
+            }
+            return result;
+        };
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/FieldToRowDataConverters.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/FieldToRowDataConverters.java
index 8d6a5473f7..adeee86fa6 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/FieldToRowDataConverters.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/FieldToRowDataConverters.java
@@ -70,17 +70,24 @@ public class FieldToRowDataConverters implements 
Serializable {
             case NULL:
                 return (obj) -> null;
             case BOOLEAN:
+                return obj -> Boolean.parseBoolean(obj.toString());
             case TINYINT:
+                return obj -> Byte.parseByte(obj.toString());
             case SMALLINT:
-            case INTEGER:
+                return obj -> Short.parseShort(obj.toString());
             case INTERVAL_YEAR_MONTH:
-            case BIGINT:
+            case INTEGER:
+                return obj -> Integer.parseInt(obj.toString());
             case INTERVAL_DAY_TIME:
+            case BIGINT:
+                return obj -> Long.parseLong(obj.toString());
             case FLOAT:
+                return obj -> Float.parseFloat(obj.toString());
             case DOUBLE:
+                return obj -> Double.parseDouble(obj.toString());
             case BINARY:
             case VARBINARY:
-                return (obj) -> obj;
+                return obj -> obj.toString().getBytes();
             case CHAR:
             case VARCHAR:
                 return (obj -> StringData.fromString((String) obj));

Reply via email to