This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new edf93bd547 [INLONG-10999][SDK] Support to return raw data by star sign in transformer SQL (#11004)
edf93bd547 is described below

commit edf93bd547a09df6b1d7a6d57ea3d719d9f63f4f
Author: ChunLiang Lu <[email protected]>
AuthorDate: Tue Sep 3 21:49:46 2024 +0800

    [INLONG-10999][SDK] Support to return raw data by star sign in transformer SQL (#11004)
    
    * [INLONG-10999][SDK] Support to return raw data by star sign in transformer SQL
    
    * add more UT Case
    
    * fix code format problems
    
    * fix pom.xml problem
---
 .../sdk/transform/encode/CsvSinkEncoder.java       |  6 +-
 .../inlong/sdk/transform/encode/KvSinkEncoder.java |  6 +-
 .../inlong/sdk/transform/encode/SinkEncoder.java   |  2 +
 .../sdk/transform/process/TransformProcessor.java  | 34 +++++++---
 .../ValueParserNode.java}                          | 19 +++---
 .../transform/process/TestTransformProcessor.java  | 76 ++++++++++++++++++++++
 6 files changed, 122 insertions(+), 21 deletions(-)

diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
index ce47a0072c..89f6f364a0 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
@@ -66,7 +66,11 @@ public class CsvSinkEncoder implements SinkEncoder<String> {
             } else {
                 for (String fieldName : sinkData.keyList()) {
                     String fieldValue = sinkData.getField(fieldName);
-                    EscapeUtils.escapeContent(builder, delimiter, escapeChar, 
fieldValue);
+                    if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) {
+                        builder.append(fieldValue);
+                    } else {
+                        EscapeUtils.escapeContent(builder, delimiter, 
escapeChar, fieldValue);
+                    }
                     builder.append(delimiter);
                 }
             }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
