This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a297fbdd7 [INLONG-11611][SDK] Transform SDK supports RowData source and sink (#11613)
3a297fbdd7 is described below

commit 3a297fbdd7c5600dc2c2f98e6c474c7803dd67a1
Author: vernedeng <[email protected]>
AuthorDate: Thu Dec 19 12:30:54 2024 +0800

    [INLONG-11611][SDK] Transform SDK supports RowData source and sink (#11613)
    
    Co-authored-by: vernedeng <[email protected]>
---
 inlong-sdk/transform-sdk/pom.xml                   | 18 ++++++
 .../sdk/transform/decode/RowDataSourceData.java    | 62 +++++++++++++++++++
 .../sdk/transform/decode/RowDataSourceDecoder.java | 67 ++++++++++++++++++++
 .../sdk/transform/decode/SourceDecoderFactory.java |  5 ++
 .../sdk/transform/encode/RowDataSinkEncoder.java   | 69 +++++++++++++++++++++
 .../sdk/transform/encode/SinkEncoderFactory.java   |  5 ++
 .../inlong/sdk/transform/pojo/FieldInfo.java       |  7 +++
 .../pojo/{FieldInfo.java => RowDataSinkInfo.java}  | 30 ++++-----
 .../{FieldInfo.java => RowDataSourceInfo.java}     | 32 ++++------
 .../apache/inlong/sdk/transform/pojo/SinkInfo.java |  1 +
 .../processor/AbstractProcessorTestBase.java       |  2 +
 .../processor/TestRowData2RowDataProcessor.java    | 72 ++++++++++++++++++++++
 12 files changed, 331 insertions(+), 39 deletions(-)

diff --git a/inlong-sdk/transform-sdk/pom.xml b/inlong-sdk/transform-sdk/pom.xml
index 18db54765e..9450a4fcc8 100644
--- a/inlong-sdk/transform-sdk/pom.xml
+++ b/inlong-sdk/transform-sdk/pom.xml
@@ -46,6 +46,12 @@
             <artifactId>sdk-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
@@ -110,6 +116,18 @@
             <groupId>org.dom4j</groupId>
             <artifactId>dom4j</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-common</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-rowdata-base</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
new file mode 100644
index 0000000000..3e6ee9fc39
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Map;
+
+@Slf4j
+public class RowDataSourceData implements SourceData {
+
+    private final RowData rowData;
+    private final Map<String, Integer> fieldPositionMap;
+    private final RowDataToFieldConverters.RowFieldConverter[] converters;
+
+    public RowDataSourceData(
+            RowData rowData,
+            Map<String, Integer> fieldPositionMap,
+            RowDataToFieldConverters.RowFieldConverter[] converters) {
+        this.rowData = rowData;
+        this.fieldPositionMap = fieldPositionMap;
+        this.converters = converters;
+    }
+
+    @Override
+    public int getRowCount() {
+        return 1;
+    }
+
+    @Override
+    public Object getField(int rowNum, String fieldName) {
+        if (rowNum != 0) {
+            return null;
+        }
+        try {
+            int fieldPosition = fieldPositionMap.get(fieldName);
+            return converters[fieldPosition].convert(rowData, fieldPosition);
+        } catch (Throwable e) {
+            log.error("failed to convert field={}", fieldName, e);
+            return null;
+        }
+    }
+
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
new file mode 100644
index 0000000000..fdd6c4ce08
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSourceInfo;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;
+import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;
+
+import org.apache.flink.table.data.RowData;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RowDataSourceDecoder extends SourceDecoder<RowData> {
+
+    private final Map<String, Integer> fieldPositionMap;
+    private final RowDataToFieldConverters.RowFieldConverter[] 
rowFieldConverters;
+
+    public RowDataSourceDecoder(RowDataSourceInfo sourceInfo) {
+        super(sourceInfo.getFields());
+        List<FieldInfo> fields = sourceInfo.getFields();
+        this.fieldPositionMap = parseFieldPositionMap(fields);
+
+        rowFieldConverters = new 
RowDataToFieldConverters.RowFieldConverter[fields.size()];
+        for (int i = 0; i < rowFieldConverters.length; i++) {
+            rowFieldConverters[i] = 
RowDataToFieldConverters.createNullableRowFieldConverter(
+                    
TableFormatForRowDataUtils.deriveLogicalType(fields.get(i).getFormatInfo()));
+        }
+    }
+
+    private Map<String, Integer> parseFieldPositionMap(List<FieldInfo> fields) 
{
+        Map<String, Integer> map = new HashMap<>();
+        for (int i = 0; i < fields.size(); i++) {
+            map.put(fields.get(i).getName(), i);
+        }
+        return map;
+    }
+
+    @Override
+    public SourceData decode(byte[] srcBytes, Context context) {
+        throw new UnsupportedOperationException("do not support decoding bytes 
for row data decoder");
+    }
+
+    @Override
+    public SourceData decode(RowData rowData, Context context) {
+        return new RowDataSourceData(rowData, fieldPositionMap, 
rowFieldConverters);
+    }
+
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
index 76e856be14..dafd891435 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
@@ -24,6 +24,7 @@ import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.ParquetSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.XmlSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.YamlSourceInfo;
 
@@ -65,4 +66,8 @@ public class SourceDecoderFactory {
         return new YamlSourceDecoder(sourceInfo);
     }
 
+    public static RowDataSourceDecoder createRowDecoder(RowDataSourceInfo 
sourceInfo) {
+        return new RowDataSourceDecoder(sourceInfo);
+    }
+
 }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
new file mode 100644
index 0000000000..f2203cb3cd
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sort.formats.base.TableFormatUtils;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RowDataSinkEncoder extends SinkEncoder<RowData> {
+
+    private final FieldToRowDataConverters.FieldToRowDataConverter[] 
fieldToRowDataConverters;
+    private final Map<String, Integer> fieldPositionMap;
+
+    public RowDataSinkEncoder(RowDataSinkInfo sinkInfo) {
+        super(sinkInfo.getFields());
+        this.fieldPositionMap = parseFieldPositionMap(fields);
+
+        fieldToRowDataConverters = new 
FieldToRowDataConverters.FieldToRowDataConverter[fields.size()];
+        for (int i = 0; i < fields.size(); i++) {
+            fieldToRowDataConverters[i] = 
FieldToRowDataConverters.createConverter(
+                    
TableFormatUtils.deriveLogicalType(fields.get(i).getFormatInfo()));
+        }
+    }
+
+    private Map<String, Integer> parseFieldPositionMap(List<FieldInfo> fields) 
{
+        Map<String, Integer> map = new HashMap<>();
+        for (int i = 0; i < fields.size(); i++) {
+            map.put(fields.get(i).getName(), i);
+        }
+        return map;
+    }
+
+    @Override
+    public RowData encode(SinkData sinkData, Context context) {
+        GenericRowData rowData = new 
GenericRowData(fieldToRowDataConverters.length);
+
+        for (int i = 0; i < fields.size(); i++) {
+            String fieldName = fields.get(i).getName();
+            String fieldValue = sinkData.getField(fieldName);
+            rowData.setField(i, 
fieldToRowDataConverters[i].convert(fieldValue));
+        }
+
+        return rowData;
+    }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
index 1778aba180..8d82970e7a 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
@@ -22,6 +22,7 @@ import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
 import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
 import org.apache.inlong.sdk.transform.pojo.ParquetSinkInfo;
 import org.apache.inlong.sdk.transform.pojo.PbSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
 
 public class SinkEncoderFactory {
 
@@ -45,4 +46,8 @@ public class SinkEncoderFactory {
         return new PbSinkEncoder(pbSinkInfo);
     }
 
+    public static RowDataSinkEncoder createRowEncoder(RowDataSinkInfo 
rowDataSinkInfo) {
+        return new RowDataSinkEncoder(rowDataSinkInfo);
+    }
+
 }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
index fe08d00bb1..eaf1b7a9eb 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sdk.transform.pojo;
 
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
 import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
 
 import lombok.Data;
@@ -28,6 +29,7 @@ import lombok.Data;
 public class FieldInfo {
 
     private String name;
+    private FormatInfo formatInfo;
     private TypeConverter converter = TypeConverter.DefaultTypeConverter();
 
     public FieldInfo() {
@@ -42,4 +44,9 @@ public class FieldInfo {
         this.name = name;
         this.converter = converter;
     }
+
+    public FieldInfo(String name, TypeConverter converter, FormatInfo 
formatInfo) {
+        this(name, converter);
+        this.formatInfo = formatInfo;
+    }
 }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/RowDataSinkInfo.java
similarity index 64%
copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/RowDataSinkInfo.java
index fe08d00bb1..d88ddbb6eb 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/RowDataSinkInfo.java
@@ -17,29 +17,21 @@
 
 package org.apache.inlong.sdk.transform.pojo;
 
-import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
-
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import lombok.Data;
+import lombok.experimental.SuperBuilder;
 
-/**
- * FieldInfo
- */
-@Data
-public class FieldInfo {
-
-    private String name;
-    private TypeConverter converter = TypeConverter.DefaultTypeConverter();
+import java.util.List;
 
-    public FieldInfo() {
-
-    }
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Data
+@SuperBuilder
+public class RowDataSinkInfo extends SinkInfo {
 
-    public FieldInfo(String name) {
-        this(name, TypeConverter.DefaultTypeConverter());
-    }
+    private List<FieldInfo> fields;
 
-    public FieldInfo(String name, TypeConverter converter) {
-        this.name = name;
-        this.converter = converter;
+    public RowDataSinkInfo(String charset, List<FieldInfo> fields) {
+        super(SinkInfo.ROWDATA, charset);
+        this.fields = fields;
     }
 }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/RowDataSourceInfo.java
similarity index 63%
copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/RowDataSourceInfo.java
index fe08d00bb1..18aa936bc9 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/RowDataSourceInfo.java
@@ -17,29 +17,21 @@
 
 package org.apache.inlong.sdk.transform.pojo;
 
-import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Getter;
+import lombok.experimental.SuperBuilder;
 
-import lombok.Data;
+import java.util.List;
 
-/**
- * FieldInfo
- */
-@Data
-public class FieldInfo {
-
-    private String name;
-    private TypeConverter converter = TypeConverter.DefaultTypeConverter();
-
-    public FieldInfo() {
+@JsonIgnoreProperties(ignoreUnknown = true)
+@SuperBuilder
+@Getter
+public class RowDataSourceInfo extends SourceInfo {
 
-    }
-
-    public FieldInfo(String name) {
-        this(name, TypeConverter.DefaultTypeConverter());
-    }
+    private List<FieldInfo> fields;
 
-    public FieldInfo(String name, TypeConverter converter) {
-        this.name = name;
-        this.converter = converter;
+    public RowDataSourceInfo(String charset, List<FieldInfo> fields) {
+        super(charset);
+        this.fields = fields;
     }
 }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
index 022af73fd5..89cf599d98 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
@@ -46,6 +46,7 @@ public abstract class SinkInfo {
     public static final String ES_MAP = "es_map";
     public static final String PARQUET = "parquet";
     public static final String PB = "pb";
+    public static final String ROWDATA = "rowdata";
 
     @JsonIgnore
     private String type;
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
index 3322d83199..bc0fb0371e 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sdk.transform.process.processor;
 
+import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.StringFormatInfo;
 import org.apache.inlong.sdk.transform.decode.ParquetInputByteArray;
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
 
@@ -48,6 +49,7 @@ public abstract class AbstractProcessorTestBase {
         for (String fieldName : fieldNames) {
             FieldInfo field = new FieldInfo();
             field.setName(fieldName);
+            field.setFormatInfo(new StringFormatInfo());
             fields.add(field);
         }
         return fields;
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestRowData2RowDataProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestRowData2RowDataProcessor.java
new file mode 100644
index 0000000000..9bb997e9af
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestRowData2RowDataProcessor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.processor;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestRowData2RowDataProcessor extends AbstractProcessorTestBase {
+
+    @Test
+    public void testRowData2RowData() throws Exception {
+        List<FieldInfo> fields1 = this.getTestFieldList("sid", "packageID", 
"msgTime", "msg");
+        RowDataSourceInfo sourceInfo = new RowDataSourceInfo("utf-8", fields1);
+        List<FieldInfo> fields2 = this.getTestFieldList("f1", "f2", "f3", 
"f4");
+        RowDataSinkInfo sinkInfo = new RowDataSinkInfo("utf-8", fields2);
+
+        String transformSql = "select msgTime ,msg, packageID, sid";
+        TransformConfig config = new TransformConfig(transformSql);
+        TransformProcessor<RowData, RowData> processor =
+                TransformProcessor.create(
+                        config,
+                        SourceDecoderFactory.createRowDecoder(sourceInfo),
+                        SinkEncoderFactory.createRowEncoder(sinkInfo));
+
+        RowData sourceRow = createRowData();
+
+        List<RowData> sinkRow = processor.transform(sourceRow);
+        RowData expectedRow = sinkRow.get(0);
+        Assert.assertEquals("2024-12-19T11:00:55.212", 
expectedRow.getString(0).toString());
+        Assert.assertEquals("msg111", expectedRow.getString(1).toString());
+        Assert.assertEquals("pack123", expectedRow.getString(2).toString());
+        Assert.assertEquals("s123", expectedRow.getString(3).toString());
+
+    }
+
+    private RowData createRowData() {
+        GenericRowData rowData = new GenericRowData(4);
+        rowData.setField(0, StringData.fromString("s123"));
+        rowData.setField(1, StringData.fromString("pack123"));
+        rowData.setField(2, StringData.fromString("2024-12-19T11:00:55.212"));
+        rowData.setField(3, StringData.fromString("msg111"));
+        return rowData;
+    }
+}

Reply via email to