This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4517d378ec [INLONG-11873][SDK] Support parsing field values from extended parameters (#11874)
4517d378ec is described below

commit 4517d378ece1f962f4b03d6b546c5dc9da6a2e80
Author: ChunLiang Lu <[email protected]>
AuthorDate: Tue Jun 3 12:54:22 2025 +0800

    [INLONG-11873][SDK] Support parsing field values from extended parameters (#11874)
---
 ...BsonSourceData.java => AbstractSourceData.java} | 28 +++++++++++++++++-----
 .../sdk/transform/decode/AvroSourceData.java       | 10 ++++++--
 .../sdk/transform/decode/AvroSourceDecoder.java    | 12 +++++-----
 .../sdk/transform/decode/BsonSourceData.java       |  6 +++--
 .../inlong/sdk/transform/decode/CsvSourceData.java | 10 ++++++--
 .../sdk/transform/decode/CsvSourceDecoder.java     |  2 +-
 .../sdk/transform/decode/JsonSourceData.java       | 12 ++++++++--
 .../sdk/transform/decode/JsonSourceDecoder.java    | 12 +++++-----
 .../inlong/sdk/transform/decode/KvSourceData.java  | 10 ++++++--
 .../sdk/transform/decode/KvSourceDecoder.java      |  2 +-
 .../sdk/transform/decode/ParquetSourceData.java    | 10 ++++++--
 .../sdk/transform/decode/ParquetSourceDecoder.java |  2 +-
 .../inlong/sdk/transform/decode/PbSourceData.java  | 10 ++++++--
 .../sdk/transform/decode/PbSourceDecoder.java      |  2 +-
 .../sdk/transform/decode/RowDataSourceData.java    |  9 +++++--
 .../sdk/transform/decode/RowDataSourceDecoder.java |  2 +-
 .../inlong/sdk/transform/decode/XmlSourceData.java | 10 ++++++--
 .../sdk/transform/decode/XmlSourceDecoder.java     |  2 +-
 .../sdk/transform/decode/YamlSourceData.java       |  2 +-
 .../TestCsv2CsvForErrorOrderProcessor.java         | 10 +++++---
 .../process/processor/TestCsv2KvProcessor.java     | 11 +++++----
 .../process/processor/TestJson2CsvProcessor.java   | 13 ++++++----
 .../process/processor/TestKv2CsvProcessor.java     | 11 +++++----
 23 files changed, 139 insertions(+), 59 deletions(-)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AbstractSourceData.java
similarity index 54%
copy from 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java
copy to 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AbstractSourceData.java
index 97100b0240..c7e40282ae 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AbstractSourceData.java
@@ -17,15 +17,31 @@
 
 package org.apache.inlong.sdk.transform.decode;
 
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
+import org.apache.inlong.sdk.transform.process.Context;
 
 /**
- * BsonSourceData
+ * AbstractSourceData
+ * 
  */
-public class BsonSourceData extends JsonSourceData {
+public abstract class AbstractSourceData implements SourceData {
 
-    public BsonSourceData(JsonObject root, JsonArray childRoot) {
-        super(root, childRoot);
+    public static final String CTX_KEY = "$ctx.";
+
+    protected Context context;
+
+    protected boolean isContextField(String fieldName) {
+        return fieldName.startsWith(CTX_KEY);
+    }
+
+    protected String getContextField(String fieldName) {
+        if (context == null) {
+            return "";
+        }
+        if (!isContextField(fieldName)) {
+            return null;
+        }
+        String realFieldName = fieldName.substring(CTX_KEY.length());
+        String fieldValue = this.context.getStringOrDefault(realFieldName, "");
+        return fieldValue;
     }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java
index 42705433f4..3b603c1c5c 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sdk.transform.decode;
 
+import org.apache.inlong.sdk.transform.process.Context;
+
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericRecord;
@@ -29,7 +31,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-public class AvroSourceData implements SourceData {
+public class AvroSourceData extends AbstractSourceData {
 
     public static final String ROOT_KEY = "$root";
 
@@ -41,10 +43,11 @@ public class AvroSourceData implements SourceData {
 
     private Charset srcCharset;
 
-    public AvroSourceData(GenericRecord root, List<GenericRecord> childRoot, 
Charset srcCharset) {
+    public AvroSourceData(GenericRecord root, List<GenericRecord> childRoot, 
Charset srcCharset, Context context) {
         this.root = root;
         this.childRoot = childRoot;
         this.srcCharset = srcCharset;
+        this.context = context;
     }
 
     @Override
@@ -59,6 +62,9 @@ public class AvroSourceData implements SourceData {
     @Override
     public String getField(int rowNum, String fieldName) {
         try {
+            if (isContextField(fieldName)) {
+                return getContextField(fieldName);
+            }
             List<AvroNode> childNodes = new ArrayList<>();
             String[] nodeStrings = fieldName.split("\\.");
             for (String nodeString : nodeStrings) {
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java
index 992dea88c6..360a080f76 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java
@@ -74,7 +74,7 @@ public class AvroSourceDecoder extends SourceDecoder<byte[]> {
             GenericRecord root = dataFileStream.next();
             List<GenericRecord> childRoot = null;
             if (CollectionUtils.isEmpty(childNodes)) {
-                return new AvroSourceData(root, null, srcCharset);
+                return new AvroSourceData(root, null, srcCharset, context);
             }
 
             Object current = root;
@@ -83,12 +83,12 @@ public class AvroSourceDecoder extends 
SourceDecoder<byte[]> {
             for (AvroNode node : childNodes) {
                 if (curSchema.getType() != Type.RECORD) {
                     // error data
-                    return new AvroSourceData(root, null, srcCharset);
+                    return new AvroSourceData(root, null, srcCharset, context);
                 }
                 Object newElement = ((GenericRecord) 
current).get(node.getName());
                 if (newElement == null) {
                     // error data
-                    return new AvroSourceData(root, null, srcCharset);
+                    return new AvroSourceData(root, null, srcCharset, context);
                 }
                 // node is not array
                 if (!node.isArray()) {
@@ -100,15 +100,15 @@ public class AvroSourceDecoder extends 
SourceDecoder<byte[]> {
                 current = getElementFromArray(node, newElement, curSchema);
                 if (current == null) {
                     // error data
-                    return new AvroSourceData(root, null, srcCharset);
+                    return new AvroSourceData(root, null, srcCharset, context);
                 }
             }
             if (curSchema.getType() != Type.ARRAY) {
                 // error data
-                return new AvroSourceData(root, null, srcCharset);
+                return new AvroSourceData(root, null, srcCharset, context);
             }
             childRoot = (List<GenericRecord>) current;
-            return new AvroSourceData(root, childRoot, srcCharset);
+            return new AvroSourceData(root, childRoot, srcCharset, context);
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
             return null;
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java
index 97100b0240..7b6d31f066 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sdk.transform.decode;
 
+import org.apache.inlong.sdk.transform.process.Context;
+
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 
@@ -25,7 +27,7 @@ import com.google.gson.JsonObject;
  */
 public class BsonSourceData extends JsonSourceData {
 
-    public BsonSourceData(JsonObject root, JsonArray childRoot) {
-        super(root, childRoot);
+    public BsonSourceData(JsonObject root, JsonArray childRoot, Context 
context) {
+        super(root, childRoot, context);
     }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
index e0bd9f794c..76442c7bc2 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sdk.transform.decode;
 
+import org.apache.inlong.sdk.transform.process.Context;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -26,13 +28,14 @@ import java.util.Map;
  * CsvSourceData
  * 
  */
-public class CsvSourceData implements SourceData {
+public class CsvSourceData extends AbstractSourceData {
 
     private List<Map<String, Object>> rows = new ArrayList<>();
 
     private Map<String, Object> currentRow;
 
-    public CsvSourceData() {
+    public CsvSourceData(Context context) {
+        this.context = context;
     }
 
     public void putField(String fieldName, Object fieldValue) {
@@ -54,6 +57,9 @@ public class CsvSourceData implements SourceData {
         if (rowNum >= this.rows.size()) {
             return null;
         }
+        if (isContextField(fieldName)) {
+            return getContextField(fieldName);
+        }
         Map<String, Object> targetRow = this.rows.get(rowNum);
         return targetRow.get(fieldName);
     }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
index 830d5c2c01..4eb13de398 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
@@ -60,7 +60,7 @@ public class CsvSourceDecoder extends SourceDecoder<String> {
     @Override
     public SourceData decode(String srcString, Context context) {
         String[][] rowValues = SplitUtils.splitCsv(srcString, delimiter, 
escapeChar, '\"', '\n', true);
-        CsvSourceData sourceData = new CsvSourceData();
+        CsvSourceData sourceData = new CsvSourceData(context);
         for (int i = 0; i < rowValues.length; i++) {
             String[] fieldValues = rowValues[i];
             sourceData.addRow();
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java
index ea9d296a97..dba0608e4b 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sdk.transform.decode;
 
+import org.apache.inlong.sdk.transform.process.Context;
+
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
@@ -29,7 +31,7 @@ import java.util.List;
  * JsonSourceData
  * 
  */
-public class JsonSourceData implements SourceData {
+public class JsonSourceData extends AbstractSourceData {
 
     public static final String ROOT_KEY = "$root";
 
@@ -43,10 +45,12 @@ public class JsonSourceData implements SourceData {
      * Constructor
      * @param root
      * @param childRoot
+     * @param context
      */
-    public JsonSourceData(JsonObject root, JsonArray childRoot) {
+    public JsonSourceData(JsonObject root, JsonArray childRoot, Context 
context) {
         this.root = root;
         this.childRoot = childRoot;
+        this.context = context;
     }
 
     /**
@@ -71,6 +75,10 @@ public class JsonSourceData implements SourceData {
     @Override
     public String getField(int rowNum, String fieldName) {
         try {
+            if (isContextField(fieldName)) {
+                return getContextField(fieldName);
+            }
+            // split field name
             List<JsonNode> childNodes = new ArrayList<>();
             String[] nodeStrings = fieldName.split("\\.");
             for (String nodeString : nodeStrings) {
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
index 0332094090..38db4ad03d 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
@@ -86,18 +86,18 @@ public class JsonSourceDecoder extends 
SourceDecoder<String> {
         JsonObject root = gson.fromJson(srcString, JsonObject.class);
         JsonArray childRoot = null;
         if (CollectionUtils.isEmpty(childNodes)) {
-            return new JsonSourceData(root, null);
+            return new JsonSourceData(root, null, context);
         }
         JsonElement current = root;
         for (JsonNode node : childNodes) {
             if (!current.isJsonObject()) {
                 // error data
-                return new JsonSourceData(root, null);
+                return new JsonSourceData(root, null, context);
             }
             JsonElement newElement = 
current.getAsJsonObject().get(node.getName());
             if (newElement == null) {
                 // error data
-                return new JsonSourceData(root, null);
+                return new JsonSourceData(root, null, context);
             }
             // node is not array
             if (!node.isArray()) {
@@ -108,15 +108,15 @@ public class JsonSourceDecoder extends 
SourceDecoder<String> {
             current = getElementFromArray(node, newElement);
             if (current == null) {
                 // error data
-                return new JsonSourceData(root, null);
+                return new JsonSourceData(root, null, context);
             }
         }
         if (!current.isJsonArray()) {
             // error data
-            return new JsonSourceData(root, null);
+            return new JsonSourceData(root, null, context);
         }
         childRoot = current.getAsJsonArray();
-        return new JsonSourceData(root, childRoot);
+        return new JsonSourceData(root, childRoot, context);
     }
 
     private JsonElement getElementFromArray(JsonNode node, JsonElement 
curElement) {
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
index e260ae15f4..2349b94e8a 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sdk.transform.decode;
 
+import org.apache.inlong.sdk.transform.process.Context;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -25,13 +27,14 @@ import java.util.Map;
 /**
  * KvSourceData
  */
-public class KvSourceData implements SourceData {
+public class KvSourceData extends AbstractSourceData {
 
     private List<Map<String, String>> rows = new ArrayList<>();
 
     private Map<String, String> currentRow;
 
-    public KvSourceData() {
+    public KvSourceData(Context context) {
+        this.context = context;
     }
 
     public void putField(String fieldName, String fieldValue) {
@@ -53,6 +56,9 @@ public class KvSourceData implements SourceData {
         if (rowNum >= this.rows.size()) {
             return null;
         }
+        if (isContextField(fieldName)) {
+            return getContextField(fieldName);
+        }
         Map<String, String> targetRow = this.rows.get(rowNum);
         return targetRow.get(fieldName);
     }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
index e62fcccc37..049d7fb610 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
@@ -75,7 +75,7 @@ public class KvSourceDecoder extends SourceDecoder<String> {
     public SourceData decode(String srcString, Context context) {
         List<Map<String, String>> rowValues = KvUtils.splitKv(srcString, 
entryDelimiter, kvDelimiter,
                 escapeChar, quoteChar, lineDelimiter);
-        KvSourceData sourceData = new KvSourceData();
+        KvSourceData sourceData = new KvSourceData(context);
         if (CollectionUtils.isEmpty(fields)) {
             for (Map<String, String> row : rowValues) {
                 sourceData.addRow();
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceData.java
index ab294d867b..7ff6718bae 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceData.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sdk.transform.decode;
 
+import org.apache.inlong.sdk.transform.process.Context;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.schema.GroupType;
@@ -29,7 +31,7 @@ import java.nio.charset.Charset;
 /**
  * ParquetSourceData
  */
-public class ParquetSourceData implements SourceData {
+public class ParquetSourceData extends AbstractSourceData {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ParquetSourceData.class);
 
@@ -44,7 +46,7 @@ public class ParquetSourceData implements SourceData {
     private Type childType;
     private int rowCount = -1;
 
-    public ParquetSourceData(Group root, String childPath, Charset srcCharset) 
{
+    public ParquetSourceData(Group root, String childPath, Charset srcCharset, 
Context context) {
         this.rootGroup = root;
         String pathStr = "";
         if (!StringUtils.isEmpty(childPath)) {
@@ -60,6 +62,7 @@ public class ParquetSourceData implements SourceData {
             }
         }
         this.srcCharset = srcCharset;
+        this.context = context;
     }
     @Override
     public int getRowCount() {
@@ -74,6 +77,9 @@ public class ParquetSourceData implements SourceData {
     public String getField(int rowNum, String fieldName) {
         String fieldValue = "";
         try {
+            if (isContextField(fieldName)) {
+                return getContextField(fieldName);
+            }
             if (StringUtils.startsWith(fieldName, ROOT_KEY)) {
                 // Dealing with multi-level paths
                 fieldName = fieldName.substring(ROOT_KEY.length());
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
index 85e3ba319d..6f58ee3070 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
@@ -92,7 +92,7 @@ public class ParquetSourceDecoder extends 
SourceDecoder<byte[]> {
                     for (int i = 0; i < rows; i++) {
                         Group group = recordReader.read();
                         if (group != null) {
-                            return new ParquetSourceData(group, 
this.childMessagePath, this.srcCharset);
+                            return new ParquetSourceData(group, 
this.childMessagePath, this.srcCharset, context);
                         }
                     }
                 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
index 147eaa67d1..d4bea9906e 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sdk.transform.decode;
 
+import org.apache.inlong.sdk.transform.process.Context;
+
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.DynamicMessage;
@@ -34,7 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * JsonSourceData
  * 
  */
-public class PbSourceData implements SourceData {
+public class PbSourceData extends AbstractSourceData {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PbSourceData.class);
 
@@ -60,13 +62,14 @@ public class PbSourceData implements SourceData {
     public PbSourceData(DynamicMessage root, List<DynamicMessage> childRoot,
             Descriptors.Descriptor rootDesc, Descriptors.Descriptor childDesc,
             Map<String, List<PbNode>> columnNodeMap,
-            Charset srcCharset) {
+            Charset srcCharset, Context context) {
         this.root = root;
         this.childRoot = childRoot;
         this.rootDesc = rootDesc;
         this.childDesc = childDesc;
         this.columnNodeMap = columnNodeMap;
         this.srcCharset = srcCharset;
+        this.context = context;
     }
 
     /**
@@ -105,6 +108,9 @@ public class PbSourceData implements SourceData {
     public String getField(int rowNum, String fieldName) {
         String fieldValue = "";
         try {
+            if (isContextField(fieldName)) {
+                return getContextField(fieldName);
+            }
             if (StringUtils.startsWith(fieldName, ROOT_KEY)) {
                 fieldValue = this.getRootField(fieldName);
             } else if (StringUtils.startsWith(fieldName, CHILD_KEY)) {
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
index 4ee704bd0e..021c625fd4 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
@@ -145,7 +145,7 @@ public class PbSourceDecoder extends SourceDecoder<byte[]> {
                     }
                 }
             }
-            return new PbSourceData(root, childRoot, rootDesc, childDesc, 
columnNodeMap, srcCharset);
+            return new PbSourceData(root, childRoot, rootDesc, childDesc, 
columnNodeMap, srcCharset, context);
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
             return null;
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
index fbfd423497..8d7d017e3b 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sdk.transform.decode;
 
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.utils.RowToFieldDataUtils;
 
 import lombok.extern.slf4j.Slf4j;
@@ -25,7 +26,7 @@ import org.apache.flink.table.data.RowData;
 import java.util.Map;
 
 @Slf4j
-public class RowDataSourceData implements SourceData {
+public class RowDataSourceData extends AbstractSourceData {
 
     private final RowData rowData;
     private final Map<String, Integer> fieldPositionMap;
@@ -34,10 +35,11 @@ public class RowDataSourceData implements SourceData {
     public RowDataSourceData(
             RowData rowData,
             Map<String, Integer> fieldPositionMap,
-            RowToFieldDataUtils.RowFieldConverter[] converters) {
+            RowToFieldDataUtils.RowFieldConverter[] converters, Context 
context) {
         this.rowData = rowData;
         this.fieldPositionMap = fieldPositionMap;
         this.converters = converters;
+        this.context = context;
     }
 
     @Override
@@ -51,6 +53,9 @@ public class RowDataSourceData implements SourceData {
             return null;
         }
         try {
+            if (isContextField(fieldName)) {
+                return getContextField(fieldName);
+            }
             int fieldPosition = fieldPositionMap.get(fieldName);
             return converters[fieldPosition].convert(rowData, fieldPosition);
         } catch (Throwable e) {
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
index a82fc2ac6b..3565b292a8 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
@@ -61,7 +61,7 @@ public class RowDataSourceDecoder extends 
SourceDecoder<RowData> {
 
     @Override
     public SourceData decode(RowData rowData, Context context) {
-        return new RowDataSourceData(rowData, fieldPositionMap, 
rowFieldConverters);
+        return new RowDataSourceData(rowData, fieldPositionMap, 
rowFieldConverters, context);
     }
 
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceData.java
index 1e23c2b5d7..7e7fad3b84 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceData.java
@@ -17,13 +17,15 @@
 
 package org.apache.inlong.sdk.transform.decode;
 
+import org.apache.inlong.sdk.transform.process.Context;
+
 import java.util.List;
 import java.util.Map;
 
 /**
  * XmlSourceData
  */
-public class XmlSourceData implements SourceData {
+public class XmlSourceData extends AbstractSourceData {
 
     public static final String ROOT_KEY = "$root";
 
@@ -33,7 +35,7 @@ public class XmlSourceData implements SourceData {
 
     private XmlNode childRoot;
 
-    public XmlSourceData(XmlNode root, XmlNode childRoot) {
+    public XmlSourceData(XmlNode root, XmlNode childRoot, Context context) {
         this.root = root;
         this.childRoot = new XmlNode();
         if (childRoot != null) {
@@ -42,6 +44,7 @@ public class XmlSourceData implements SourceData {
                 this.childRoot = (XmlNode) value;
             }
         }
+        this.context = context;
     }
 
     @Override
@@ -61,6 +64,9 @@ public class XmlSourceData implements SourceData {
     @Override
     public String getField(int rowNum, String fieldName) {
         try {
+            if (isContextField(fieldName)) {
+                return getContextField(fieldName);
+            }
             String[] nodeString = fieldName.split("\\.");
             Object cur = null, last = null;
             int start = -1;
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java
index a9a6bb9a66..625f31aa93 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java
@@ -83,7 +83,7 @@ public class XmlSourceDecoder extends SourceDecoder<String> {
                     cur = child.getValue();
                 }
             }
-            return new XmlSourceData(rootObj, child);
+            return new XmlSourceData(rootObj, child, context);
         } catch (Exception e) {
             log.error("Data parsing failed", e);
             return null;
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java
index a9d3162fd7..fb54d63efa 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java
@@ -20,7 +20,7 @@ package org.apache.inlong.sdk.transform.decode;
 import java.util.List;
 import java.util.Map;
 
-public class YamlSourceData implements SourceData {
+public class YamlSourceData extends AbstractSourceData {
 
     public static final String ROOT_KEY = "$root";
 
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java
index d03d112776..300b5e3c9c 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java
@@ -33,6 +33,7 @@ import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class TestCsv2CsvForErrorOrderProcessor extends 
AbstractProcessorTestBase {
 
@@ -42,16 +43,19 @@ public class TestCsv2CsvForErrorOrderProcessor extends AbstractProcessorTestBase
         CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", '|', '\\', sourceFields);
         List<FieldInfo> sinkFields = this.getTestFieldList("field1", "field2", "field3");
         CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', sinkFields);
-        String transformSql = "select ftime as field2,data as field3,extinfo as field4 from source where extinfo='ok'";
+        String transformSql =
+                "select ftime as field2,data as field3,extinfo as field4,$ctx.partition as field1 from source where extinfo='ok'";
         TransformConfig config = new TransformConfig(transformSql, false);
         // case1
         TransformProcessor<String, String> processor1 = TransformProcessor
                 .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
                         SinkEncoderFactory.createCsvEncoder(csvSink));
 
-        List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok|data1", new HashMap<>());
+        Map<String, Object> extParams = new HashMap<>();
+        extParams.put("partition", "2024042801");
+        List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok|data1", extParams);
         Assert.assertEquals(1, output1.size());
-        Assert.assertEquals("|2024-04-28 00:00:00|data1", output1.get(0));
+        Assert.assertEquals("2024042801|2024-04-28 00:00:00|data1", output1.get(0));
     }
 
     @Test
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
index 6b19acb11c..2b57409714 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
@@ -30,24 +30,27 @@ import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class TestCsv2KvProcessor extends AbstractProcessorTestBase {
 
     @Test
     public void testCsv2Kv() throws Exception {
-        List<FieldInfo> fields = this.getTestFieldList("ftime", "extinfo");
+        List<FieldInfo> fields = this.getTestFieldList("ftime", "extinfo", "ds");
         CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", '|', '\\', fields);
         KvSinkInfo kvSink = new KvSinkInfo("UTF-8", fields);
-        String transformSql = "select ftime,extinfo from source where extinfo='ok'";
+        String transformSql = "select ftime,extinfo,$ctx.partition from source where extinfo='ok'";
         TransformConfig config = new TransformConfig(transformSql);
         // case1
         TransformProcessor<String, String> processor1 = TransformProcessor
                 .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
                         SinkEncoderFactory.createKvEncoder(kvSink));
 
-        List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok", new HashMap<>());
+        Map<String, Object> extParams = new HashMap<>();
+        extParams.put("partition", "2024042801");
+        List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok", extParams);
         Assert.assertEquals(1, output1.size());
-        Assert.assertEquals(output1.get(0), "ftime=2024-04-28 00:00:00&extinfo=ok");
+        Assert.assertEquals(output1.get(0), "ftime=2024-04-28 00:00:00&extinfo=ok&ds=2024042801");
         // case2
         config.setTransformSql("select ftime,extinfo from source where extinfo!='ok'");
         TransformProcessor<String, String> processor2 = TransformProcessor
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2CsvProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2CsvProcessor.java
index f649c5a8f7..0d45398c2c 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2CsvProcessor.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2CsvProcessor.java
@@ -30,15 +30,16 @@ import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class TestJson2CsvProcessor extends AbstractProcessorTestBase {
 
     @Test
     public void testJson2Csv() throws Exception {
-        List<FieldInfo> fields1 = this.getTestFieldList("sid", "packageID", "msgTime", "msg");
+        List<FieldInfo> fields1 = this.getTestFieldList("sid", "packageID", "msgTime", "msg", "ds");
         JsonSourceInfo jsonSource1 = new JsonSourceInfo("UTF-8", "msgs");
         CsvSinkInfo csvSink1 = new CsvSinkInfo("UTF-8", '|', '\\', fields1);
-        String transformSql1 = "select $root.sid,$root.packageID,$child.msgTime,$child.msg from source";
+        String transformSql1 = "select $root.sid,$root.packageID,$child.msgTime,$child.msg,$ctx.partition from source";
         TransformConfig config1 = new TransformConfig(transformSql1);
         // case1
         TransformProcessor<String, String> processor1 = TransformProcessor
@@ -52,10 +53,12 @@ public class TestJson2CsvProcessor extends AbstractProcessorTestBase {
                 + "  {\"msg\":\"v4\",\"msgTime\":1713243918000}\n"
                 + "  ]\n"
                 + "}";
-        List<String> output1 = processor1.transform(srcString1, new HashMap<>());
+        Map<String, Object> extParams = new HashMap<>();
+        extParams.put("partition", "2024042801");
+        List<String> output1 = processor1.transform(srcString1, extParams);
         Assert.assertEquals(2, output1.size());
-        Assert.assertEquals(output1.get(0), "value1|value2|1713243918000|value4");
-        Assert.assertEquals(output1.get(1), "value1|value2|1713243918000|v4");
+        Assert.assertEquals(output1.get(0), "value1|value2|1713243918000|value4|2024042801");
+        Assert.assertEquals(output1.get(1), "value1|value2|1713243918000|v4|2024042801");
         // case2
         List<FieldInfo> fields2 = this.getTestFieldList("id", "itemId", "subItemId", "msg");
         JsonSourceInfo jsonSource2 = new JsonSourceInfo("UTF-8", "items");
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestKv2CsvProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestKv2CsvProcessor.java
index 82a4762f93..d8766f3e52 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestKv2CsvProcessor.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestKv2CsvProcessor.java
@@ -30,23 +30,26 @@ import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class TestKv2CsvProcessor extends AbstractProcessorTestBase {
 
     @Test
     public void testKv2Csv() throws Exception {
-        List<FieldInfo> fields = this.getTestFieldList("ftime", "extinfo");
+        List<FieldInfo> fields = this.getTestFieldList("ftime", "extinfo", "ds");
         KvSourceInfo kvSource = new KvSourceInfo("UTF-8", fields);
         CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields);
-        String transformSql = "select ftime,extinfo from source where extinfo='ok'";
+        String transformSql = "select ftime,extinfo,$ctx.partition from source where extinfo='ok'";
         TransformConfig config = new TransformConfig(transformSql);
         // case1
         TransformProcessor<String, String> processor1 = TransformProcessor
                 .create(config, SourceDecoderFactory.createKvDecoder(kvSource),
                         SinkEncoderFactory.createCsvEncoder(csvSink));
-        List<String> output1 = processor1.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>());
+        Map<String, Object> extParams = new HashMap<>();
+        extParams.put("partition", "2024042801");
+        List<String> output1 = processor1.transform("ftime=2024-04-28 00:00:00&extinfo=ok", extParams);
         Assert.assertEquals(1, output1.size());
-        Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok");
+        Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok|2024042801");
         // case2
         config.setTransformSql("select ftime,extinfo from source where extinfo!='ok'");
         TransformProcessor<String, String> processor2 = TransformProcessor


Reply via email to