This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 714d7af5b6 [INLONG-10109][SDK] Support to transform from Json protocol 
to CSV/KV protocol by single SQL (#10110)
714d7af5b6 is described below

commit 714d7af5b619e7cd829bfd26c879baef58b54abd
Author: 卢春亮 <[email protected]>
AuthorDate: Mon May 6 16:18:23 2024 +0800

    [INLONG-10109][SDK] Support to transform from Json protocol to CSV/KV 
protocol by single SQL (#10110)
---
 .../inlong/sdk/transform/decode/CsvSourceData.java |   4 +-
 .../decode/{CsvSourceData.java => JsonNode.java}   |  59 ++++-----
 .../sdk/transform/decode/JsonSourceData.java       | 134 +++++++++++++++++++++
 .../sdk/transform/decode/JsonSourceDecoder.java    | 123 +++++++++++++++++++
 .../inlong/sdk/transform/decode/KvSourceData.java  |   4 +-
 .../sdk/transform/encode/CsvSinkEncoder.java       |   8 ++
 .../inlong/sdk/transform/encode/KvSinkEncoder.java |   8 ++
 .../inlong/sdk/transform/encode/SinkEncoder.java   |   6 +
 .../sdk/transform/process/TransformProcessor.java  |  32 +++--
 .../transform/process/operator/OperatorTools.java  |   3 +
 .../sdk/transform/process/parser/ColumnParser.java |   7 +-
 .../transform/process/TestTransformProcessor.java  |  81 ++++++++++++-
 12 files changed, 421 insertions(+), 48 deletions(-)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
index 77f173f1a6..d4492b4b85 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
@@ -51,10 +51,10 @@ public class CsvSourceData implements SourceData {
 
     @Override
     public String getField(int rowNum, String fieldName) {
-        if (rowNum > this.rows.size()) {
+        if (rowNum >= this.rows.size()) {
             return null;
         }
-        Map<String, String> targetRow = this.rows.get(rowNum - 1);
+        Map<String, String> targetRow = this.rows.get(rowNum);
         return targetRow.get(fieldName);
     }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonNode.java
similarity index 50%
copy from 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
copy to 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonNode.java
index 77f173f1a6..e60406162f 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonNode.java
@@ -17,44 +17,35 @@
 
 package org.apache.inlong.sdk.transform.decode;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import lombok.Data;
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.commons.lang3.StringUtils;
 
 /**
- * CsvSourceData
+ * JsonNode
  * 
  */
-public class CsvSourceData implements SourceData {
-
-    private List<Map<String, String>> rows = new ArrayList<>();
-
-    private Map<String, String> currentRow;
-
-    public CsvSourceData() {
-    }
-
-    public void putField(String fieldName, String fieldValue) {
-        this.currentRow.put(fieldName, fieldValue);
-    }
-
-    public void addRow() {
-        this.currentRow = new HashMap<>();
-        rows.add(currentRow);
-    }
-
-    @Override
-    public int getRowCount() {
-        return this.rows.size();
-    }
-
-    @Override
-    public String getField(int rowNum, String fieldName) {
-        if (rowNum > this.rows.size()) {
-            return null;
+@Data
+public class JsonNode {
+
+    private String name;
+    private boolean isArray = false;
+    private int arrayIndex = -1;
+
+    public JsonNode(String nodeString) {
+        int beginIndex = nodeString.indexOf('[');
+        if (beginIndex < 0) {
+            this.name = nodeString;
+        } else {
+            this.name = StringUtils.trim(nodeString.substring(0, beginIndex));
+            int endIndex = nodeString.lastIndexOf(']');
+            if (endIndex >= 0) {
+                this.isArray = true;
+                this.arrayIndex = 
NumberUtils.toInt(nodeString.substring(beginIndex + 1, endIndex), -1);
+                if (this.arrayIndex < 0) {
+                    this.arrayIndex = 0;
+                }
+            }
         }
-        Map<String, String> targetRow = this.rows.get(rowNum - 1);
-        return targetRow.get(fieldName);
     }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java
new file mode 100644
index 0000000000..5f92c34965
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * JsonSourceData
+ * 
+ */
+public class JsonSourceData implements SourceData {
+
+    public static final String ROOT_KEY = "$root";
+
+    public static final String CHILD_KEY = "$child";
+
+    private JsonObject root;
+
+    private JsonArray childRoot;
+
+    /**
+     * Constructor
+     * @param root
+     * @param childRoot
+     */
+    public JsonSourceData(JsonObject root, JsonArray childRoot) {
+        this.root = root;
+        this.childRoot = childRoot;
+    }
+
+    /**
+     * getRowCount
+     * @return
+     */
+    @Override
+    public int getRowCount() {
+        if (this.childRoot == null) {
+            return 1;
+        } else {
+            return this.childRoot.size();
+        }
+    }
+
+    /**
+     * getField
+     * @param rowNum
+     * @param fieldName
+     * @return
+     */
+    @Override
+    public String getField(int rowNum, String fieldName) {
+        try {
+            List<JsonNode> childNodes = new ArrayList<>();
+            String[] nodeStrings = fieldName.split("\\.");
+            for (String nodeString : nodeStrings) {
+                childNodes.add(new JsonNode(nodeString));
+            }
+            // parse
+            if (childNodes.size() == 0) {
+                return "";
+            }
+            // first node
+            JsonNode firstNode = childNodes.get(0);
+            JsonElement current = root;
+            if (StringUtils.equals(ROOT_KEY, firstNode.getName())) {
+                current = root;
+            } else if (StringUtils.equals(CHILD_KEY, firstNode.getName())) {
+                if (rowNum < childRoot.size()) {
+                    current = childRoot.get(rowNum);
+                } else {
+                    return "";
+                }
+            } else {
+                // error data
+                return "";
+            }
+            if (current == null) {
+                // error data
+                return "";
+            }
+            // parse other node
+            for (int i = 1; i < childNodes.size(); i++) {
+                JsonNode node = childNodes.get(i);
+                if (!current.isJsonObject()) {
+                    // error data
+                    return "";
+                }
+                JsonElement newElement = 
current.getAsJsonObject().get(node.getName());
+                if (newElement == null) {
+                    // error data
+                    return "";
+                }
+                if (!node.isArray()) {
+                    current = newElement;
+                } else {
+                    if (!newElement.isJsonArray()) {
+                        // error data
+                        return "";
+                    }
+                    JsonArray newArray = newElement.getAsJsonArray();
+                    if (node.getArrayIndex() >= newArray.size()) {
+                        // error data
+                        return "";
+                    }
+                    current = newArray.get(node.getArrayIndex());
+                }
+            }
+            return current.getAsString();
+        } catch (Exception e) {
+            return "";
+        }
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
new file mode 100644
index 0000000000..57bf6a9982
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.apache.commons.lang3.StringUtils;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * JsonSourceDecoder
+ * 
+ */
+public class JsonSourceDecoder implements SourceDecoder {
+
+    protected JsonSourceInfo sourceInfo;
+    private Charset srcCharset = Charset.defaultCharset();
+    private String rowsNodePath;
+    private List<JsonNode> childNodes;
+
+    private Gson gson = new Gson();
+
+    /**
+     * Constructor
+     * @param sourceInfo
+     */
+    public JsonSourceDecoder(JsonSourceInfo sourceInfo) {
+        this.sourceInfo = sourceInfo;
+        if (!StringUtils.isBlank(sourceInfo.getCharset())) {
+            this.srcCharset = Charset.forName(sourceInfo.getCharset());
+        }
+        this.rowsNodePath = sourceInfo.getRowsNodePath();
+        if (!StringUtils.isBlank(rowsNodePath)) {
+            this.childNodes = new ArrayList<>();
+            String[] nodeStrings = this.rowsNodePath.split("\\.");
+            for (String nodeString : nodeStrings) {
+                this.childNodes.add(new JsonNode(nodeString));
+            }
+        }
+    }
+
+    /**
+     * decode
+     * @param srcBytes
+     * @param extParams
+     * @return
+     */
+    @Override
+    public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) {
+        String srcString = new String(srcBytes, srcCharset);
+        return this.decode(srcString, extParams);
+    }
+
+    /**
+     * decode
+     * @param srcString
+     * @param extParams
+     * @return
+     */
+    @Override
+    public SourceData decode(String srcString, Map<String, Object> extParams) {
+        JsonObject root = gson.fromJson(srcString, JsonObject.class);
+        JsonArray childRoot = null;
+        if (this.childNodes != null && this.childNodes.size() > 0) {
+            JsonElement current = root;
+            for (JsonNode node : childNodes) {
+                if (!current.isJsonObject()) {
+                    // error data
+                    return new JsonSourceData(root, childRoot);
+                }
+                JsonElement newElement = 
current.getAsJsonObject().get(node.getName());
+                if (newElement == null) {
+                    // error data
+                    return new JsonSourceData(root, childRoot);
+                }
+                if (!node.isArray()) {
+                    current = newElement;
+                } else {
+                    if (!newElement.isJsonArray()) {
+                        // error data
+                        return new JsonSourceData(root, childRoot);
+                    }
+                    JsonArray newArray = newElement.getAsJsonArray();
+                    if (node.getArrayIndex() >= newArray.size()) {
+                        // error data
+                        return new JsonSourceData(root, childRoot);
+                    }
+                    current = newArray.get(node.getArrayIndex());
+                }
+            }
+            if (!current.isJsonArray()) {
+                // error data
+                return new JsonSourceData(root, childRoot);
+            }
+            childRoot = current.getAsJsonArray();
+        }
+        SourceData sourceData = new JsonSourceData(root, childRoot);
+        return sourceData;
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
index 3e3f600197..e260ae15f4 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
@@ -50,10 +50,10 @@ public class KvSourceData implements SourceData {
 
     @Override
     public String getField(int rowNum, String fieldName) {
-        if (rowNum > this.rows.size()) {
+        if (rowNum >= this.rows.size()) {
             return null;
         }
-        Map<String, String> targetRow = this.rows.get(rowNum - 1);
+        Map<String, String> targetRow = this.rows.get(rowNum);
         return targetRow.get(fieldName);
     }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
index cb3c9405a0..a991d137b9 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
@@ -74,4 +74,12 @@ public class CsvSinkEncoder implements SinkEncoder {
         }
         return builder.substring(0, builder.length() - 1);
     }
+
+    /**
+     * get fields
+     * @return the fields
+     */
+    public List<FieldInfo> getFields() {
+        return fields;
+    }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
index 3fcc31107e..56f35ec319 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
@@ -61,4 +61,12 @@ public class KvSinkEncoder implements SinkEncoder {
         }
         return builder.substring(0, builder.length() - 1);
     }
+
+    /**
+     * get fields
+     * @return the fields
+     */
+    public List<FieldInfo> getFields() {
+        return fields;
+    }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
index ab83d21a0b..3839da0160 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
@@ -17,10 +17,16 @@
 
 package org.apache.inlong.sdk.transform.encode;
 
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+
+import java.util.List;
+
 /**
  * SinkEncoder
  */
 public interface SinkEncoder {
 
     String encode(SinkData sinkData);
+
+    List<FieldInfo> getFields();
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index c08a6b3bc5..ad4c4b4304 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process;
 
 import org.apache.inlong.sdk.transform.decode.CsvSourceDecoder;
+import org.apache.inlong.sdk.transform.decode.JsonSourceDecoder;
 import org.apache.inlong.sdk.transform.decode.KvSourceDecoder;
 import org.apache.inlong.sdk.transform.decode.SourceData;
 import org.apache.inlong.sdk.transform.decode.SourceDecoder;
@@ -28,6 +29,8 @@ import org.apache.inlong.sdk.transform.encode.SinkData;
 import org.apache.inlong.sdk.transform.encode.SinkEncoder;
 import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
 import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
 import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.SinkInfo;
@@ -103,6 +106,8 @@ public class TransformProcessor {
             this.decoder = new CsvSourceDecoder((CsvSourceInfo) sourceInfo);
         } else if (sourceInfo instanceof KvSourceInfo) {
             this.decoder = new KvSourceDecoder((KvSourceInfo) sourceInfo);
+        } else if (sourceInfo instanceof JsonSourceInfo) {
+            this.decoder = new JsonSourceDecoder((JsonSourceInfo) sourceInfo);
         }
     }
 
@@ -122,24 +127,35 @@ public class TransformProcessor {
         this.where = 
OperatorTools.buildOperator(this.transformSelect.getWhere());
         List<SelectItem> items = this.transformSelect.getSelectItems();
         this.selectItemMap = new HashMap<>(items.size());
-        for (SelectItem item : items) {
+        List<FieldInfo> fields = this.encoder.getFields();
+        for (int i = 0; i < items.size(); i++) {
+            SelectItem item = items.get(i);
+            String fieldName = null;
+            if (i < fields.size()) {
+                fieldName = fields.get(i).getName();
+            }
             if (item instanceof SelectExpressionItem) {
                 SelectExpressionItem exprItem = (SelectExpressionItem) item;
-                if (exprItem.getAlias() == null) {
-                    this.selectItemMap.put(exprItem.toString(),
-                            
OperatorTools.buildParser(exprItem.getExpression()));
-                } else {
-                    this.selectItemMap.put(exprItem.getAlias().getName(),
-                            
OperatorTools.buildParser(exprItem.getExpression()));
+                if (fieldName == null) {
+                    if (exprItem.getAlias() == null) {
+                        fieldName = exprItem.toString();
+                    } else {
+                        fieldName = exprItem.getAlias().getName();
+                    }
                 }
+                this.selectItemMap.put(fieldName,
+                        OperatorTools.buildParser(exprItem.getExpression()));
             }
         }
     }
 
     public List<String> transform(byte[] srcBytes, Map<String, Object> 
extParams) {
         SourceData sourceData = this.decoder.decode(srcBytes, extParams);
+        if (sourceData == null) {
+            return null;
+        }
         List<String> sinkDatas = new ArrayList<>(sourceData.getRowCount());
-        for (int i = 1; i <= sourceData.getRowCount(); i++) {
+        for (int i = 0; i < sourceData.getRowCount(); i++) {
             if (this.where != null && !this.where.check(sourceData, i)) {
                 continue;
             }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
index c0e059f266..8afe2f0c74 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
@@ -23,6 +23,7 @@ import 
org.apache.inlong.sdk.transform.process.parser.StringParser;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
 import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
 import net.sf.jsqlparser.expression.LongValue;
 import net.sf.jsqlparser.expression.NotExpression;
 import net.sf.jsqlparser.expression.Parenthesis;
@@ -75,6 +76,8 @@ public class OperatorTools {
             return new StringParser((StringValue) expr);
         } else if (expr instanceof LongValue) {
             return new LongParser((LongValue) expr);
+        } else if (expr instanceof Function) {
+            return new ColumnParser((Function) expr);
         }
         return null;
     }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
index b6dd579d2a..1216cfff74 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.transform.process.parser;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
 
+import net.sf.jsqlparser.expression.Function;
 import net.sf.jsqlparser.schema.Column;
 
 /**
@@ -30,7 +31,11 @@ public class ColumnParser implements ValueParser {
     private String fieldName;
 
     public ColumnParser(Column expr) {
-        this.fieldName = expr.getColumnName();
+        this.fieldName = expr.toString();
+    }
+
+    public ColumnParser(Function expr) {
+        this.fieldName = expr.toString().replace('(', '[').replace(')', ']');
     }
 
     /**
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
index 9c75456df6..498fad7f79 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sdk.transform.process;
 import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
 import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
 import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.SinkInfo;
@@ -69,7 +70,7 @@ public class TestTransformProcessor {
     }
 
     @Test
-    public void testKvCsv() {
+    public void testKv2Csv() {
         try {
             List<FieldInfo> fields = new ArrayList<>();
             FieldInfo ftime = new FieldInfo();
@@ -97,6 +98,84 @@ public class TestTransformProcessor {
         }
     }
 
+    @Test
+    public void testJson2Csv() {
+        try {
+            List<FieldInfo> fields = new ArrayList<>();
+            FieldInfo sid = new FieldInfo();
+            sid.setName("sid");
+            fields.add(sid);
+            FieldInfo packageID = new FieldInfo();
+            packageID.setName("packageID");
+            fields.add(packageID);
+            FieldInfo msgTime = new FieldInfo();
+            msgTime.setName("msgTime");
+            fields.add(msgTime);
+            FieldInfo msg = new FieldInfo();
+            msg.setName("msg");
+            fields.add(msg);
+            SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "msgs");
+            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+            String transformSql = "select 
$root.sid,$root.packageID,$child.msgTime,$child.msg from source";
+            TransformConfig config = new TransformConfig(jsonSource, csvSink, 
transformSql);
+            // case1
+            TransformProcessor processor = new TransformProcessor(config);
+            String srcString = "{\n"
+                    + "  \"sid\":\"value1\",\n"
+                    + "  \"packageID\":\"value2\",\n"
+                    + "  \"msgs\":[\n"
+                    + "  {\"msg\":\"value4\",\"msgTime\":1713243918000},\n"
+                    + "  {\"msg\":\"v4\",\"msgTime\":1713243918000}\n"
+                    + "  ]\n"
+                    + "}";
+            List<String> output = processor.transform(srcString, new 
HashMap<>());
+            Assert.assertTrue(output.size() == 2);
+            Assert.assertEquals(output.get(0), 
"value1|value2|1713243918000|value4");
+            Assert.assertEquals(output.get(1), 
"value1|value2|1713243918000|v4");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testJson2CsvForOne() {
+        try {
+            List<FieldInfo> fields = new ArrayList<>();
+            FieldInfo sid = new FieldInfo();
+            sid.setName("sid");
+            fields.add(sid);
+            FieldInfo packageID = new FieldInfo();
+            packageID.setName("packageID");
+            fields.add(packageID);
+            FieldInfo msgTime = new FieldInfo();
+            msgTime.setName("msgTime");
+            fields.add(msgTime);
+            FieldInfo msg = new FieldInfo();
+            msg.setName("msg");
+            fields.add(msg);
+            SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "");
+            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+            String transformSql =
+                    "select 
$root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source";
+            TransformConfig config = new TransformConfig(jsonSource, csvSink, 
transformSql);
+            // case1
+            TransformProcessor processor = new TransformProcessor(config);
+            String srcString = "{\n"
+                    + "  \"sid\":\"value1\",\n"
+                    + "  \"packageID\":\"value2\",\n"
+                    + "  \"msgs\":[\n"
+                    + "  {\"msg\":\"value4\",\"msgTime\":1713243918000},\n"
+                    + "  {\"msg\":\"v4\",\"msgTime\":1713243918000}\n"
+                    + "  ]\n"
+                    + "}";
+            List<String> output = processor.transform(srcString, new 
HashMap<>());
+            Assert.assertTrue(output.size() == 1);
+            Assert.assertEquals(output.get(0), 
"value1|value2|1713243918000|value4");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
     @Test
     public void testKvCsvByJsonConfig() {
         try {

Reply via email to