This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3a297fbdd7 [INLONG-11611][SDK] Transform SDK supports RowData source
and sink (#11613)
3a297fbdd7 is described below
commit 3a297fbdd7c5600dc2c2f98e6c474c7803dd67a1
Author: vernedeng <[email protected]>
AuthorDate: Thu Dec 19 12:30:54 2024 +0800
[INLONG-11611][SDK] Transform SDK supports RowData source and sink (#11613)
Co-authored-by: vernedeng <[email protected]>
---
inlong-sdk/transform-sdk/pom.xml | 18 ++++++
.../sdk/transform/decode/RowDataSourceData.java | 62 +++++++++++++++++++
.../sdk/transform/decode/RowDataSourceDecoder.java | 67 ++++++++++++++++++++
.../sdk/transform/decode/SourceDecoderFactory.java | 5 ++
.../sdk/transform/encode/RowDataSinkEncoder.java | 69 +++++++++++++++++++++
.../sdk/transform/encode/SinkEncoderFactory.java | 5 ++
.../inlong/sdk/transform/pojo/FieldInfo.java | 7 +++
.../pojo/{FieldInfo.java => RowDataSinkInfo.java} | 30 ++++-----
.../{FieldInfo.java => RowDataSourceInfo.java} | 32 ++++------
.../apache/inlong/sdk/transform/pojo/SinkInfo.java | 1 +
.../processor/AbstractProcessorTestBase.java | 2 +
.../processor/TestRowData2RowDataProcessor.java | 72 ++++++++++++++++++++++
12 files changed, 331 insertions(+), 39 deletions(-)
diff --git a/inlong-sdk/transform-sdk/pom.xml b/inlong-sdk/transform-sdk/pom.xml
index 18db54765e..9450a4fcc8 100644
--- a/inlong-sdk/transform-sdk/pom.xml
+++ b/inlong-sdk/transform-sdk/pom.xml
@@ -46,6 +46,12 @@
<artifactId>sdk-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
@@ -110,6 +116,18 @@
<groupId>org.dom4j</groupId>
<artifactId>dom4j</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-format-common</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-format-rowdata-base</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
new file mode 100644
index 0000000000..3e6ee9fc39
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Map;
+
+/**
+ * {@link SourceData} backed by a single Flink {@link RowData} record.
+ *
+ * <p>Fields are addressed by name: the name is resolved to a column position through
+ * {@code fieldPositionMap}, and the converter stored at that position turns the column
+ * into the object handed back to the caller.
+ */
+@Slf4j
+public class RowDataSourceData implements SourceData {
+
+    private final RowData rowData;
+    private final Map<String, Integer> fieldPositionMap;
+    private final RowDataToFieldConverters.RowFieldConverter[] converters;
+
+    /**
+     * @param rowData the record to read from
+     * @param fieldPositionMap field name to column position
+     * @param converters converters indexed by the positions stored in {@code fieldPositionMap}
+     */
+    public RowDataSourceData(
+            RowData rowData,
+            Map<String, Integer> fieldPositionMap,
+            RowDataToFieldConverters.RowFieldConverter[] converters) {
+        this.rowData = rowData;
+        this.fieldPositionMap = fieldPositionMap;
+        this.converters = converters;
+    }
+
+    /**
+     * @return always 1
+     */
+    @Override
+    public int getRowCount() {
+        return 1;
+    }
+
+    /**
+     * Reads one field of the wrapped record.
+     *
+     * @param rowNum row index; any value other than 0 yields {@code null}
+     * @param fieldName name of the field to read
+     * @return the converted value, or {@code null} when {@code rowNum} is not 0, the field
+     *         name has no known position, or the lookup/conversion throws
+     */
+    @Override
+    public Object getField(int rowNum, String fieldName) {
+        if (rowNum != 0) {
+            return null;
+        }
+        try {
+            // Keep the lookup boxed: unboxing a missing entry straight into an int throws a
+            // NullPointerException, which turned every unknown name into an error log entry.
+            Integer mappedPosition = fieldPositionMap.get(fieldName);
+            if (mappedPosition == null) {
+                return null;
+            }
+            int fieldPosition = mappedPosition;
+            return converters[fieldPosition].convert(rowData, fieldPosition);
+        } catch (Throwable e) {
+            // Best effort: a failed conversion yields null instead of aborting the caller.
+            log.error("failed to convert field={}", fieldName, e);
+            return null;
+        }
+    }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
new file mode 100644
index 0000000000..fdd6c4ce08
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSourceInfo;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;
+import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;
+
+import org.apache.flink.table.data.RowData;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Source decoder whose input is a Flink {@link RowData}; decoding from raw bytes is rejected.
+ *
+ * <p>The constructor derives, for every configured field, its column position (the index
+ * in the field list) and a nullable row-field converter built from the logical type of
+ * the field's format info. Every {@link RowDataSourceData} it produces shares both.
+ */
+public class RowDataSourceDecoder extends SourceDecoder<RowData> {
+
+    private final Map<String, Integer> fieldPositionMap;
+    private final RowDataToFieldConverters.RowFieldConverter[] rowFieldConverters;
+
+    /**
+     * @param sourceInfo source description; its field list fixes column order and formats
+     */
+    public RowDataSourceDecoder(RowDataSourceInfo sourceInfo) {
+        super(sourceInfo.getFields());
+        List<FieldInfo> fieldInfos = sourceInfo.getFields();
+        this.fieldPositionMap = parseFieldPositionMap(fieldInfos);
+
+        int fieldCount = fieldInfos.size();
+        this.rowFieldConverters = new RowDataToFieldConverters.RowFieldConverter[fieldCount];
+        for (int position = 0; position < this.rowFieldConverters.length; position++) {
+            this.rowFieldConverters[position] = converterFor(fieldInfos.get(position));
+        }
+    }
+
+    /** Builds the nullable converter matching the logical type of one field's format info. */
+    private static RowDataToFieldConverters.RowFieldConverter converterFor(FieldInfo fieldInfo) {
+        return RowDataToFieldConverters.createNullableRowFieldConverter(
+                TableFormatForRowDataUtils.deriveLogicalType(fieldInfo.getFormatInfo()));
+    }
+
+    /** Maps each field name to its index in {@code fields}; a repeated name keeps the last index. */
+    private Map<String, Integer> parseFieldPositionMap(List<FieldInfo> fields) {
+        Map<String, Integer> positions = new HashMap<>();
+        int index = 0;
+        for (FieldInfo field : fields) {
+            positions.put(field.getName(), index);
+            index++;
+        }
+        return positions;
+    }
+
+    /**
+     * Not supported by this decoder.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public SourceData decode(byte[] srcBytes, Context context) {
+        throw new UnsupportedOperationException("do not support decoding bytes for row data decoder");
+    }
+
+    /**
+     * Wraps the record in a {@link RowDataSourceData} that uses this decoder's position map
+     * and converters.
+     */
+    @Override
+    public SourceData decode(RowData rowData, Context context) {
+        return new RowDataSourceData(rowData, fieldPositionMap, rowFieldConverters);
+    }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
index 76e856be14..dafd891435 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
@@ -24,6 +24,7 @@ import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.ParquetSourceInfo;
import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSourceInfo;
import org.apache.inlong.sdk.transform.pojo.XmlSourceInfo;
import org.apache.inlong.sdk.transform.pojo.YamlSourceInfo;
@@ -65,4 +66,8 @@ public class SourceDecoderFactory {
return new YamlSourceDecoder(sourceInfo);
}
+ /**
+ * Creates a {@link RowDataSourceDecoder} from the given source description.
+ *
+ * @param sourceInfo source description handed to the decoder's constructor
+ * @return a new decoder instance
+ */
+ public static RowDataSourceDecoder createRowDecoder(RowDataSourceInfo
sourceInfo) {
+ return new RowDataSourceDecoder(sourceInfo);
+ }
+
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
new file mode 100644
index 0000000000..f2203cb3cd
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sort.formats.base.TableFormatUtils;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Sink encoder that turns a {@link SinkData} into a Flink {@link RowData}.
+ *
+ * <p>One converter is prepared per configured sink field, derived from the logical type of
+ * the field's format info. Output columns follow the order of the field list.
+ */
+public class RowDataSinkEncoder extends SinkEncoder<RowData> {
+
+    private final FieldToRowDataConverters.FieldToRowDataConverter[] fieldToRowDataConverters;
+    // Filled by the constructor; no method of this class reads it.
+    private final Map<String, Integer> fieldPositionMap;
+
+    /**
+     * @param sinkInfo sink description; its field list fixes column order and formats
+     */
+    public RowDataSinkEncoder(RowDataSinkInfo sinkInfo) {
+        super(sinkInfo.getFields());
+        this.fieldPositionMap = parseFieldPositionMap(fields);
+
+        this.fieldToRowDataConverters = new FieldToRowDataConverters.FieldToRowDataConverter[fields.size()];
+        for (int column = 0; column < fields.size(); column++) {
+            FieldInfo fieldInfo = fields.get(column);
+            this.fieldToRowDataConverters[column] = FieldToRowDataConverters.createConverter(
+                    TableFormatUtils.deriveLogicalType(fieldInfo.getFormatInfo()));
+        }
+    }
+
+    /** Maps each field name to its index in {@code fields}; a repeated name keeps the last index. */
+    private Map<String, Integer> parseFieldPositionMap(List<FieldInfo> fields) {
+        Map<String, Integer> positions = new HashMap<>();
+        int index = 0;
+        for (FieldInfo field : fields) {
+            positions.put(field.getName(), index);
+            index++;
+        }
+        return positions;
+    }
+
+    /**
+     * Builds one output row: for each sink field, the string value found under the field's
+     * name in {@code sinkData} is converted and stored at the field's index.
+     *
+     * @param sinkData values keyed by sink field name
+     * @param context not used by this encoder
+     * @return a {@link GenericRowData} with one column per converter
+     */
+    @Override
+    public RowData encode(SinkData sinkData, Context context) {
+        final GenericRowData encoded = new GenericRowData(fieldToRowDataConverters.length);
+
+        for (int column = 0; column < fields.size(); column++) {
+            String rawValue = sinkData.getField(fields.get(column).getName());
+            Object converted = fieldToRowDataConverters[column].convert(rawValue);
+            encoded.setField(column, converted);
+        }
+
+        return encoded;
+    }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
index 1778aba180..8d82970e7a 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
@@ -22,6 +22,7 @@ import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
import org.apache.inlong.sdk.transform.pojo.ParquetSinkInfo;
import org.apache.inlong.sdk.transform.pojo.PbSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
public class SinkEncoderFactory {
@@ -45,4 +46,8 @@ public class SinkEncoderFactory {
return new PbSinkEncoder(pbSinkInfo);
}
+ /**
+ * Creates a {@link RowDataSinkEncoder} from the given sink description.
+ *
+ * @param rowDataSinkInfo sink description handed to the encoder's constructor
+ * @return a new encoder instance
+ */
+ public static RowDataSinkEncoder createRowEncoder(RowDataSinkInfo
rowDataSinkInfo) {
+ return new RowDataSinkEncoder(rowDataSinkInfo);
+ }
+
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
index fe08d00bb1..eaf1b7a9eb 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sdk.transform.pojo;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
import lombok.Data;
@@ -28,6 +29,7 @@ import lombok.Data;
public class FieldInfo {
private String name;
+ private FormatInfo formatInfo;
private TypeConverter converter = TypeConverter.DefaultTypeConverter();
public FieldInfo() {
@@ -42,4 +44,9 @@ public class FieldInfo {
this.name = name;
this.converter = converter;
}
+
+ /**
+ * Creates a field with a name, a type converter and a format description.
+ *
+ * @param name field name, forwarded to the two-argument constructor
+ * @param converter type converter, forwarded to the two-argument constructor
+ * @param formatInfo format description stored on this field
+ */
+ public FieldInfo(String name, TypeConverter converter, FormatInfo
formatInfo) {
+ this(name, converter);
+ this.formatInfo = formatInfo;
+ }
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/RowDataSinkInfo.java
similarity index 64%
copy from
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
copy to
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/RowDataSinkInfo.java
index fe08d00bb1..d88ddbb6eb 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/RowDataSinkInfo.java
@@ -17,29 +17,21 @@
package org.apache.inlong.sdk.transform.pojo;
-import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
-
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
+import lombok.experimental.SuperBuilder;
-/**
- * FieldInfo
- */
-@Data
-public class FieldInfo {
-
- private String name;
- private TypeConverter converter = TypeConverter.DefaultTypeConverter();
+import java.util.List;
- public FieldInfo() {
-
- }
+/**
+ * Sink description for RowData output: holds the sink field list and registers itself
+ * with {@link SinkInfo} under the type {@link SinkInfo#ROWDATA}.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Data
+@SuperBuilder
+public class RowDataSinkInfo extends SinkInfo {
- public FieldInfo(String name) {
- this(name, TypeConverter.DefaultTypeConverter());
- }
+ private List<FieldInfo> fields;
- public FieldInfo(String name, TypeConverter converter) {
- this.name = name;
- this.converter = converter;
+ /**
+ * @param charset charset passed to the {@link SinkInfo} constructor
+ * @param fields sink fields stored on this instance
+ */
+ public RowDataSinkInfo(String charset, List<FieldInfo> fields) {
+ super(SinkInfo.ROWDATA, charset);
+ this.fields = fields;
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/RowDataSourceInfo.java
similarity index 63%
copy from
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
copy to
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/RowDataSourceInfo.java
index fe08d00bb1..18aa936bc9 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/RowDataSourceInfo.java
@@ -17,29 +17,21 @@
package org.apache.inlong.sdk.transform.pojo;
-import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Getter;
+import lombok.experimental.SuperBuilder;
-import lombok.Data;
+import java.util.List;
-/**
- * FieldInfo
- */
-@Data
-public class FieldInfo {
-
- private String name;
- private TypeConverter converter = TypeConverter.DefaultTypeConverter();
-
- public FieldInfo() {
+/**
+ * Source description for RowData input: holds the source field list on top of what
+ * {@link SourceInfo} carries.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@SuperBuilder
+@Getter
+public class RowDataSourceInfo extends SourceInfo {
- }
-
- public FieldInfo(String name) {
- this(name, TypeConverter.DefaultTypeConverter());
- }
+ private List<FieldInfo> fields;
- public FieldInfo(String name, TypeConverter converter) {
- this.name = name;
- this.converter = converter;
+ /**
+ * @param charset charset passed to the {@link SourceInfo} constructor
+ * @param fields source fields stored on this instance
+ */
+ public RowDataSourceInfo(String charset, List<FieldInfo> fields) {
+ super(charset);
+ this.fields = fields;
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
index 022af73fd5..89cf599d98 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
@@ -46,6 +46,7 @@ public abstract class SinkInfo {
public static final String ES_MAP = "es_map";
public static final String PARQUET = "parquet";
public static final String PB = "pb";
+ public static final String ROWDATA = "rowdata";
@JsonIgnore
private String type;
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
index 3322d83199..bc0fb0371e 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sdk.transform.process.processor;
+import
org.apache.inlong.common.pojo.sort.dataflow.field.format.StringFormatInfo;
import org.apache.inlong.sdk.transform.decode.ParquetInputByteArray;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
@@ -48,6 +49,7 @@ public abstract class AbstractProcessorTestBase {
for (String fieldName : fieldNames) {
FieldInfo field = new FieldInfo();
field.setName(fieldName);
+ field.setFormatInfo(new StringFormatInfo());
fields.add(field);
}
return fields;
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestRowData2RowDataProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestRowData2RowDataProcessor.java
new file mode 100644
index 0000000000..9bb997e9af
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestRowData2RowDataProcessor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.processor;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.RowDataSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Checks a RowData-to-RowData transform: a four-column string row goes through the row
+ * decoder, the SQL select list picks the columns in a new order, and the row encoder
+ * produces the output row.
+ */
+public class TestRowData2RowDataProcessor extends AbstractProcessorTestBase {
+
+    @Test
+    public void testRowData2RowData() throws Exception {
+        RowDataSourceInfo sourceInfo =
+                new RowDataSourceInfo("utf-8", this.getTestFieldList("sid", "packageID", "msgTime", "msg"));
+        RowDataSinkInfo sinkInfo =
+                new RowDataSinkInfo("utf-8", this.getTestFieldList("f1", "f2", "f3", "f4"));
+
+        TransformConfig config = new TransformConfig("select msgTime ,msg, packageID, sid");
+        TransformProcessor<RowData, RowData> processor = TransformProcessor.create(
+                config,
+                SourceDecoderFactory.createRowDecoder(sourceInfo),
+                SinkEncoderFactory.createRowEncoder(sinkInfo));
+
+        List<RowData> sinkRows = processor.transform(createRowData());
+        RowData actualRow = sinkRows.get(0);
+
+        // Expected output columns, in select-list order.
+        String[] expectedColumns = {"2024-12-19T11:00:55.212", "msg111", "pack123", "s123"};
+        for (int column = 0; column < expectedColumns.length; column++) {
+            Assert.assertEquals(expectedColumns[column], actualRow.getString(column).toString());
+        }
+    }
+
+    /** Builds the input row; columns follow the source field order sid, packageID, msgTime, msg. */
+    private RowData createRowData() {
+        String[] columnValues = {"s123", "pack123", "2024-12-19T11:00:55.212", "msg111"};
+        GenericRowData row = new GenericRowData(columnValues.length);
+        for (int column = 0; column < columnValues.length; column++) {
+            row.setField(column, StringData.fromString(columnValues[column]));
+        }
+        return row;
+    }
+}