index 7460ec95c2..2822374c41 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
@@ -63,7 +63,11 @@ public class KvSinkEncoder implements SinkEncoder<String> {
         if (fields == null || fields.size() == 0) {
             for (String fieldName : sinkData.keyList()) {
                 String fieldValue = sinkData.getField(fieldName);
-                
builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter);
+                if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) {
+                    builder.append(fieldValue).append(entryDelimiter);
+                } else {
+                    
builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter);
+                }
             }
         } else {
             for (FieldInfo field : fields) {
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
index 7f845a99d6..a63f970295 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
@@ -27,6 +27,8 @@ import java.util.List;
  */
 public interface SinkEncoder<Output> {
 
+    public static final String ALL_SOURCE_FIELD_SIGN = "*";
+
     Output encode(SinkData sinkData, Context context);
 
     List<FieldInfo> getFields();
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index 9944268dda..acb7e62e07 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -31,23 +31,23 @@ import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 import com.google.common.collect.ImmutableMap;
 import net.sf.jsqlparser.JSQLParserException;
 import net.sf.jsqlparser.parser.CCJSqlParserManager;
+import net.sf.jsqlparser.statement.select.AllColumns;
 import net.sf.jsqlparser.statement.select.PlainSelect;
 import net.sf.jsqlparser.statement.select.Select;
 import net.sf.jsqlparser.statement.select.SelectExpressionItem;
 import net.sf.jsqlparser.statement.select.SelectItem;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.StringReader;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
 /**
  * TransformProcessor
- * 
+ *
  */
 public class TransformProcessor<I, O> {
 
@@ -61,7 +61,9 @@ public class TransformProcessor<I, O> {
 
     private PlainSelect transformSelect;
     private ExpressionOperator where;
-    private Map<String, ValueParser> selectItemMap;
+    private List<ValueParserNode> selectItems;
+
+    private boolean includeAllSourceFields = false;
 
     public static <I, O> TransformProcessor<I, O> create(
             TransformConfig config,
@@ -91,7 +93,7 @@ public class TransformProcessor<I, O> {
         this.transformSelect = (PlainSelect) select.getSelectBody();
         this.where = 
OperatorTools.buildOperator(this.transformSelect.getWhere());
         List<SelectItem> items = this.transformSelect.getSelectItems();
-        this.selectItemMap = new HashMap<>(items.size());
+        this.selectItems = new ArrayList<>(items.size());
         List<FieldInfo> fields = this.encoder.getFields();
         for (int i = 0; i < items.size(); i++) {
             SelectItem item = items.get(i);
@@ -108,8 +110,12 @@ public class TransformProcessor<I, O> {
                         fieldName = exprItem.getAlias().getName();
                     }
                 }
-                this.selectItemMap.put(fieldName,
-                        OperatorTools.buildParser(exprItem.getExpression()));
+                this.selectItems
+                        .add(new ValueParserNode(fieldName, 
OperatorTools.buildParser(exprItem.getExpression())));
+            } else if (item instanceof AllColumns) {
+                fieldName = item.toString();
+                this.encoder.getFields().clear();
+                this.selectItems.add(new ValueParserNode(fieldName, null));
             }
         }
     }
@@ -137,10 +143,18 @@ public class TransformProcessor<I, O> {
 
             // parse value
             SinkData sinkData = new DefaultSinkData();
-            for (Entry<String, ValueParser> entry : 
this.selectItemMap.entrySet()) {
-                String fieldName = entry.getKey();
+            for (ValueParserNode node : this.selectItems) {
+                String fieldName = node.getFieldName();
+                ValueParser parser = node.getParser();
+                if (parser == null && StringUtils.equals(fieldName, 
SinkEncoder.ALL_SOURCE_FIELD_SIGN)) {
+                    if (input instanceof String) {
+                        sinkData.addField(fieldName, (String) input);
+                    } else {
+                        sinkData.addField(fieldName, "");
+                    }
+                    continue;
+                }
                 try {
-                    ValueParser parser = entry.getValue();
                     Object fieldValue = parser.parse(sourceData, i, context);
                     sinkData.addField(fieldName, String.valueOf(fieldValue));
                 } catch (Throwable t) {
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/ValueParserNode.java
similarity index 71%
copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/ValueParserNode.java
index 7f845a99d6..e36c0c9c6a 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/ValueParserNode.java
@@ -15,19 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sdk.transform.encode;
+package org.apache.inlong.sdk.transform.process;
 
-import org.apache.inlong.sdk.transform.pojo.FieldInfo;
-import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
-import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 
 /**
- * SinkEncoder
+ * ValueParserNode
  */
-public interface SinkEncoder<Output> {
+@AllArgsConstructor
+@Data
+public class ValueParserNode {
 
-    Output encode(SinkData sinkData, Context context);
-
-    List<FieldInfo> getFields();
+    private String fieldName;
+    private ValueParser parser;
 }
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
index 3413f1aca3..8448260252 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
@@ -350,4 +350,80 @@ public class TestTransformProcessor {
         List<String> output = processor.transform(srcBytes, new HashMap<>());
         Assert.assertEquals(2, output.size());
     }
+    @Test
+    public void testCsv2Star() throws Exception {
+        List<FieldInfo> fields = this.getTestFieldList("ftime", "extinfo");
+        CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", '|', '\\', 
fields);
+        CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', new 
ArrayList<>());
+        String transformSql = "select *";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1
+        TransformProcessor<String, String> processor1 = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
+
+        List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok", 
new HashMap<>());
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok");
+        // case2
+        config.setTransformSql("select * from source where extinfo!='ok'");
+        TransformProcessor<String, String> processor2 = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
+
+        List<String> output2 = processor2.transform("2024-04-28 00:00:00|ok", 
new HashMap<>());
+        Assert.assertEquals(0, output2.size());
+        // case3
+        config.setTransformSql("select *,extinfo,ftime from source where 
extinfo!='ok'");
+        TransformProcessor<String, String> processor3 = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
+
+        List<String> output3 = processor3.transform("2024-04-28 00:00:00|nok", 
new HashMap<>());
+        Assert.assertEquals(1, output3.size());
+        Assert.assertEquals(output3.get(0), "2024-04-28 
00:00:00|nok|nok|2024-04-28 00:00:00");
+        // case4
+        CsvSourceInfo csvSourceNoField = new CsvSourceInfo("UTF-8", '|', '\\', 
new ArrayList<>());
+        CsvSinkInfo csvSinkNoField = new CsvSinkInfo("UTF-8", '|', '\\', new 
ArrayList<>());
+        config.setTransformSql("select *,$2,$1 from source where $2='nok'");
+        TransformProcessor<String, String> processor4 = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSourceNoField),
+                        SinkEncoderFactory.createCsvEncoder(csvSinkNoField));
+
+        List<String> output4 = processor4.transform("2024-04-28 00:00:00|nok", 
new HashMap<>());
+        Assert.assertEquals(1, output4.size());
+        Assert.assertEquals(output4.get(0), "2024-04-28 
00:00:00|nok|nok|2024-04-28 00:00:00");
+    }
+
+    @Test
+    public void testKv2Star() throws Exception {
+        List<FieldInfo> fields = this.getTestFieldList("ftime", "extinfo");
+        KvSourceInfo kvSource = new KvSourceInfo("UTF-8", fields);
+        KvSinkInfo kvSink = new KvSinkInfo("UTF-8", new ArrayList<>());
+        String transformSql = "select *";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1
+        TransformProcessor<String, String> processor1 = TransformProcessor
+                .create(config, SourceDecoderFactory.createKvDecoder(kvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output1 = processor1.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output1.get(0), "ftime=2024-04-28 
00:00:00&extinfo=ok");
+        // case2
+        config.setTransformSql("select * from source where extinfo!='ok'");
+        TransformProcessor<String, String> processor2 = TransformProcessor
+                .create(config, SourceDecoderFactory.createKvDecoder(kvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output2 = processor2.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
+        Assert.assertEquals(0, output2.size());
+        // case3
+        config.setTransformSql("select *,extinfo e1,ftime f1 from source where 
extinfo!='ok'");
+        TransformProcessor<String, String> processor3 = TransformProcessor
+                .create(config, SourceDecoderFactory.createKvDecoder(kvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
+        List<String> output3 = processor3.transform("ftime=2024-04-28 
00:00:00&extinfo=nok", new HashMap<>());
+        Assert.assertEquals(1, output3.size());
+        Assert.assertEquals(output3.get(0), "ftime=2024-04-28 
00:00:00&extinfo=nok&e1=nok&f1=2024-04-28 00:00:00");
+    }
 }

Reply via email to