This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8044a40b83 [INLONG-11382][SDK] Optimize all columns select of 
Transform SDK (#11383)
8044a40b83 is described below

commit 8044a40b836f5fa0a23099a212ad8a1a447d2b31
Author: vernedeng <[email protected]>
AuthorDate: Tue Oct 22 09:37:16 2024 +0800

    [INLONG-11382][SDK] Optimize all columns select of Transform SDK (#11383)
---
 .../sdk/transform/decode/AvroSourceDecoder.java    |  2 +-
 .../sdk/transform/decode/BsonSourceDecoder.java    |  2 +-
 .../sdk/transform/decode/CsvSourceDecoder.java     |  6 ++----
 .../sdk/transform/decode/JsonSourceDecoder.java    |  2 +-
 .../sdk/transform/decode/KvSourceDecoder.java      |  6 ++----
 .../sdk/transform/decode/ParquetSourceDecoder.java |  2 +-
 .../sdk/transform/decode/PbSourceDecoder.java      |  2 +-
 .../inlong/sdk/transform/decode/SourceDecoder.java | 23 +++++++++++++++++++---
 .../sdk/transform/decode/XmlSourceDecoder.java     |  2 +-
 .../sdk/transform/decode/YamlSourceDecoder.java    |  2 +-
 .../sdk/transform/encode/CsvSinkEncoder.java       | 13 ++----------
 .../inlong/sdk/transform/encode/KvSinkEncoder.java | 14 ++-----------
 .../sdk/transform/encode/MapSinkEncoder.java       | 11 +++--------
 .../sdk/transform/encode/ParquetSinkEncoder.java   | 12 ++---------
 .../inlong/sdk/transform/encode/PbSinkEncoder.java | 10 +++-------
 .../inlong/sdk/transform/encode/SinkEncoder.java   | 18 ++++++++++++++---
 .../inlong/sdk/transform/pojo/FieldInfo.java       |  4 ++++
 .../sdk/transform/process/TransformProcessor.java  | 13 ++++++------
 .../process/processor/TestCsv2StarProcessor.java   |  2 +-
 19 files changed, 69 insertions(+), 77 deletions(-)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java
index 0f71f28209..992dea88c6 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java
@@ -36,7 +36,7 @@ import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 
-public class AvroSourceDecoder implements SourceDecoder<byte[]> {
+public class AvroSourceDecoder extends SourceDecoder<byte[]> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AvroSourceDecoder.class);
 
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceDecoder.java
index 880467ea43..6dcde13ade 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceDecoder.java
@@ -32,7 +32,7 @@ import java.math.BigDecimal;
  * BsonSourceDecoder
  */
 @Slf4j
-public class BsonSourceDecoder implements SourceDecoder<byte[]> {
+public class BsonSourceDecoder extends SourceDecoder<byte[]> {
 
     private final JsonSourceDecoder decoder;
 
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
index 2ff65b26d9..830d5c2c01 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
@@ -25,21 +25,20 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import java.nio.charset.Charset;
-import java.util.List;
 
 /**
  * CsvSourceDecoder
  * 
  */
-public class CsvSourceDecoder implements SourceDecoder<String> {
+public class CsvSourceDecoder extends SourceDecoder<String> {
 
     protected CsvSourceInfo sourceInfo;
     private Charset srcCharset = Charset.defaultCharset();
     private Character delimiter = '|';
     private Character escapeChar = null;
-    private List<FieldInfo> fields;
 
     public CsvSourceDecoder(CsvSourceInfo sourceInfo) {
+        super(sourceInfo.getFields());
         this.sourceInfo = sourceInfo;
         if (sourceInfo.getDelimiter() != null) {
             this.delimiter = sourceInfo.getDelimiter();
@@ -50,7 +49,6 @@ public class CsvSourceDecoder implements 
SourceDecoder<String> {
         if (!StringUtils.isBlank(sourceInfo.getCharset())) {
             this.srcCharset = Charset.forName(sourceInfo.getCharset());
         }
-        this.fields = sourceInfo.getFields();
     }
 
     @Override
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
index ec67d95d9e..0332094090 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
@@ -35,7 +35,7 @@ import java.util.List;
  * JsonSourceDecoder
  * 
  */
-public class JsonSourceDecoder implements SourceDecoder<String> {
+public class JsonSourceDecoder extends SourceDecoder<String> {
 
     protected JsonSourceInfo sourceInfo;
     private Charset srcCharset = Charset.defaultCharset();
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
index 2bb1c366b0..e62fcccc37 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
@@ -32,7 +32,7 @@ import java.util.Map;
  * KvSourceDecoder
  * 
  */
-public class KvSourceDecoder implements SourceDecoder<String> {
+public class KvSourceDecoder extends SourceDecoder<String> {
 
     protected KvSourceInfo sourceInfo;
     private Character entryDelimiter = '&';
@@ -41,9 +41,9 @@ public class KvSourceDecoder implements SourceDecoder<String> 
{
     private Character quoteChar = '\"';
     private Character lineDelimiter = '\n';
     private Charset srcCharset = Charset.defaultCharset();
-    private List<FieldInfo> fields;
 
     public KvSourceDecoder(KvSourceInfo sourceInfo) {
+        super(sourceInfo.getFields());
         this.sourceInfo = sourceInfo;
         if (!StringUtils.isBlank(sourceInfo.getCharset())) {
             this.srcCharset = Charset.forName(sourceInfo.getCharset());
@@ -63,8 +63,6 @@ public class KvSourceDecoder implements SourceDecoder<String> 
{
         if (sourceInfo.getLineDelimiter() != null) {
             this.lineDelimiter = sourceInfo.getLineDelimiter();
         }
-
-        this.fields = sourceInfo.getFields();
     }
 
     @Override
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
index 11312370eb..85e3ba319d 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
@@ -41,7 +41,7 @@ import java.nio.charset.Charset;
 /**
  * PbSourceDecoder
  */
-public class ParquetSourceDecoder implements SourceDecoder<byte[]> {
+public class ParquetSourceDecoder extends SourceDecoder<byte[]> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ParquetSourceDecoder.class);
 
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
index 48f3749c45..4ee704bd0e 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
@@ -39,7 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * PbSourceDecoder
  * 
  */
-public class PbSourceDecoder implements SourceDecoder<byte[]> {
+public class PbSourceDecoder extends SourceDecoder<byte[]> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PbSourceDecoder.class);
 
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
index 2e410d24c3..26bbbbbaac 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
@@ -17,15 +17,32 @@
 
 package org.apache.inlong.sdk.transform.decode;
 
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
 import org.apache.inlong.sdk.transform.process.Context;
 
+import com.google.common.collect.ImmutableList;
+import lombok.Getter;
+
+import java.util.List;
+
 /**
  * SourceDecoder
  */
-public interface SourceDecoder<Input> {
+@Getter
+public abstract class SourceDecoder<Input> {
+
+    protected final List<FieldInfo> fields;
+
+    public SourceDecoder() {
+        this(ImmutableList.of());
+    }
+
+    public SourceDecoder(List<FieldInfo> fields) {
+        this.fields = fields;
+    }
 
-    SourceData decode(byte[] srcBytes, Context context);
+    public abstract SourceData decode(byte[] srcBytes, Context context);
 
-    SourceData decode(Input input, Context context);
+    public abstract SourceData decode(Input input, Context context);
 
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java
index 9f86c16eff..a9a6bb9a66 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java
@@ -37,7 +37,7 @@ import java.util.Map;
  * XmlSourceDecoder
  */
 @Slf4j
-public class XmlSourceDecoder implements SourceDecoder<String> {
+public class XmlSourceDecoder extends SourceDecoder<String> {
 
     protected XmlSourceInfo sourceInfo;
     private Charset srcCharset = Charset.defaultCharset();
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java
index b7a2ba915f..5be1a0a93f 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java
@@ -32,7 +32,7 @@ import java.util.List;
 import java.util.Map;
 
 @Slf4j
-public class YamlSourceDecoder implements SourceDecoder<String> {
+public class YamlSourceDecoder extends SourceDecoder<String> {
 
     protected YamlSourceInfo sourceInfo;
     private Charset srcCharset = Charset.defaultCharset();
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
index 89f6f364a0..97b43c02b9 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
@@ -24,21 +24,20 @@ import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.commons.lang3.StringUtils;
 
 import java.nio.charset.Charset;
-import java.util.List;
 
 /**
  * CsvSinkEncoder
  */
-public class CsvSinkEncoder implements SinkEncoder<String> {
+public class CsvSinkEncoder extends SinkEncoder<String> {
 
     protected CsvSinkInfo sinkInfo;
     protected Charset sinkCharset = Charset.defaultCharset();
     private Character delimiter = '|';
     private Character escapeChar = null;
-    private List<FieldInfo> fields;
     private StringBuilder builder = new StringBuilder();
 
     public CsvSinkEncoder(CsvSinkInfo sinkInfo) {
+        super(sinkInfo.getFields());
         this.sinkInfo = sinkInfo;
         if (sinkInfo.getDelimiter() != null) {
             this.delimiter = sinkInfo.getDelimiter();
@@ -49,7 +48,6 @@ public class CsvSinkEncoder implements SinkEncoder<String> {
         if (!StringUtils.isBlank(sinkInfo.getCharset())) {
             this.sinkCharset = Charset.forName(sinkInfo.getCharset());
         }
-        this.fields = sinkInfo.getFields();
     }
 
     /**
@@ -89,11 +87,4 @@ public class CsvSinkEncoder implements SinkEncoder<String> {
         return builder.substring(0, builder.length() - 1);
     }
 
-    /**
-     * get fields
-     * @return the fields
-     */
-    public List<FieldInfo> getFields() {
-        return fields;
-    }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
index 2822374c41..094f4d6884 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
@@ -24,21 +24,20 @@ import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.commons.lang3.StringUtils;
 
 import java.nio.charset.Charset;
-import java.util.List;
 
 /**
  * KvSinkEncoder
  */
-public class KvSinkEncoder implements SinkEncoder<String> {
+public class KvSinkEncoder extends SinkEncoder<String> {
 
     protected KvSinkInfo sinkInfo;
     protected Charset sinkCharset = Charset.defaultCharset();
     private Character entryDelimiter = '&';
     private Character kvDelimiter = '=';
-    private List<FieldInfo> fields;
     private StringBuilder builder = new StringBuilder();
 
     public KvSinkEncoder(KvSinkInfo sinkInfo) {
+        super(sinkInfo.getFields());
         this.sinkInfo = sinkInfo;
         if (!StringUtils.isBlank(sinkInfo.getCharset())) {
             this.sinkCharset = Charset.forName(sinkInfo.getCharset());
@@ -49,7 +48,6 @@ public class KvSinkEncoder implements SinkEncoder<String> {
         if (sinkInfo.getKvDelimiter() != null) {
             this.kvDelimiter = sinkInfo.getKvDelimiter();
         }
-        this.fields = sinkInfo.getFields();
     }
 
     /**
@@ -78,12 +76,4 @@ public class KvSinkEncoder implements SinkEncoder<String> {
         }
         return builder.substring(0, builder.length() - 1);
     }
-
-    /**
-     * get fields
-     * @return the fields
-     */
-    public List<FieldInfo> getFields() {
-        return fields;
-    }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
index 0b05ce6076..c76c4e80ff 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
@@ -25,17 +25,17 @@ import 
org.apache.inlong.sdk.transform.process.converter.TypeConverter;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
 @Slf4j
-public class MapSinkEncoder implements SinkEncoder<Map<String, Object>> {
+public class MapSinkEncoder extends SinkEncoder<Map<String, Object>> {
 
     private final MapSinkInfo sinkInfo;
     private final Map<String, TypeConverter> converters;
 
     public MapSinkEncoder(MapSinkInfo sinkInfo) {
+        super(sinkInfo.getFields());
         this.sinkInfo = sinkInfo;
         this.converters = sinkInfo.getFields()
                 .stream()
@@ -47,7 +47,7 @@ public class MapSinkEncoder implements 
SinkEncoder<Map<String, Object>> {
     @Override
     public Map<String, Object> encode(SinkData sinkData, Context context) {
         Map<String, Object> esMap = new HashMap<>();
-        for (FieldInfo fieldInfo : sinkInfo.getFields()) {
+        for (FieldInfo fieldInfo : fields) {
             String fieldName = fieldInfo.getName();
             String strValue = sinkData.getField(fieldName);
             TypeConverter converter = converters.get(fieldName);
@@ -65,9 +65,4 @@ public class MapSinkEncoder implements 
SinkEncoder<Map<String, Object>> {
 
         return esMap;
     }
-
-    @Override
-    public List<FieldInfo> getFields() {
-        return sinkInfo.getFields();
-    }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
index 168d7d0c44..6d377b061c 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
@@ -28,7 +28,6 @@ import org.apache.parquet.schema.Types;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -38,17 +37,14 @@ import static 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 /**
  * ParquetSinkEncoder
  */
-public class ParquetSinkEncoder implements SinkEncoder<ByteArrayOutputStream> {
+public class ParquetSinkEncoder extends SinkEncoder<ByteArrayOutputStream> {
 
     protected ParquetSinkInfo sinkInfo;
-    protected Charset sinkCharset = Charset.defaultCharset();
-
-    private final List<FieldInfo> fields;
     private ParquetByteArrayWriter<Object[]> writer;
 
     public ParquetSinkEncoder(ParquetSinkInfo sinkInfo) {
+        super(sinkInfo.getFields());
         this.sinkInfo = sinkInfo;
-        this.fields = sinkInfo.getFields();
         ArrayList<Type> typesList = new ArrayList<>();
         for (FieldInfo fieldInfo : this.fields) {
             typesList.add(Types.required(BINARY)
@@ -88,10 +84,6 @@ public class ParquetSinkEncoder implements 
SinkEncoder<ByteArrayOutputStream> {
         return writer.getByteArrayOutputStream();
     }
 
-    @Override
-    public List<FieldInfo> getFields() {
-        return this.fields;
-    }
     public byte[] mergeByteArray(List<ByteArrayOutputStream> list) {
         if (list.isEmpty()) {
             return null;
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java
index 367152a3ae..226405c515 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java
@@ -28,10 +28,9 @@ import com.google.protobuf.DynamicMessage;
 
 import java.util.Base64;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-public class PbSinkEncoder implements SinkEncoder<byte[]> {
+public class PbSinkEncoder extends SinkEncoder<byte[]> {
 
     protected PbSinkInfo sinkInfo;
 
@@ -40,9 +39,10 @@ public class PbSinkEncoder implements SinkEncoder<byte[]> {
     private final Map<String, Descriptors.FieldDescriptor.Type> fieldTypes;
 
     public PbSinkEncoder(PbSinkInfo pbSinkInfo) {
+        super(pbSinkInfo.getFields());
         this.sinkInfo = pbSinkInfo;
         this.fieldTypes = new HashMap<>();
-        for (FieldInfo field : pbSinkInfo.getFields()) {
+        for (FieldInfo field : this.fields) {
             fieldTypes.put(field.getName(), 
Descriptors.FieldDescriptor.Type.STRING);
         }
         // decode protoDescription
@@ -108,8 +108,4 @@ public class PbSinkEncoder implements SinkEncoder<byte[]> {
         }
     }
 
-    @Override
-    public List<FieldInfo> getFields() {
-        return sinkInfo.getFields();
-    }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
index a63f970295..bd804cd771 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
@@ -20,16 +20,28 @@ package org.apache.inlong.sdk.transform.encode;
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
 import org.apache.inlong.sdk.transform.process.Context;
 
+import com.google.common.collect.ImmutableList;
+import lombok.Getter;
+
 import java.util.List;
 
 /**
  * SinkEncoder
  */
-public interface SinkEncoder<Output> {
+@Getter
+public abstract class SinkEncoder<Output> {
 
     public static final String ALL_SOURCE_FIELD_SIGN = "*";
 
-    Output encode(SinkData sinkData, Context context);
+    protected final List<FieldInfo> fields;
+
+    public SinkEncoder() {
+        this(ImmutableList.of());
+    }
+
+    public SinkEncoder(List<FieldInfo> fields) {
+        this.fields = fields;
+    }
 
-    List<FieldInfo> getFields();
+    public abstract Output encode(SinkData sinkData, Context context);
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
index 2a7834112a..fe08d00bb1 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
@@ -34,6 +34,10 @@ public class FieldInfo {
 
     }
 
+    public FieldInfo(String name) {
+        this(name, TypeConverter.DefaultTypeConverter());
+    }
+
     public FieldInfo(String name, TypeConverter converter) {
         this.name = name;
         this.converter = converter;
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index 6d650cfd4a..6cfd3cd5f7 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -25,19 +25,19 @@ import org.apache.inlong.sdk.transform.pojo.FieldInfo;
 import org.apache.inlong.sdk.transform.pojo.TransformConfig;
 import org.apache.inlong.sdk.transform.process.operator.ExpressionOperator;
 import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
 import com.google.common.collect.ImmutableMap;
 import net.sf.jsqlparser.JSQLParserException;
 import net.sf.jsqlparser.parser.CCJSqlParserManager;
+import net.sf.jsqlparser.schema.Column;
 import net.sf.jsqlparser.statement.select.AllColumns;
 import net.sf.jsqlparser.statement.select.PlainSelect;
 import net.sf.jsqlparser.statement.select.Select;
 import net.sf.jsqlparser.statement.select.SelectExpressionItem;
 import net.sf.jsqlparser.statement.select.SelectItem;
 import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.StringReader;
 import java.util.ArrayList;
@@ -50,8 +50,6 @@ import java.util.Map;
  */
 public class TransformProcessor<I, O> {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(TransformProcessor.class);
-
     private static final Map<String, Object> EMPTY_EXT_PARAMS = 
ImmutableMap.of();
     private static final String DUMMY_SELECT = "select *";
 
@@ -127,9 +125,10 @@ public class TransformProcessor<I, O> {
                 this.selectItems
                         .add(new ValueParserNode(fieldName, 
OperatorTools.buildParser(exprItem.getExpression())));
             } else if (item instanceof AllColumns) {
-                fieldName = item.toString();
-                this.encoder.getFields().clear();
-                this.selectItems.add(new ValueParserNode(fieldName, null));
+                for (FieldInfo fieldInfo : decoder.getFields()) {
+                    String name = fieldInfo.getName();
+                    this.selectItems.add(new ValueParserNode(name, new 
ColumnParser(new Column(name))));
+                }
             }
         }
     }
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2StarProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2StarProcessor.java
index 9fdbf0b5d0..bcd1cfe11a 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2StarProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2StarProcessor.java
@@ -76,6 +76,6 @@ public class TestCsv2StarProcessor extends 
AbstractProcessorTestBase {
 
         List<String> output4 = processor4.transform("2024-04-28 00:00:00|nok", 
new HashMap<>());
         Assert.assertEquals(1, output4.size());
-        Assert.assertEquals(output4.get(0), "2024-04-28 
00:00:00|nok|nok|2024-04-28 00:00:00");
+        Assert.assertEquals(output4.get(0), "nok|2024-04-28 00:00:00");
     }
 }

Reply via email to