This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ae3c3dfa11 [INLONG-12117][SDK] Support 
concat_struct/extract_struct/extract_binary function (#12118)
ae3c3dfa11 is described below

commit ae3c3dfa11eacb61fe135016e0580e977559b966
Author: ChunLiang Lu <[email protected]>
AuthorDate: Mon May 18 16:48:34 2026 +0800

    [INLONG-12117][SDK] Support concat_struct/extract_struct/extract_binary 
function (#12118)
    
    * [INLONG-12117][SDK] Support concat_struct/extract_struct/extract_binary 
function
    
    * fix AI comments
    
    * fix comments
    
    * For enum types, return the integer index.
    
    * Allow extract_struct to accept the results of extract_binary as its parameters, and add support for extracting array nodes in extract_struct.
    
    * Add support for array nodes in the extract_struct function.
    
    * New function concat_struct(node1, node2, ...) — picks values from 
multiple PB nodes and assembles them into one RowData.
    New function extract_struct_excluding(structPath, excludeChild1, 
excludeChild2, ...) — returns a copy of the struct field with the given 
sub-nodes removed.
    Composition extract_binary(extract_struct_excluding(...)) re-encodes the 
trimmed message as a byte[], allowing bulky sub-nodes to be stripped before 
being written to a binary sink column.
    Multi-level protobuf path lookups now reuse cached intermediate node 
values, avoiding repeated tree descents and improving transform throughput.
---
 .../sdk/transform/decode/AbstractSourceData.java   |   4 +-
 .../apache/inlong/sdk/transform/decode/PbNode.java | 196 ++++--
 .../inlong/sdk/transform/decode/PbSourceData.java  | 734 +++++++++++++++++----
 .../sdk/transform/decode/PbSourceDecoder.java      |   4 +-
 .../process/function/FunctionConstant.java         |   2 +
 .../process/function/pb/ConcatStructFunction.java  |  72 ++
 .../process/function/pb/ExtractBinaryFunction.java | 217 ++++++
 .../pb/ExtractStructExcludingFunction.java         | 182 +++++
 .../process/function/pb/ExtractStructFunction.java | 179 +++++
 .../sdk/transform/process/parser/ColumnParser.java |   7 +
 .../process/function/TestFunctionDoc.java          |   2 +-
 .../process/processor/TestPb2RowDataProcessor.java | 137 ++++
 12 files changed, 1566 insertions(+), 170 deletions(-)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AbstractSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AbstractSourceData.java
index c7e40282ae..0caf9619eb 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AbstractSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AbstractSourceData.java
@@ -29,11 +29,11 @@ public abstract class AbstractSourceData implements 
SourceData {
 
     protected Context context;
 
-    protected boolean isContextField(String fieldName) {
+    public boolean isContextField(String fieldName) {
         return fieldName.startsWith(CTX_KEY);
     }
 
-    protected String getContextField(String fieldName) {
+    public String getContextField(String fieldName) {
         if (context == null) {
             return "";
         }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
index 310f6b1abb..7351958216 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
@@ -23,6 +23,8 @@ import 
com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
 import lombok.Data;
 import org.apache.commons.lang.math.NumberUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -34,63 +36,118 @@ import java.util.List;
 @Data
 public class PbNode {
 
+    public static final Logger LOG = LoggerFactory.getLogger(PbNode.class);
+
     private String name;
     private FieldDescriptor fieldDesc;
-    private Descriptors.Descriptor messageType;
     private boolean isLastNode = false;
-    private boolean isArray = false;
-    private int arrayIndex = -1;
-    private boolean isMap = false;
+    // primitive
+    private boolean isPrimitiveType = false;
+    // array
+    private boolean isArrayType = false;
+    private boolean hasArrayIndex = false;
+    private Integer arrayIndex;
+    // struct
+    private boolean isStructType = false;
+    // map
     private boolean isMapType = false;
-    private String mapKey = "";
+    private boolean hasMapKey = false;
+    private Object mapKey;
     private FieldDescriptor mapKeyDesc;
     private FieldDescriptor mapValueDesc;
+    // parent path
+    private String parentPath;
+    private String currentPath;
+    private String currentIndexPath;
 
-    public PbNode(Descriptors.Descriptor messageDesc, String nodeString, 
boolean isLastNode) {
-        int beginIndex = nodeString.indexOf('(');
-        if (beginIndex < 0) {
-            this.name = nodeString;
-            if (isMapDescriptor(messageDesc)) {
-                FieldDescriptor valueFieldDesc = 
messageDesc.getFields().get(1);
-                Descriptors.Descriptor valueTypeDesc = 
valueFieldDesc.getMessageType();
-                this.fieldDesc = valueTypeDesc.findFieldByName(name);
-                if (this.fieldDesc.getJavaType() == JavaType.MESSAGE) {
-                    this.messageType = this.fieldDesc.getMessageType();
+    public PbNode(Descriptors.Descriptor parentDesc, String parentPath, String 
nodeString, boolean isLastNode) {
+        try {
+            if (parentDesc == null) {
+                return;
+            }
+            this.isLastNode = isLastNode;
+            // parse name & index
+            int beginIndex = nodeString.indexOf('(');
+            String indexString = null;
+            if (beginIndex < 0) {
+                this.name = nodeString;
+            } else {
+                this.name = StringUtils.trim(nodeString.substring(0, 
beginIndex));
+                int endIndex = nodeString.lastIndexOf(')');
+                if (endIndex >= 0) {
+                    indexString = nodeString.substring(beginIndex + 1, 
endIndex);
                 }
+            }
+            // cache path key
+            this.parentPath = parentPath;
+            if (this.parentPath == null) {
+                this.currentPath = this.name;
+                this.currentIndexPath = nodeString;
             } else {
-                this.fieldDesc = messageDesc.findFieldByName(name);
-                if (this.fieldDesc.getJavaType() == JavaType.MESSAGE) {
-                    this.messageType = this.fieldDesc.getMessageType();
-                    if (isMapDescriptor(messageType)) {
-                        this.isMapType = true;
-                    }
+                this.currentPath = this.parentPath + "." + this.name;
+                this.currentIndexPath = this.parentPath + "." + nodeString;
+            }
+            // field desc
+            this.fieldDesc = parentDesc.findFieldByName(name);
+            if (this.fieldDesc == null) {
+                return;
+            }
+            // map
+            if (this.fieldDesc.getJavaType() == JavaType.MESSAGE
+                    && isMapDescriptor(this.fieldDesc.getMessageType())) {
+                this.isMapType = true;
+                this.mapKeyDesc = 
this.fieldDesc.getMessageType().getFields().get(0);
+                this.mapValueDesc = 
this.fieldDesc.getMessageType().getFields().get(1);
+                if (indexString != null) {
+                    this.hasMapKey = true;
+                    this.mapKey = parseMapKey(indexString, mapKeyDesc);
                 }
+                return;
             }
-        } else {
-            this.name = StringUtils.trim(nodeString.substring(0, beginIndex));
-            this.fieldDesc = messageDesc.findFieldByName(name);
-            if (this.fieldDesc.getJavaType() == JavaType.MESSAGE) {
-                this.messageType = this.fieldDesc.getMessageType();
-                int endIndex = nodeString.lastIndexOf(')');
-                if (isMapDescriptor(messageType)) {
-                    this.isMap = true;
-                    if (endIndex >= 0) {
-                        this.mapKey = nodeString.substring(beginIndex + 1, 
endIndex);
-                        this.mapKeyDesc = messageType.getFields().get(0);
-                        this.mapValueDesc = messageType.getFields().get(1);
-                    }
-                } else {
-                    this.isArray = true;
-                    if (endIndex >= 0) {
-                        this.arrayIndex = 
NumberUtils.toInt(nodeString.substring(beginIndex + 1, endIndex), -1);
-                        if (this.arrayIndex < 0) {
-                            this.arrayIndex = 0;
-                        }
-                    }
+            // array
+            if (this.fieldDesc.isRepeated()) {
+                this.isArrayType = true;
+                this.arrayIndex = NumberUtils.toInt(indexString, -1);
+                if (arrayIndex >= 0) {
+                    this.hasArrayIndex = true;
                 }
+                return;
+            }
+            // struct
+            if (this.fieldDesc.getJavaType() == JavaType.MESSAGE) {
+                this.isStructType = true;
+                return;
             }
+            // primitive
+            this.isPrimitiveType = true;
+        } catch (RuntimeException t) {
+            LOG.error("Fail to 
PbNode,error:{},fullName:{},nodePath:{},isLastNode:{}",
+                    t.getMessage(), parentDesc.getName(), nodeString, 
isLastNode, t);
+            throw t;
+        }
+    }
+
+    private static Object parseMapKey(String indexString, FieldDescriptor 
mapKeyDesc) {
+        switch (mapKeyDesc.getJavaType()) {
+            case STRING:
+                return indexString;
+            case INT:
+                return NumberUtils.toInt(indexString, 0);
+            case LONG:
+                return NumberUtils.toLong(indexString, 0);
+            case FLOAT:
+                return NumberUtils.toFloat(indexString, 0);
+            case DOUBLE:
+                return NumberUtils.toDouble(indexString, 0);
+            case BOOLEAN:
+                return Boolean.TRUE.toString().equals(indexString);
+            case ENUM:
+                return mapKeyDesc.getEnumType().findValueByName(indexString);
+            case BYTE_STRING:
+            case MESSAGE:
+            default:
+                return indexString;
         }
-        this.isLastNode = isLastNode;
     }
 
     /**
@@ -105,16 +162,61 @@ public class PbNode {
         }
         List<PbNode> nodes = new ArrayList<>();
         String[] nodeStrings = nodePath.split("\\.");
-        int lastIndex = nodeStrings.length - 1;
+        final int lastIndex = nodeStrings.length - 1;
+        String parentPath = null;
+        StringBuilder currentPathBuilder = new StringBuilder();
         Descriptors.Descriptor current = rootDesc;
         for (int i = 0; i <= lastIndex; i++) {
             if (current == null) {
                 return null;
             }
+            // pbNode
             String nodeString = nodeStrings[i];
-            PbNode pbNode = new PbNode(current, nodeString, (i == lastIndex));
-            current = pbNode.getMessageType();
-            nodes.add(pbNode);
+            PbNode pbNode = new PbNode(current, parentPath, nodeString, (i == 
lastIndex));
+            if (parentPath == null) {
+                currentPathBuilder.append(nodeString);
+            } else {
+                currentPathBuilder.append(".").append(nodeString);
+            }
+            parentPath = currentPathBuilder.toString();
+            if (pbNode.getFieldDesc() == null) {
+                return null;
+            }
+            // primitive
+            if (pbNode.isPrimitiveType()) {
+                current = null;
+                nodes.add(pbNode);
+                continue;
+            } else if (pbNode.isArrayType()) {
+                // array
+                if (pbNode.getFieldDesc().getJavaType() == JavaType.MESSAGE) {
+                    current = pbNode.getFieldDesc().getMessageType();
+                } else {
+                    current = null;
+                }
+                nodes.add(pbNode);
+                continue;
+            } else if (pbNode.isMapType()) {
+                // map
+                if (pbNode.isHasMapKey()) {
+                    if (pbNode.getMapValueDesc().getJavaType() == 
JavaType.MESSAGE) {
+                        current = pbNode.getMapValueDesc().getMessageType();
+                    } else {
+                        current = null;
+                    }
+                } else {
+                    current = null;
+                }
+                nodes.add(pbNode);
+                continue;
+            } else if (pbNode.isStructType()) {
+                // struct
+                current = pbNode.getFieldDesc().getMessageType();
+                nodes.add(pbNode);
+                continue;
+            } else {
+                return null;
+            }
         }
         return nodes;
     }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
index e46fb47d8d..ca63769c84 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
@@ -21,12 +21,16 @@ import org.apache.inlong.sdk.transform.process.Context;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.EnumValueDescriptor;
 import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
 import com.google.protobuf.DynamicMessage;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +39,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -45,6 +50,8 @@ public class PbSourceData extends AbstractSourceData {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PbSourceData.class);
 
+    public static final String ROOT = "$root";
+
     public static final String ROOT_KEY = "$root.";
 
     public static final String CHILD_KEY = "$child.";
@@ -61,6 +68,9 @@ public class PbSourceData extends AbstractSourceData {
 
     protected Charset srcCharset;
 
+    private Map<DynamicMessage, Map<String, Object>> nodeValueCache = new 
HashMap<>();
+    private Map<DynamicMessage, Map<String, Map<Object, Object>>> mapNodeCache 
= new HashMap<>();
+
     /**
      * Constructor
      */
@@ -110,18 +120,216 @@ public class PbSourceData extends AbstractSourceData {
      * @return
      */
     @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
     public Object getField(int rowNum, String fieldName) {
-        Object fieldValue = "";
         try {
+            // check(root);
             if (isContextField(fieldName)) {
                 return getContextField(fieldName);
             }
+            Object fieldValue = findFieldNode(rowNum, fieldName);
+            List<PbNode> childNodes = this.columnNodeMap.get(fieldName);
+            if (childNodes == null || childNodes.size() == 0) {
+                return null;
+            }
+            PbNode lastNode = childNodes.get(childNodes.size() - 1);
+            // primitive
+            if (lastNode.isPrimitiveType()) {
+                if (fieldValue instanceof ByteString) {
+                    ByteString byteString = (ByteString) fieldValue;
+                    return byteString.toByteArray();
+                } else {
+                    return fieldValue;
+                }
+            }
+            // struct
+            if (lastNode.isStructType()) {
+                if (!(fieldValue instanceof DynamicMessage)) {
+                    return null;
+                }
+                return 
buildStructData(lastNode.getFieldDesc().getMessageType(), (DynamicMessage) 
fieldValue);
+            }
+            // array
+            if (lastNode.isArrayType()) {
+                if (!lastNode.isHasArrayIndex()) {
+                    if (!(fieldValue instanceof List)) {
+                        return null;
+                    }
+                    List<Object> valueList = (List) fieldValue;
+                    List<Object> result = new ArrayList<>(valueList.size());
+                    for (Object value : valueList) {
+                        
result.add(this.buildFieldValue(lastNode.getFieldDesc(), value));
+                    }
+                    return new GenericArrayData(result.toArray());
+                }
+                return this.buildFieldValue(lastNode.getFieldDesc(), 
fieldValue);
+            }
+            // map
+            if (lastNode.isMapType()) {
+                if (!lastNode.isHasMapKey()) {
+                    return 
buildMapData(lastNode.getFieldDesc().getMessageType(), fieldValue);
+                }
+                return this.buildFieldValue(lastNode.getMapValueDesc(), 
fieldValue);
+            }
+            return null;
+        } catch (Exception e) {
+            LOG.error("fail to getField,error:{},rowNum:{},fieldName:{}", 
e.getMessage(), rowNum, fieldName, e);
+            return null;
+        }
+    }
+
+    public Object buildFieldValue(FieldDescriptor fieldDesc, Object nodeValue) 
{
+        if (fieldDesc == null || nodeValue == null) {
+            return null;
+        }
+        switch (fieldDesc.getJavaType()) {
+            case STRING:
+            case INT:
+            case LONG:
+            case FLOAT:
+            case DOUBLE:
+            case BOOLEAN:
+                return nodeValue;
+            case ENUM:
+                if (nodeValue instanceof EnumValueDescriptor) {
+                    EnumValueDescriptor enumDesc = (EnumValueDescriptor) 
nodeValue;
+                    return enumDesc.getIndex();
+                }
+                return null;
+            case BYTE_STRING:
+                if (nodeValue instanceof ByteString) {
+                    return ((ByteString) nodeValue).toByteArray();
+                } else {
+                    return nodeValue;
+                }
+            case MESSAGE:
+                return this.buildStructData(fieldDesc.getMessageType(), 
nodeValue);
+            default:
+                return String.valueOf(nodeValue);
+        }
+    }
+
+    public Object buildStructData(Descriptors.Descriptor messageType, Object 
nodeValue) {
+        // map
+        if (PbNode.isMapDescriptor(messageType)) {
+            return this.buildMapData(messageType, nodeValue);
+        }
+        // struct
+        if (!(nodeValue instanceof DynamicMessage)) {
+            return null;
+        }
+        DynamicMessage msgObj = (DynamicMessage) nodeValue;
+        GenericRowData result = new 
GenericRowData(messageType.getFields().size());
+        int index = 0;
+        for (FieldDescriptor fieldDesc : messageType.getFields()) {
+            Object fieldValue = msgObj.getField(fieldDesc);
+            if (fieldValue == null) {
+                result.setField(index++, null);
+                continue;
+            }
+            // field
+            if (!fieldDesc.isRepeated()) {
+                Object fieldResult = this.buildFieldValue(fieldDesc, 
fieldValue);
+                result.setField(index++, fieldResult);
+                continue;
+            }
+            // array
+            if (!(fieldValue instanceof List)) {
+                result.setField(index++, null);
+                continue;
+            }
+            // map
+            if (fieldDesc.getJavaType().equals(JavaType.MESSAGE)
+                    && PbNode.isMapDescriptor(fieldDesc.getMessageType())) {
+                result.setField(index++, 
buildMapData(fieldDesc.getMessageType(), fieldValue));
+            } else {
+                List<?> valueList = (List<?>) fieldValue;
+                List<Object> fieldResult = new ArrayList<>(valueList.size());
+                for (Object value : valueList) {
+                    fieldResult.add(this.buildFieldValue(fieldDesc, value));
+                }
+                result.setField(index++, new 
GenericArrayData(fieldResult.toArray()));
+            }
+        }
+        return result;
+    }
+
+    protected Object buildMapData(Descriptors.Descriptor messageType, Object 
nodeValue) {
+        if (!(nodeValue instanceof List)) {
+            return null;
+        }
+        Descriptors.FieldDescriptor keyField = 
messageType.findFieldByNumber(1);
+        Descriptors.FieldDescriptor valueField = 
messageType.findFieldByNumber(2);
+        List<?> subNodeValueList = (List<?>) nodeValue;
+        Map<Object, Object> result = new HashMap<>();
+        for (Object value : subNodeValueList) {
+            if (!(value instanceof DynamicMessage)) {
+                continue;
+            }
+            DynamicMessage subnodeValue = (DynamicMessage) value;
+            Object keyValue = buildFieldValue(keyField, 
subnodeValue.getField(keyField));
+            Object valueValue = buildFieldValue(valueField, 
subnodeValue.getField(valueField));
+            result.put(keyValue, valueValue);
+        }
+        return new GenericMapData(result);
+    }
+
+    /**
+     * get rootDesc
+     * @return the rootDesc
+     */
+    public Descriptors.Descriptor getRootDesc() {
+        return rootDesc;
+    }
+
+    /**
+     * get childDesc
+     * @return the childDesc
+     */
+    public Descriptors.Descriptor getChildDesc() {
+        return childDesc;
+    }
+
+    /**
+     * get root
+     * @return the root
+     */
+    public DynamicMessage getRoot() {
+        return root;
+    }
+
+    /**
+     * get childRoot
+     * @return the childRoot
+     */
+    public List<DynamicMessage> getChildRoot() {
+        return childRoot;
+    }
+
+    public Object findFieldNode(int rowNum, String fieldName) {
+        Object fieldValue = "";
+        try {
             if (StringUtils.startsWith(fieldName, ROOT_KEY)) {
-                fieldValue = this.getRootField(fieldName);
+                fieldValue = this.findRootField(fieldName);
             } else if (StringUtils.startsWith(fieldName, CHILD_KEY)) {
                 if (childRoot != null && rowNum < childRoot.size()) {
-                    fieldValue = this.getChildField(rowNum, fieldName);
+                    fieldValue = this.findChildField(rowNum, fieldName);
+                }
+            } else {
+                List<PbNode> childNodes = this.columnNodeMap.get(fieldName);
+                if (childNodes == null) {
+                    childNodes = PbNode.parseNodePath(rootDesc, fieldName);
+                    if (childNodes == null) {
+                        childNodes = new ArrayList<>();
+                    }
+                    this.columnNodeMap.put(fieldName, childNodes);
+                }
+                // error config
+                if (childNodes.size() == 0) {
+                    return "";
                 }
+                // parse other node
+                fieldValue = this.findNodeValueByCache(childNodes, root);
             }
             return fieldValue;
         } catch (Exception e) {
@@ -130,12 +338,25 @@ public class PbSourceData extends AbstractSourceData {
         return fieldValue;
     }
 
-    /**
-     * getRootField
-     * @param fieldName
-     * @return
-     */
-    private Object getRootField(String srcFieldName) {
+    public List<PbNode> parseStructNodeList(String srcFieldName, Descriptor 
currentDesc) {
+        List<PbNode> childNodes = this.columnNodeMap.get(srcFieldName);
+        if (childNodes == null) {
+            String fieldName = srcFieldName;
+            if (StringUtils.startsWith(fieldName, ROOT_KEY)) {
+                fieldName = srcFieldName.substring(ROOT_KEY.length());
+            } else if (StringUtils.startsWith(fieldName, CHILD_KEY)) {
+                fieldName = srcFieldName.substring(CHILD_KEY.length());
+            }
+            childNodes = PbNode.parseNodePath(currentDesc, fieldName);
+            if (childNodes == null) {
+                childNodes = new ArrayList<>();
+            }
+            this.columnNodeMap.put(srcFieldName, childNodes);
+        }
+        return childNodes;
+    }
+
+    private Object findRootField(String srcFieldName) {
         List<PbNode> childNodes = this.columnNodeMap.get(srcFieldName);
         if (childNodes == null) {
             String fieldName = srcFieldName.substring(ROOT_KEY.length());
@@ -147,20 +368,14 @@ public class PbSourceData extends AbstractSourceData {
         }
         // error config
         if (childNodes.size() == 0) {
-            return "";
+            return null;
         }
         // parse other node
-        Object fieldValue = this.getNodeValue(childNodes, root);
+        Object fieldValue = this.findNodeValueByCache(childNodes, root);
         return fieldValue;
     }
 
-    /**
-     * getChildField
-     * @param rowNum
-     * @param srcFieldName
-     * @return
-     */
-    private Object getChildField(int rowNum, String srcFieldName) {
+    private Object findChildField(int rowNum, String srcFieldName) {
         if (this.childRoot == null || this.childDesc == null) {
             return "";
         }
@@ -179,18 +394,111 @@ public class PbSourceData extends AbstractSourceData {
         }
         // parse other node
         DynamicMessage child = childRoot.get(rowNum);
-        Object fieldValue = this.getNodeValue(childNodes, child);
+        Object fieldValue = this.findNodeValueByCache(childNodes, child);
         return fieldValue;
     }
 
-    /**
-     * getNodeValue
-     * @param childNodes
-     * @param root
-     * @return
-     */
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private Object getNodeValue(List<PbNode> childNodes, DynamicMessage root) {
+    public Object findNodeValueByCache(List<PbNode> childNodes, DynamicMessage 
root) {
+        Map<String, Object> subNodeValueCache = 
this.nodeValueCache.computeIfAbsent(root,
+                k -> new HashMap<>());
+        Map<String, Map<Object, Object>> subMapNodeCache = 
this.mapNodeCache.computeIfAbsent(root,
+                k -> new HashMap<>());
+        for (int i = childNodes.size() - 1; i >= 0; i--) {
+            PbNode node = childNodes.get(i);
+            // index path
+            Object subNodeValue = 
subNodeValueCache.get(node.getCurrentIndexPath());
+            if (subNodeValue != null) {
+                if (i == childNodes.size() - 1) {
+                    return subNodeValue;
+                } else {
+                    if (subNodeValue instanceof DynamicMessage) {
+                        List<PbNode> subChildNodes = childNodes.subList(i + 1, 
childNodes.size());
+                        return this.findNodeValue(subChildNodes, 
(DynamicMessage) subNodeValue);
+                    } else {
+                        return null;
+                    }
+                }
+            }
+            // path
+            subNodeValue = subNodeValueCache.get(node.getCurrentPath());
+            if (subNodeValue != null) {
+                if (i == childNodes.size() - 1) {
+                    return subNodeValue;
+                } else {
+                    // primitive
+                    if (node.isPrimitiveType()) {
+                        return null;
+                    }
+                    // struct
+                    if (node.isStructType()) {
+                        List<PbNode> subChildNodes = childNodes.subList(i + 1, 
childNodes.size());
+                        return this.findNodeValue(subChildNodes, 
(DynamicMessage) subNodeValue);
+                    }
+                    // array
+                    if (node.isArrayType()) {
+                        if (!node.isHasArrayIndex()) {
+                            return null;
+                        }
+                        if (!(subNodeValue instanceof List)) {
+                            return null;
+                        }
+                        List<?> nodeValueList = (List<?>) subNodeValue;
+                        if (node.getArrayIndex() >= nodeValueList.size()) {
+                            return null;
+                        }
+                        Object newNode = 
nodeValueList.get(node.getArrayIndex());
+                        if (!(newNode instanceof DynamicMessage)) {
+                            return null;
+                        }
+                        List<PbNode> subChildNodes = childNodes.subList(i + 1, 
childNodes.size());
+                        return this.findNodeValue(subChildNodes, 
(DynamicMessage) newNode);
+                    }
+                    // map
+                    if (node.isMapType()) {
+                        if (!node.isHasMapKey()) {
+                            return null;
+                        }
+                        final Object mapNodeValue = subNodeValue;
+                        Map<Object, Object> mapCache = 
subMapNodeCache.computeIfAbsent(node.getCurrentPath(),
+                                k -> parseMapNode(mapNodeValue, node));
+                        Object fieldValue = mapCache.get(node.getMapKey());
+                        if (fieldValue == null || !(fieldValue instanceof 
DynamicMessage)) {
+                            return null;
+                        }
+                        List<PbNode> subChildNodes = childNodes.subList(i + 1, 
childNodes.size());
+                        return this.findNodeValue(subChildNodes, 
(DynamicMessage) fieldValue);
+                    }
+                    return null;
+                }
+            }
+        }
+        return this.findNodeValue(childNodes, root);
+    }
+
+    private static Map<Object, Object> parseMapNode(Object nodeValue, PbNode 
node) {
+        if (!(nodeValue instanceof List)) {
+            return new HashMap<>();
+        }
+        List<?> nodeValueList = (List<?>) nodeValue;
+        Map<Object, Object> mapCache = new HashMap<>();
+        for (Object value : nodeValueList) {
+            if (!(value instanceof DynamicMessage)) {
+                continue;
+            }
+            DynamicMessage msg = (DynamicMessage) value;
+            Object keyValue = msg.getField(node.getMapKeyDesc());
+            Object valueValue = msg.getField(node.getMapValueDesc());
+            mapCache.put(keyValue, valueValue);
+        }
+        return mapCache;
+    }
+
+    // @SuppressWarnings({"rawtypes", "unchecked"})
+    public Object findNodeValue(List<PbNode> childNodes, DynamicMessage root) {
+        Map<String, Object> subNodeValueCache = 
this.nodeValueCache.computeIfAbsent(root,
+                k -> new HashMap<>());
+        Map<String, Map<Object, Object>> subMapNodeCache = 
this.mapNodeCache.computeIfAbsent(root,
+                k -> new HashMap<>());
         DynamicMessage current = root;
         for (int i = 0; i < childNodes.size(); i++) {
             PbNode node = childNodes.get(i);
@@ -199,120 +507,310 @@ public class PbSourceData extends AbstractSourceData {
                 // error data
                 break;
             }
-            if (!node.isLastNode()) {
-                if (node.isArray()) {
-                    current = (DynamicMessage) ((List) 
nodeValue).get(node.getArrayIndex());
-                } else if (node.isMap()) {
-                    List<DynamicMessage> nodeValueList = 
(List<DynamicMessage>) nodeValue;
-                    DynamicMessage newCurrent = null;
-                    for (DynamicMessage subnodeValue : nodeValueList) {
-                        String keyValue = 
String.valueOf(subnodeValue.getField(node.getMapKeyDesc()));
-                        if (StringUtils.equals(keyValue, node.getMapKey())) {
-                            newCurrent = (DynamicMessage) 
subnodeValue.getField(node.getMapValueDesc());
-                            break;
-                        }
+            if (node.isLastNode()) {
+                // primitive
+                if (node.isPrimitiveType()) {
+                    if (nodeValue instanceof ByteString) {
+                        ByteString byteString = (ByteString) nodeValue;
+                        return byteString.toByteArray();
+                    } else if 
(node.getFieldDesc().getJavaType().equals(JavaType.STRING)) {
+                        return new BinaryStringData(String.valueOf(nodeValue));
+                    } else {
+                        return nodeValue;
                     }
-                    if (newCurrent == null) {
+                }
+                // struct
+                if (node.isStructType()) {
+                    subNodeValueCache.put(node.getCurrentPath(), nodeValue);
+                    return nodeValue;
+                }
+                // array
+                if (node.isArrayType()) {
+                    subNodeValueCache.put(node.getCurrentPath(), nodeValue);
+                    if (!node.isHasArrayIndex()) {
+                        return nodeValue;
+                    }
+                    if (!(nodeValue instanceof List)) {
+                        return null;
+                    }
+                    List<?> nodeValueList = (List<?>) nodeValue;
+                    if (node.getArrayIndex() >= nodeValueList.size()) {
+                        return null;
+                    }
+                    Object arrayIndexNodeValue = 
nodeValueList.get(node.getArrayIndex());
+                    subNodeValueCache.put(node.getCurrentIndexPath(), 
arrayIndexNodeValue);
+                    return arrayIndexNodeValue;
+                }
+                // map
+                if (node.isMapType()) {
+                    subNodeValueCache.put(node.getCurrentPath(), nodeValue);
+                    if (!node.isHasMapKey()) {
+                        return nodeValue;
+                    }
+                    final Object mapNodeValue = nodeValue;
+                    Map<Object, Object> mapCache = 
subMapNodeCache.computeIfAbsent(node.getCurrentPath(),
+                            k -> parseMapNode(mapNodeValue, node));
+                    Object fieldValue = mapCache.get(node.getMapKey());
+                    subNodeValueCache.put(node.getCurrentIndexPath(), 
fieldValue);
+                    return fieldValue;
+                }
+                return null;
+            } else {
+                // primitive
+                if (node.isPrimitiveType()) {
+                    return null;
+                }
+                // struct
+                if (node.isStructType()) {
+                    subNodeValueCache.put(node.getCurrentPath(), nodeValue);
+                    if (!(nodeValue instanceof DynamicMessage)) {
                         return null;
                     }
-                    current = newCurrent;
-                } else {
                     current = (DynamicMessage) nodeValue;
+                    continue;
                 }
-                continue;
-            }
-            // last node
-            if (node.isArray()) {
-                return buildStructData(node.getMessageType(), ((List) 
nodeValue).get(node.getArrayIndex()));
-            } else if (node.isMap()) {
-                List<DynamicMessage> nodeValueList = (List<DynamicMessage>) 
nodeValue;
-                Object fieldValue = null;
-                for (DynamicMessage subnodeValue : nodeValueList) {
-                    String keyValue = 
String.valueOf(subnodeValue.getField(node.getMapKeyDesc()));
-                    if (StringUtils.equals(keyValue, node.getMapKey())) {
-                        fieldValue = 
subnodeValue.getField(node.getMapValueDesc());
-                        break;
+                // array
+                if (node.isArrayType()) {
+                    subNodeValueCache.put(node.getCurrentPath(), nodeValue);
+                    if (!node.isHasArrayIndex()) {
+                        return null;
                     }
+                    if (!(nodeValue instanceof List)) {
+                        return null;
+                    }
+                    List<?> nodeValueList = (List<?>) nodeValue;
+                    if (node.getArrayIndex() >= nodeValueList.size()) {
+                        return null;
+                    }
+                    Object newNode = nodeValueList.get(node.getArrayIndex());
+                    subNodeValueCache.put(node.getCurrentIndexPath(), newNode);
+                    if (!(newNode instanceof DynamicMessage)) {
+                        return null;
+                    }
+                    current = (DynamicMessage) newNode;
+                    continue;
                 }
-                return this.buildFieldValue(node.getFieldDesc(), fieldValue, 
false);
-            } else if (node.isMapType()) {
-                return this.buildStructData(node.getMessageType(), nodeValue);
-            } else if (node.getFieldDesc().isRepeated()) {
-                List<Object> valueList = (List) nodeValue;
-                List<Object> result = new ArrayList<>(valueList.size());
-                for (Object value : valueList) {
-                    result.add(this.buildFieldValue(node.getFieldDesc(), 
value, false));
+                // map
+                if (node.isMapType()) {
+                    subNodeValueCache.put(node.getCurrentPath(), nodeValue);
+                    if (!node.isHasMapKey()) {
+                        return null;
+                    }
+                    final Object mapNodeValue = nodeValue;
+                    Map<Object, Object> mapCache = 
subMapNodeCache.computeIfAbsent(node.getCurrentPath(),
+                            k -> parseMapNode(mapNodeValue, node));
+                    Object fieldValue = mapCache.get(node.getMapKey());
+                    subNodeValueCache.put(node.getCurrentIndexPath(), 
fieldValue);
+                    if (fieldValue == null || !(fieldValue instanceof 
DynamicMessage)) {
+                        return null;
+                    }
+                    current = (DynamicMessage) fieldValue;
+                    continue;
                 }
-                return new GenericArrayData(result.toArray());
-            } else {
-                return this.buildFieldValue(node.getFieldDesc(), nodeValue, 
false);
+                return null;
             }
         }
         return null;
     }
 
-    @SuppressWarnings("unchecked")
-    private Object buildFieldValue(FieldDescriptor fieldDesc, Object 
nodeValue, boolean isRepeated) {
-        if (nodeValue == null) {
-            return null;
+    /**
+     * Clear the leaf field referenced by {@code childNodes}; the {@code root} builder is
+     * modified in place.
+     * <p>
+     * Implementation notes (important):
+     * <ul>
+     *   <li>Intermediate nodes are descended by reading the value out of the 
parent
+     *       builder, creating a sub-builder via {@link 
DynamicMessage#toBuilder()},
+     *       recursing into it, and then writing the rebuilt sub-message back 
via
+     *       {@code setField} / {@code setRepeatedField}. We never rely on 
automatic
+     *       reverse propagation from {@code getFieldBuilder}, which is not 
consistent
+     *       across protobuf-java versions for {@code 
DynamicMessage.Builder}.</li>
+     *   <li>Repeated and map entries are NEVER mutated through the list 
returned by
+     *       {@code getField} (it is an unmodifiable view in many protobuf 
versions).
+     *       Instead the field is cleared and the kept entries are re-added via
+     *       {@code addRepeatedField}, which is the portable way to "remove" 
an entry
+     *       from a {@code DynamicMessage.Builder}.</li>
+     * </ul>
+     *
+     * @param childNodes path to the leaf node to clear
+     * @param root       the top-level builder; modifications are applied to it
+     */
+    public void clearNodeValue(List<PbNode> childNodes, DynamicMessage.Builder 
root) {
+        // Nothing to clear for a null/empty path or a null builder.
+        if (childNodes == null || childNodes.isEmpty() || root == null) {
+            return;
         }
-        switch (fieldDesc.getJavaType()) {
-            case STRING:
-            case INT:
-            case LONG:
-            case FLOAT:
-            case DOUBLE:
-            case BOOLEAN:
-            case ENUM:
-                return nodeValue;
-            case BYTE_STRING:
-                return ((ByteString) nodeValue).toByteArray();
-            case MESSAGE: {
-                if (!isRepeated) {
-                    return this.buildStructData(fieldDesc.getMessageType(), 
nodeValue);
-                } else if (PbNode.isMapDescriptor(fieldDesc.getMessageType())) 
{
-                    return this.buildStructData(fieldDesc.getMessageType(), 
nodeValue);
+        // Start the descent at the first path node; the builder is mutated in place.
+        clearNodeValueRec(root, childNodes, 0);
+    }
+
+    /**
+     * Recursive helper. Modifies {@code builder} in place; for nested levels, 
every
+     * change is written back to {@code builder} via setField/setRepeatedField 
as we
+     * unwind, so the topmost caller sees the modification.
+     */
+    private void clearNodeValueRec(DynamicMessage.Builder builder, 
List<PbNode> nodes, int from) {
+        // This call handles nodes[from]; deeper path levels are handled by recursing on a child builder.
+        PbNode node = nodes.get(from);
+        FieldDescriptor fd = node.getFieldDesc();
+        if (fd == null) {
+            return;
+        }
+
+        boolean isLast = (from == nodes.size() - 1);
+
+        // ============================== LEAF ==============================
+        if (isLast) {
+            // primitive / struct / repeated-without-index / map-without-key: 
clearField wipes
+            // the whole field. This is the safe single-call PB API.
+            if (node.isPrimitiveType()
+                    || node.isStructType()
+                    || (node.isArrayType() && !node.isHasArrayIndex())
+                    || (node.isMapType() && !node.isHasMapKey())) {
+                builder.clearField(fd);
+                return;
+            }
+            // repeated with explicit index: rebuild the list without the 
target element.
+            if (node.isArrayType() && node.isHasArrayIndex()) {
+                removeRepeatedAt(builder, fd, node.getArrayIndex());
+                return;
+            }
+            // map with explicit key: rebuild the entries dropping the 
matching key.
+            if (node.isMapType() && node.isHasMapKey()) {
+                removeMapEntryByKey(builder, fd, node.getMapKeyDesc(), 
node.getMapKey());
+                return;
+            }
+            return;
+        }
+
+        // ============================ INTERMEDIATE 
============================
+        // primitive intermediate: invalid path, abort
+        if (node.isPrimitiveType()) {
+            return;
+        }
+
+        // struct intermediate: descend into the message field, mutate, then 
setField back
+        if (node.isStructType()) {
+            if (!builder.hasField(fd)) {
+                return;
+            }
+            Object child = builder.getField(fd);
+            if (!(child instanceof DynamicMessage)) {
+                return;
+            }
+            DynamicMessage.Builder childBuilder = ((DynamicMessage) 
child).toBuilder();
+            clearNodeValueRec(childBuilder, nodes, from + 1);
+            builder.setField(fd, childBuilder.build());
+            return;
+        }
+
+        // array intermediate (only meaningful with an explicit index): 
descend into that element
+        if (node.isArrayType()) {
+            if (!node.isHasArrayIndex() || fd.getJavaType() != 
JavaType.MESSAGE) {
+                return;
+            }
+            int idx = node.getArrayIndex();
+            int count = builder.getRepeatedFieldCount(fd);
+            if (idx < 0 || idx >= count) {
+                return;
+            }
+            Object element = builder.getRepeatedField(fd, idx);
+            if (!(element instanceof DynamicMessage)) {
+                return;
+            }
+            DynamicMessage.Builder eb = ((DynamicMessage) element).toBuilder();
+            clearNodeValueRec(eb, nodes, from + 1);
+            builder.setRepeatedField(fd, idx, eb.build());
+            return;
+        }
+
+        // map intermediate: only meaningful with an explicit key.
+        // Descend into the value of the matching entry, mutate it, and 
replace the entry.
+        if (node.isMapType()) {
+            if (!node.isHasMapKey()) {
+                return;
+            }
+            FieldDescriptor mapKeyDesc = node.getMapKeyDesc();
+            FieldDescriptor mapValueDesc = node.getMapValueDesc();
+            if (mapKeyDesc == null || mapValueDesc == null
+                    || mapValueDesc.getJavaType() != JavaType.MESSAGE) {
+                return;
+            }
+            int count = builder.getRepeatedFieldCount(fd);
+            for (int i = 0; i < count; i++) {
+                Object entry = builder.getRepeatedField(fd, i);
+                if (!(entry instanceof DynamicMessage)) {
+                    continue;
+                }
+                DynamicMessage entryMsg = (DynamicMessage) entry;
+                Object keyVal = entryMsg.getField(mapKeyDesc);
+                // NOTE(review): Objects.equals compares the raw objects, so node.getMapKey() must have
+                // the same Java type as the map key field (e.g. Integer for an int32 key) - confirm.
+                if (keyVal == null || !Objects.equals(node.getMapKey(), 
keyVal)) {
+                    continue;
                 }
-                List<DynamicMessage> valueList = (List<DynamicMessage>) 
nodeValue;
-                List<Object> result = new ArrayList<>(valueList.size());
-                for (DynamicMessage value : valueList) {
-                    
result.add(this.buildStructData(fieldDesc.getMessageType(), value));
+                Object valObj = entryMsg.getField(mapValueDesc);
+                if (!(valObj instanceof DynamicMessage)) {
+                    return;
                 }
-                return new GenericArrayData(result.toArray());
+                DynamicMessage.Builder valBuilder = ((DynamicMessage) 
valObj).toBuilder();
+                clearNodeValueRec(valBuilder, nodes, from + 1);
+                DynamicMessage.Builder entryBuilder = entryMsg.toBuilder();
+                entryBuilder.setField(mapValueDesc, valBuilder.build());
+                builder.setRepeatedField(fd, i, entryBuilder.build());
+                // Only the first entry with a matching key is descended into.
+                return;
             }
-            default:
-                return String.valueOf(nodeValue);
         }
     }
 
-    @SuppressWarnings("unchecked")
-    protected Object buildStructData(Descriptors.Descriptor messageType, 
Object nodeValue) {
-        // map
-        if (PbNode.isMapDescriptor(messageType)) {
-            Descriptors.FieldDescriptor keyField = 
messageType.findFieldByNumber(1);
-            Descriptors.FieldDescriptor valueField = 
messageType.findFieldByNumber(2);
-            List<DynamicMessage> subNodeValueList = (List<DynamicMessage>) 
nodeValue;
-            Map<Object, Object> result = new HashMap<>();
-            for (DynamicMessage subnodeValue : subNodeValueList) {
-                Object keyValue = buildFieldValue(keyField, 
subnodeValue.getField(keyField), false);
-                Object valueValue = buildFieldValue(valueField, 
subnodeValue.getField(valueField), false);
-                result.put(keyValue, valueValue);
-            }
-            return new GenericMapData(result);
+    /**
+     * Remove the {@code targetIndex}-th element from the given repeated field 
on
+     * {@code builder}. {@code DynamicMessage.Builder} does not expose
+     * {@code removeRepeatedField} portably across protobuf-java versions, so 
we
+     * rebuild the field via clearField + addRepeatedField.
+     */
+    private static void removeRepeatedAt(DynamicMessage.Builder builder,
+            FieldDescriptor fd, int targetIndex) {
+        int count = builder.getRepeatedFieldCount(fd);
+        // An out-of-range index leaves the field untouched.
+        if (targetIndex < 0 || targetIndex >= count) {
+            return;
         }
-        // struct
-        DynamicMessage msgObj = (DynamicMessage) nodeValue;
-        GenericRowData result = new 
GenericRowData(messageType.getFields().size());
-        int index = 0;
-        for (FieldDescriptor fieldDesc : messageType.getFields()) {
-            Object fieldValue = msgObj.getField(fieldDesc);
-            if (fieldValue == null) {
-                result.setField(index++, null);
+        // Snapshot every element except the target, preserving order, before clearing the field.
+        List<Object> kept = new ArrayList<>(count - 1);
+        for (int i = 0; i < count; i++) {
+            if (i == targetIndex) {
                 continue;
             }
-            Object fieldResult = this.buildFieldValue(fieldDesc, fieldValue, 
false);
-            result.setField(index++, fieldResult);
+            kept.add(builder.getRepeatedField(fd, i));
+        }
+        // Re-adding the survivors is equivalent to removing element targetIndex.
+        builder.clearField(fd);
+        for (Object v : kept) {
+            builder.addRepeatedField(fd, v);
+        }
+    }
+
+    /**
+     * Remove the map entry whose key equals {@code targetKey} from the given 
map field
+     * on {@code builder}. Uses the portable clearField + addRepeatedField 
approach.
+     */
+    private static void removeMapEntryByKey(DynamicMessage.Builder builder,
+            FieldDescriptor fd, FieldDescriptor mapKeyDesc, Object targetKey) {
+        if (mapKeyDesc == null || targetKey == null) {
+            return;
+        }
+        int count = builder.getRepeatedFieldCount(fd);
+        List<Object> kept = new ArrayList<>(count);
+        boolean removed = false;
+        for (int i = 0; i < count; i++) {
+            Object entry = builder.getRepeatedField(fd, i);
+            if (entry instanceof DynamicMessage) {
+                Object keyVal = ((DynamicMessage) entry).getField(mapKeyDesc);
+                // Every entry whose key matches is dropped. NOTE(review): Objects.equals needs
+                // targetKey to have the key field's Java type (e.g. Integer for int32) - confirm.
+                if (Objects.equals(targetKey, keyVal)) {
+                    removed = true;
+                    continue;
+                }
+            }
+            // Non-message entries and non-matching entries are kept as-is.
+            kept.add(entry);
+        }
+        // Avoid rewriting the field when nothing matched.
+        if (!removed) {
+            return;
+        }
+        builder.clearField(fd);
+        for (Object e : kept) {
+            builder.addRepeatedField(fd, e);
         }
-        return result;
     }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
index cc4a6a8e99..46ce0215f1 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
@@ -86,7 +86,7 @@ public class PbSourceDecoder extends SourceDecoder<String> {
             this.rowsNodePath = sourceInfo.getRowsNodePath();
             this.childNodes = PbNode.parseNodePath(rootDesc, rowsNodePath);
             if (this.childNodes != null && this.childNodes.size() > 0) {
-                this.childDesc = this.childNodes.get(this.childNodes.size() - 
1).getMessageType();
+                this.childDesc = this.childNodes.get(this.childNodes.size() - 
1).getFieldDesc().getMessageType();
             }
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
@@ -127,7 +127,7 @@ public class PbSourceDecoder extends SourceDecoder<String> {
                             break;
                         }
                     }
-                    if (!node.isArray()) {
+                    if (!node.isArrayType()) {
                         if (!(nodeValue instanceof DynamicMessage)) {
                             // error data
                             return new PbSourceData(root, rootDesc, 
columnNodeMap, srcCharset);
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FunctionConstant.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FunctionConstant.java
index 1abe442f84..bca3ec36dd 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FunctionConstant.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FunctionConstant.java
@@ -35,4 +35,6 @@ public class FunctionConstant {
 
     public static final String TEMPORAL_TYPE = "temporal";
 
+    public static final String PB_TYPE = "pb";
+
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ConcatStructFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ConcatStructFunction.java
new file mode 100644
index 0000000000..6d23644ab1
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ConcatStructFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.pb;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.function.FunctionConstant;
+import org.apache.inlong.sdk.transform.process.function.TransformFunction;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+import org.apache.flink.table.data.GenericRowData;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ConcatStructFunction  ->  concat_struct(field1, field2, field3...)
+ * description:
+ * - Always returns a GenericRowData whose arity equals the number of input parameters.
+ * - If any parameter evaluates to NULL, the corresponding position in the returned
+ *   GenericRowData is set to NULL while the other positions are populated normally.
+ * - Each field value is taken from the protobuf source data based on its path.
+ */
+@TransformFunction(type = FunctionConstant.PB_TYPE, names = {
+        "concat_struct"}, parameter = "(field1,field2,field3...)", descriptions = {
+                "- Always returns a GenericRowData whose arity equals the number of input parameters;",
+                "- If any parameter is NULL, the corresponding position in the returned "
+                        + "GenericRowData is set to NULL while the other positions are populated normally;",
+                "- Each field value is taken from the protobuf source data based on its 'path'."
+        }, examples = {
+                "concat_struct($root.name,$root.age) = +I(\"Alice\",11)"
+        })
+public class ConcatStructFunction implements ValueParser {
+
+    /** One parser per concat_struct argument, kept in argument order. */
+    private final List<ValueParser> fieldParsers;
+
+    public ConcatStructFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters().getExpressions();
+        List<ValueParser> parsers = new ArrayList<>(expressions.size());
+        for (Expression expression : expressions) {
+            parsers.add(OperatorTools.buildParser(expression));
+        }
+        this.fieldParsers = parsers;
+    }
+
+    /**
+     * Evaluates every argument against the current row and packs the results,
+     * positionally, into a new row (null results stay null).
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        int arity = fieldParsers.size();
+        GenericRowData row = new GenericRowData(arity);
+        for (int pos = 0; pos < arity; pos++) {
+            row.setField(pos, fieldParsers.get(pos).parse(sourceData, rowIndex, context));
+        }
+        return row;
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractBinaryFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractBinaryFunction.java
new file mode 100644
index 0000000000..102031f82f
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractBinaryFunction.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.pb;
+
+import org.apache.inlong.sdk.transform.decode.PbNode;
+import org.apache.inlong.sdk.transform.decode.PbSourceData;
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.function.FunctionConstant;
+import org.apache.inlong.sdk.transform.process.function.TransformFunction;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.MessageLite;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.table.data.GenericArrayData;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ExtractBinaryFunction  ->  extract_binary(path)
+ * description:
+ * - Only works on protobuf source data; returns NULL if the source is not a PbSourceData.
+ * - Returns NULL if 'path' is missing/invalid, or the path cannot be resolved to a value
+ *   in the protobuf message.
+ * - For primitive / struct / map nodes and array nodes with an explicit array index,
+ *   returns the matched value serialized as a {@code byte[]}.
+ * - For array nodes without an array index, returns a {@link GenericArrayData} whose
+ *   elements are the {@code byte[]} representation of each list value.
+ * - Values that are already a {@code byte[]} (e.g. protobuf {@code bytes} fields, which
+ *   PbSourceData unwraps from ByteString) are returned unchanged.
+ */
+@TransformFunction(type = FunctionConstant.PB_TYPE, names = {
+        "extract_binary"}, parameter = "(path)", descriptions = {
+                "- Only works on protobuf source data; returns NULL if the source is not a PbSourceData;",
+                "- Returns NULL if 'path' is missing/invalid, or the path cannot be resolved "
+                        + "to a value in the protobuf message;",
+                "- For primitive / struct / map nodes and array nodes with an explicit array index, "
+                        + "returns the matched value serialized as a byte[];",
+                "- For array nodes without an array index, returns a GenericArrayData whose elements "
+                        + "are the byte[] representation of each list value."
+        }, examples = {
+                "extract_binary($root.feature) = [62,111]"
+        })
+public class ExtractBinaryFunction implements ValueParser {
+
+    /** Parser for the single argument: either a column path or a nested function. */
+    private final ValueParser pathParser;
+    /** Descriptor for paths that start with neither the root nor the child key; may be null. */
+    private Descriptor parentDesc;
+    /** Message that such relative paths are resolved against. */
+    private DynamicMessage parentRoot;
+
+    public ExtractBinaryFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters().getExpressions();
+        this.pathParser = OperatorTools.buildParser(expressions.get(0));
+    }
+
+    /**
+     * Evaluates the single argument and converts the result to its binary form.
+     * <p>
+     * A column path is resolved directly against the protobuf message; any other
+     * argument (e.g. a nested function) is evaluated first and its result converted.
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        if (!(sourceData instanceof PbSourceData)) {
+            return null;
+        }
+        if (pathParser instanceof ColumnParser) {
+            return this.parseByColumnParser(sourceData, rowIndex, context);
+        }
+        if (pathParser instanceof ExtractStructExcludingFunction) {
+            // keepMessage presumably makes the nested call return the trimmed message itself
+            // (rather than a row) so it can be re-encoded below - see ExtractStructExcludingFunction.
+            ((ExtractStructExcludingFunction) pathParser).setKeepMessage(true);
+        }
+        Object result = this.pathParser.parse(sourceData, rowIndex, context);
+        return toByteArray(result);
+    }
+
+    /**
+     * Resolves a column path ($root..., $child..., or a path relative to {@link #parentDesc})
+     * and converts the resolved value to its binary form.
+     *
+     * @return a {@code byte[]}, a {@link GenericArrayData} of {@code byte[]}, or {@code null}
+     *         when the path cannot be parsed or resolved
+     */
+    public Object parseByColumnParser(SourceData sourceData, int rowIndex, Context context) {
+        String path = ((ColumnParser) pathParser).getFieldName();
+        if (path == null) {
+            return null;
+        }
+        PbSourceData pbData = (PbSourceData) sourceData;
+        if (StringUtils.equals(PbSourceData.ROOT, path)) {
+            return pbData.getRoot().toByteArray();
+        }
+        // parse the path into its node list, relative to the matching descriptor
+        List<PbNode> childNodes = null;
+        boolean isParentData = false;
+        if (StringUtils.startsWith(path, PbSourceData.ROOT_KEY)) {
+            childNodes = pbData.parseStructNodeList(path, pbData.getRootDesc());
+        } else if (StringUtils.startsWith(path, PbSourceData.CHILD_KEY)) {
+            if (pbData.getChildDesc() == null) {
+                return null;
+            }
+            childNodes = pbData.parseStructNodeList(path, pbData.getChildDesc());
+        } else if (parentDesc != null) {
+            childNodes = pbData.parseStructNodeList(path, parentDesc);
+            isParentData = true;
+        }
+        if (childNodes == null || childNodes.isEmpty()) {
+            return null;
+        }
+        // value
+        Object currentNode;
+        if (isParentData) {
+            currentNode = pbData.findNodeValueByCache(childNodes, parentRoot);
+        } else {
+            currentNode = pbData.findFieldNode(rowIndex, path);
+        }
+        if (currentNode == null) {
+            return null;
+        }
+        // dispatch on the type of the last path node (checked in this order)
+        PbNode lastNode = childNodes.get(childNodes.size() - 1);
+        if (lastNode.isPrimitiveType() || lastNode.isStructType()) {
+            return toByteArray(currentNode);
+        }
+        if (lastNode.isArrayType()) {
+            // A whole repeated field must resolve to a List; toByteArray encodes each element.
+            if (!lastNode.isHasArrayIndex() && !(currentNode instanceof List)) {
+                return null;
+            }
+            return toByteArray(currentNode);
+        }
+        if (lastNode.isMapType()) {
+            return toByteArray(currentNode);
+        }
+        return null;
+    }
+
+    /**
+     * Converts a resolved protobuf value into its binary form.
+     * <ul>
+     *   <li>{@code null} stays {@code null};</li>
+     *   <li>a {@code byte[]} is returned unchanged;</li>
+     *   <li>a message is encoded with {@link MessageLite#toByteArray()};</li>
+     *   <li>a {@link ByteString} yields its bytes;</li>
+     *   <li>a {@link List} becomes a {@link GenericArrayData} of its converted elements;</li>
+     *   <li>anything else is {@code String.valueOf(value)} encoded as ISO-8859-1.</li>
+     * </ul>
+     */
+    private Object toByteArray(Object currentNode) {
+        if (currentNode == null) {
+            return null;
+        }
+        if (currentNode instanceof byte[]) {
+            // Already binary, e.g. a bytes field that PbSourceData unwrapped from ByteString.
+            // Falling through would produce String.valueOf(byte[]), i.e. the "[B@..." identity string.
+            return currentNode;
+        }
+        if (currentNode instanceof MessageLite) {
+            return ((MessageLite) currentNode).toByteArray();
+        }
+        if (currentNode instanceof ByteString) {
+            return ((ByteString) currentNode).toByteArray();
+        }
+        if (currentNode instanceof List) {
+            List<?> valueList = (List<?>) currentNode;
+            List<Object> fieldResult = new ArrayList<>(valueList.size());
+            for (Object value : valueList) {
+                fieldResult.add(toByteArray(value));
+            }
+            return new GenericArrayData(fieldResult.toArray());
+        }
+        // NOTE(review): ISO-8859-1 maps chars <= 0xFF to single bytes but replaces any other
+        // character with '?'; confirm this is intended for string fields (UTF-8 would be lossless).
+        return String.valueOf(currentNode).getBytes(StandardCharsets.ISO_8859_1);
+    }
+
+    /**
+     * get parentDesc
+     * @return the parentDesc
+     */
+    public Descriptor getParentDesc() {
+        return parentDesc;
+    }
+
+    /**
+     * set parentDesc
+     * @param parentDesc the parentDesc to set
+     */
+    public void setParentDesc(Descriptor parentDesc) {
+        this.parentDesc = parentDesc;
+    }
+
+    /**
+     * get parentRoot
+     * @return the parentRoot
+     */
+    public DynamicMessage getParentRoot() {
+        return parentRoot;
+    }
+
+    /**
+     * set parentRoot
+     * @param parentRoot the parentRoot to set
+     */
+    public void setParentRoot(DynamicMessage parentRoot) {
+        this.parentRoot = parentRoot;
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractStructExcludingFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractStructExcludingFunction.java
new file mode 100644
index 0000000000..589f943388
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractStructExcludingFunction.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.pb;
+
+import org.apache.inlong.sdk.transform.decode.PbNode;
+import org.apache.inlong.sdk.transform.decode.PbSourceData;
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.function.FunctionConstant;
+import org.apache.inlong.sdk.transform.process.function.TransformFunction;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+import com.google.protobuf.DynamicMessage;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+import org.apache.flink.table.data.GenericArrayData;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ExtractStructExcludingFunction  ->  extract_struct_excluding(path, 
excludeField1, excludeField2, ...)
+ * description:
+ * - Only works on protobuf source data; returns NULL if the source is not a 
PbSourceData.
+ * - Returns NULL if 'path' is missing/invalid, or the path cannot be resolved 
to a
+ *   message-typed node in the protobuf source data.
+ * - Each {@code excludeFieldN} is the name (relative to 'path') of a 
sub-field that
+ *   should be REMOVED from a copy of the located message before returning. 
Parameters
+ *   that cannot be resolved on the located message, or are not plain column 
references,
+ *   are silently ignored. The original record is never mutated.
+ * - When 'path' resolves to:
+ *     - a single message: returns a GenericRowData built from a trimmed copy 
of that
+ *       message (the excluded fields are cleared);
+ *     - a repeated (array) field of messages: returns a GenericArrayData 
whose elements
+ *       are the trimmed GenericRowData for every array element.
+ */
+@TransformFunction(type = FunctionConstant.PB_TYPE, names = {
+        "extract_struct_excluding"}, parameter = "(path, excludeField1, 
excludeField2, ...)", descriptions = {
+                "- Only works on protobuf source data; returns NULL if the 
source is not a PbSourceData;",
+                "- Returns NULL if 'path' is missing/invalid, or the path 
cannot be resolved "
+                        + "to a message-typed node;",
+                "- Each excludeFieldN is the name (relative to 'path') of a 
sub-field to REMOVE "
+                        + "from a copy of the located message; unknown / 
non-column-reference "
+                        + "parameters are silently ignored;",
+                "- When 'path' resolves to a single message, returns a trimmed 
GenericRowData. "
+                        + "When 'path' resolves to a repeated message field, 
returns a "
+                        + "GenericArrayData whose elements are the trimmed 
GenericRowData for "
+                        + "every array element. The original record is never 
mutated."
+        }, examples = {
+                "extract_struct_excluding($root.person,address,phone) "
+                        + "= <GenericRowData of person without address and 
phone>"
+        })
+public class ExtractStructExcludingFunction implements ValueParser {
+
+    private final ValueParser pathParser;
+    private final List<ValueParser> fieldParsers;
+    private String path;
+    private boolean isKeepMessage = false;
+
+    public ExtractStructExcludingFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters().getExpressions();
+        this.pathParser = OperatorTools.buildParser(expressions.get(0));
+        if (pathParser instanceof ColumnParser) {
+            this.path = ((ColumnParser) pathParser).getFieldName();
+        }
+        this.fieldParsers = new ArrayList<>();
+        for (int i = 1; i < expressions.size(); i++) {
+            
this.fieldParsers.add(OperatorTools.buildParser(expressions.get(i)));
+        }
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        if (!(sourceData instanceof PbSourceData)) {
+            return null;
+        }
+        if (path == null) {
+            return null;
+        }
+        PbSourceData pbData = (PbSourceData) sourceData;
+        if (PbSourceData.ROOT.equals(path)) {
+            return buildStruct(pbData, rowIndex, context, pbData.getRoot());
+        }
+        // parse path
+        List<PbNode> pathChildNodes = pbData.parseStructNodeList(path, 
pbData.getRootDesc());
+        if (pathChildNodes == null || pathChildNodes.size() == 0) {
+            return null;
+        }
+        // check message type
+        PbNode lastNode = pathChildNodes.get(pathChildNodes.size() - 1);
+        if (!lastNode.getFieldDesc().getJavaType().equals(JavaType.MESSAGE)) {
+            return null;
+        }
+        // get data
+        Object currentNode = pbData.findFieldNode(rowIndex, path);
+        if (currentNode == null) {
+            return null;
+        }
+        // array node
+        if (lastNode.isArrayType() && !lastNode.isHasArrayIndex()) {
+            if (!(currentNode instanceof List)) {
+                return null;
+            }
+            List<?> currentNodeList = (List<?>) currentNode;
+            List<Object> valueResult = new ArrayList<>(currentNodeList.size());
+            for (Object nodeValue : currentNodeList) {
+                if (!(nodeValue instanceof DynamicMessage)) {
+                    continue;
+                }
+                DynamicMessage currentValue = (DynamicMessage) nodeValue;
+                Object item = buildStruct(pbData, rowIndex, context, 
currentValue);
+                valueResult.add(item);
+            }
+            GenericArrayData result = new 
GenericArrayData(valueResult.toArray());
+            return result;
+        } else {
+            // struct node
+            if (!(currentNode instanceof DynamicMessage)) {
+                return null;
+            }
+            DynamicMessage currentValue = (DynamicMessage) currentNode;
+            return buildStruct(pbData, rowIndex, context, currentValue);
+        }
+    }
+
+    private Object buildStruct(PbSourceData pbData, int rowIndex, Context 
context,
+            DynamicMessage rawValue) {
+        DynamicMessage.Builder currentValue = rawValue.toBuilder();
+        Descriptor currentDesc = currentValue.getDescriptorForType();
+        for (ValueParser parser : fieldParsers) {
+            if (parser instanceof ColumnParser) {
+                ColumnParser columnParser = (ColumnParser) parser;
+                String fieldName = columnParser.getFieldName();
+                List<PbNode> childNodes = 
pbData.parseStructNodeList(fieldName, currentDesc);
+                if (childNodes == null || childNodes.size() == 0) {
+                    continue;
+                }
+                pbData.clearNodeValue(childNodes, currentValue);
+            }
+        }
+        if (isKeepMessage()) {
+            return currentValue.build();
+        }
+        Object result = pbData.buildStructData(currentDesc, 
currentValue.build());
+        return result;
+    }
+
+    /**
+     * get isKeepMessage
+     * @return the isKeepMessage
+     */
+    public boolean isKeepMessage() {
+        return isKeepMessage;
+    }
+
+    /**
+     * set isKeepMessage
+     * @param isKeepMessage the isKeepMessage to set
+     */
+    public void setKeepMessage(boolean isKeepMessage) {
+        this.isKeepMessage = isKeepMessage;
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractStructFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractStructFunction.java
new file mode 100644
index 0000000000..d2c0c2c70f
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractStructFunction.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.pb;
+
+import org.apache.inlong.sdk.transform.decode.PbNode;
+import org.apache.inlong.sdk.transform.decode.PbSourceData;
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.function.FunctionConstant;
+import org.apache.inlong.sdk.transform.process.function.TransformFunction;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+import com.google.protobuf.DynamicMessage;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ExtractStructFunction  ->  extract_struct(path, field1, field2, field3...)
+ * description:
+ * - Only works on protobuf source data; returns NULL if the source is not a 
PbSourceData.
+ * - Returns NULL if 'path' is missing/invalid, or the path cannot be resolved 
to a
+ *   DynamicMessage in the protobuf source data.
+ * - Otherwise, returns a GenericRowData whose arity equals the number of 
declared
+ *   fields (field1, field2, ...). For each declared field:
+ *     - if the field can be resolved on the located message, the corresponding
+ *       position is filled with the resolved value;
+ *     - otherwise (field not found, or the parameter is not a column 
reference),
+ *       the corresponding position is set to NULL.
+ */
+@TransformFunction(type = FunctionConstant.PB_TYPE, names = {
+        "extract_struct"}, parameter = "(path, field1,field2,field3...)", 
descriptions = {
+                "- Only works on protobuf source data; returns NULL if the 
source is not a PbSourceData;",
+                "- Returns NULL if 'path' is missing/invalid, or the path 
cannot be resolved "
+                        + "to a DynamicMessage in the protobuf source data;",
+                "- Otherwise, returns a GenericRowData whose arity equals the 
number of declared fields. "
+                        + "Each position is filled with the resolved field 
value, or NULL if the field "
+                        + "cannot be resolved on the located message."
+        }, examples = {
+                "extract_struct($root.person,name,age) = +I(\"Alice\",11)"
+        })
+public class ExtractStructFunction implements ValueParser {
+
+    private final ValueParser pathParser;
+    private final List<ValueParser> fieldParsers;
+    private String path;
+
+    public ExtractStructFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters().getExpressions();
+        this.pathParser = OperatorTools.buildParser(expressions.get(0));
+        if (pathParser instanceof ColumnParser) {
+            this.path = ((ColumnParser) pathParser).getFieldName();
+        }
+        this.fieldParsers = new ArrayList<>();
+        for (int i = 1; i < expressions.size(); i++) {
+            
this.fieldParsers.add(OperatorTools.buildParser(expressions.get(i)));
+        }
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        if (!(sourceData instanceof PbSourceData)) {
+            return null;
+        }
+        if (path == null) {
+            return null;
+        }
+        PbSourceData pbData = (PbSourceData) sourceData;
+        // parse path
+        List<PbNode> pathChildNodes = pbData.parseStructNodeList(path, 
pbData.getRootDesc());
+        if (pathChildNodes == null || pathChildNodes.size() == 0) {
+            return null;
+        }
+        // check message type
+        PbNode lastNode = pathChildNodes.get(pathChildNodes.size() - 1);
+        if (!lastNode.getFieldDesc().getJavaType().equals(JavaType.MESSAGE)) {
+            return null;
+        }
+        // get data
+        Object currentNode = pbData.findFieldNode(rowIndex, path);
+        if (currentNode == null) {
+            return null;
+        }
+        // array node
+        if (lastNode.isArrayType() && !lastNode.isHasArrayIndex()) {
+            if (!(currentNode instanceof List)) {
+                return null;
+            }
+            List<?> currentNodeList = (List<?>) currentNode;
+            List<GenericRowData> valueResult = new 
ArrayList<>(currentNodeList.size());
+            for (Object nodeValue : currentNodeList) {
+                if (!(nodeValue instanceof DynamicMessage)) {
+                    continue;
+                }
+                DynamicMessage currentValue = (DynamicMessage) nodeValue;
+                GenericRowData item = buildStruct(pbData, rowIndex, context, 
currentValue);
+                valueResult.add(item);
+            }
+            GenericArrayData result = new 
GenericArrayData(valueResult.toArray());
+            return result;
+        } else {
+            // struct node
+            if (!(currentNode instanceof DynamicMessage)) {
+                return null;
+            }
+            DynamicMessage currentValue = (DynamicMessage) currentNode;
+            return buildStruct(pbData, rowIndex, context, currentValue);
+        }
+    }
+
+    private GenericRowData buildStruct(PbSourceData pbData, int rowIndex, 
Context context,
+            DynamicMessage currentValue) {
+        Descriptor currentDesc = currentValue.getDescriptorForType();
+        GenericRowData result = new GenericRowData(fieldParsers.size());
+        int index = 0;
+        for (ValueParser parser : fieldParsers) {
+            if (parser instanceof ColumnParser) {
+                ColumnParser columnParser = (ColumnParser) parser;
+                String fieldName = columnParser.getFieldName();
+                List<PbNode> childNodes = 
pbData.parseStructNodeList(fieldName, currentDesc);
+                if (childNodes == null || childNodes.size() == 0) {
+                    result.setField(index++, null);
+                    continue;
+                }
+                Object fieldValue = pbData.findNodeValueByCache(childNodes, 
currentValue);
+                PbNode lastNode = childNodes.get(childNodes.size() - 1);
+                if (lastNode.isArrayType() && !lastNode.isHasArrayIndex()) {
+                    if (!(fieldValue instanceof List)) {
+                        result.setField(index++, null);
+                        continue;
+                    }
+                    List<?> valueList = (List<?>) fieldValue;
+                    List<Object> valueResult = new 
ArrayList<>(valueList.size());
+                    for (Object value : valueList) {
+                        Object transformedValue = 
pbData.buildFieldValue(lastNode.getFieldDesc(), value);
+                        valueResult.add(transformedValue);
+                    }
+                    GenericArrayData arrayItem = new 
GenericArrayData(valueResult.toArray());
+                    result.setField(index++, arrayItem);
+                } else {
+                    Object transformedValue = 
pbData.buildFieldValue(lastNode.getFieldDesc(), fieldValue);
+                    result.setField(index++, transformedValue);
+                }
+            } else if (parser instanceof ExtractBinaryFunction) {
+                ExtractBinaryFunction extractBinaryFunc = 
(ExtractBinaryFunction) parser;
+                extractBinaryFunc.setParentRoot(currentValue);
+                extractBinaryFunc.setParentDesc(currentDesc);
+                Object fieldValue = extractBinaryFunc.parse(pbData, rowIndex, 
context);
+                result.setField(index++, fieldValue);
+            } else {
+                result.setField(index++, null);
+            }
+        }
+        return result;
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
index ebec767375..7102b3c969 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
@@ -51,4 +51,11 @@ public class ColumnParser implements ValueParser {
         return sourceData.getField(rowIndex, fieldName);
     }
 
+    /**
+     * get fieldName
+     * @return the fieldName
+     */
+    public String getFieldName() {
+        return fieldName;
+    }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/TestFunctionDoc.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/TestFunctionDoc.java
index 53ca1a4979..4cb272a23d 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/TestFunctionDoc.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/TestFunctionDoc.java
@@ -32,7 +32,7 @@ public class TestFunctionDoc extends 
AbstractFunctionStringTestBase {
     @Test
     public void TestFunctionDoc() {
         Map<String, Set<FunctionInfo>> functionDocMap = 
FunctionTools.getFunctionDoc();
-        Assert.assertEquals(8, functionDocMap.size());
+        Assert.assertEquals(9, functionDocMap.size());
         System.out.println(new Gson().toJson(functionDocMap));
     }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
index a126f8d52d..e6dcb6e44e 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java
@@ -32,6 +32,7 @@ import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo;
 import org.apache.inlong.sdk.transform.pojo.TransformConfig;
 import org.apache.inlong.sdk.transform.process.TransformProcessor;
 
+import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -115,4 +116,140 @@ public class TestPb2RowDataProcessor extends 
AbstractProcessorTestBase {
         Assert.assertEquals(((GenericRowData) output.get(1).getRow(5, 
3)).getMap(2).size(), 1);
         Assert.assertEquals(output.get(1).getArray(6).size(), 2);
     }
+
+    @Test
+    public void testPb2RowData4Struct() throws Exception {
+        String transformBase64 = this.getPbTestDescription();
+        PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", "msgs");
+        List<FieldInfo> sinkFields = this.getTestFieldList("sid", "packageID", 
"msgTime");
+        // concat_struct
+        FieldInfo concatStructField = new FieldInfo("concatStruct");
+        String[] concatStructFields = new String[]{"attaID", "packageID"};
+        FormatInfo[] concatStructFieldFormats = new FormatInfo[]{
+                new StringFormatInfo(),
+                new LongFormatInfo()
+        };
+        RowFormatInfo concatStructFormat = new 
RowFormatInfo(concatStructFields, concatStructFieldFormats);
+        concatStructField.setFormatInfo(concatStructFormat);
+        sinkFields.add(concatStructField);
+        // extract_struct
+        FieldInfo extractStructField = new FieldInfo("extractStruct");
+        String[] extractStructFields = new String[]{"msg", "msgTime"};
+        FormatInfo[] extractStructFieldFormats = new FormatInfo[]{
+                new BinaryFormatInfo(Integer.MAX_VALUE),
+                new LongFormatInfo()
+        };
+        RowFormatInfo extractStructFormat = new 
RowFormatInfo(extractStructFields, extractStructFieldFormats);
+        extractStructField.setFormatInfo(extractStructFormat);
+        sinkFields.add(extractStructField);
+        // extract_binary_string
+        FieldInfo extractBinaryStringField = new 
FieldInfo("extractBinaryString");
+        extractBinaryStringField.setFormatInfo(new 
BinaryFormatInfo(Integer.MAX_VALUE));
+        sinkFields.add(extractBinaryStringField);
+        // extract_binary_array_binary
+        FieldInfo extractBinaryArrayBinaryField = new 
FieldInfo("extractBinaryArrayBinary");
+        ArrayFormatInfo extractBinaryArrayBinaryFormat = new 
ArrayFormatInfo(new BinaryFormatInfo(Integer.MAX_VALUE));
+        
extractBinaryArrayBinaryField.setFormatInfo(extractBinaryArrayBinaryFormat);
+        sinkFields.add(extractBinaryArrayBinaryField);
+        // extract_binary_map
+        FieldInfo extractBinaryMapField = new FieldInfo("extractBinaryMap");
+        extractBinaryMapField.setFormatInfo(new 
BinaryFormatInfo(Integer.MAX_VALUE));
+        sinkFields.add(extractBinaryMapField);
+        // sink
+        RowDataSinkInfo rowSink = new RowDataSinkInfo("UTF-8", sinkFields);
+        // sql
+        String transformSql = "select $root.sid,$root.packageID,$child.msgTime"
+                + ",concat_struct($root.sid,$root.packageID) as concatStruct"
+                + ",extract_struct($root.msgs(0),msg,msgTime) as extractStruct"
+                + ",extract_binary($root.sid) as extractBinaryString"
+                + ",extract_binary($root.msgs) as extractBinaryArrayBinary"
+                + ",extract_binary($child.extinfo) as extractBinaryMap "
+                + "from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1
+        TransformProcessor<String, RowData> processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
+                        SinkEncoderFactory.createRowEncoder(rowSink));
+        byte[] srcBytes = this.getPbTestData();
+        List<RowData> output = processor.transformForBytes(srcBytes, new 
HashMap<>());
+        Assert.assertEquals(2, output.size());
+        // 0
+        Assert.assertEquals(output.get(0).getString(0).toString(), "sid");
+        Assert.assertEquals(output.get(0).getString(1).toString(), "1");
+        Assert.assertEquals(output.get(0).getString(2).toString(), 
"1713243918000");
+
+        Assert.assertEquals(((GenericRowData) output.get(0).getRow(3, 
2)).getString(0).toString(), "sid");
+        Assert.assertEquals(((GenericRowData) output.get(0).getRow(3, 
2)).getLong(1), 1);
+
+        Assert.assertEquals(((GenericRowData) output.get(0).getRow(4, 
2)).getBinary(0).length, 9);
+        Assert.assertEquals(((GenericRowData) output.get(0).getRow(4, 
2)).getLong(1), 1713243918000L);
+
+        Assert.assertEquals(output.get(0).getBinary(5).length, 3);
+
+        Assert.assertEquals(((GenericArrayData) 
output.get(0).getArray(6)).size(), 2);
+        Assert.assertEquals(((GenericArrayData) 
output.get(0).getArray(6)).getBinary(0).length, 32);
+        Assert.assertEquals(((GenericArrayData) 
output.get(0).getArray(6)).getBinary(1).length, 35);
+
+        Assert.assertEquals(output.get(0).getBinary(7).length, 53);
+    }
+
+    @Test
+    public void testPb2RowData4ExtractStructExcluding() throws Exception {
+        String transformBase64 = this.getPbTestDescription();
+        PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", "msgs");
+        List<FieldInfo> sinkFields = this.getTestFieldList("sid", "packageID", 
"msgTime");
+        // extract_struct
+        FieldInfo extractStructExcludingField = new 
FieldInfo("extractStructExcluding");
+        String[] extractStructExcludingFields = new String[]{"msg", "msgTime", 
"extinfo"};
+        FormatInfo[] extractStructExcludingFieldFormats = new FormatInfo[]{
+                new BinaryFormatInfo(Integer.MAX_VALUE),
+                new LongFormatInfo(),
+                new MapFormatInfo(new StringFormatInfo(), new 
StringFormatInfo())
+        };
+        RowFormatInfo extractStructExcludingFormat = new 
RowFormatInfo(extractStructExcludingFields,
+                extractStructExcludingFieldFormats);
+        
extractStructExcludingField.setFormatInfo(extractStructExcludingFormat);
+        sinkFields.add(extractStructExcludingField);
+        // rootBinary
+        FieldInfo rootBinary = new FieldInfo("rootBinary");
+        rootBinary.setFormatInfo(new BinaryFormatInfo(Integer.MAX_VALUE));
+        sinkFields.add(rootBinary);
+        // extractStructExcludingBinary
+        FieldInfo extractStructExcludingBinary = new 
FieldInfo("extractStructExcludingBinary");
+        extractStructExcludingBinary.setFormatInfo(new 
BinaryFormatInfo(Integer.MAX_VALUE));
+        sinkFields.add(extractStructExcludingBinary);
+        // extractStructBinary
+        FieldInfo extractStructBinary = new FieldInfo("extractStructBinary");
+        extractStructBinary.setFormatInfo(new 
BinaryFormatInfo(Integer.MAX_VALUE));
+        sinkFields.add(extractStructBinary);
+        // sink
+        RowDataSinkInfo rowSink = new RowDataSinkInfo("UTF-8", sinkFields);
+        // sql
+        String transformSql = "select $root.sid,$root.packageID,$child.msgTime"
+                + ",extract_struct_excluding($root.msgs(0),msg,extinfo) as 
extractStructExcluding "
+                + ",extract_binary($root) as rootBinary "
+                + 
",extract_binary(extract_struct_excluding($root.msgs(0),msg,extinfo)) as 
extractStructExcludingBinary "
+                + ",extract_binary(extract_struct($root.msgs(0),msg,msgTime)) 
as extractStructBinary "
+                + " from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1
+        TransformProcessor<String, RowData> processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
+                        SinkEncoderFactory.createRowEncoder(rowSink));
+        byte[] srcBytes = this.getPbTestData();
+        List<RowData> output = processor.transformForBytes(srcBytes, new 
HashMap<>());
+        Assert.assertEquals(2, output.size());
+        // 0
+        Assert.assertEquals(output.get(0).getString(0).toString(), "sid");
+        Assert.assertEquals(output.get(0).getString(1).toString(), "1");
+        Assert.assertEquals(output.get(0).getString(2).toString(), 
"1713243918000");
+
+        Assert.assertEquals(((GenericRowData) output.get(0).getRow(3, 
3)).getBinary(0).length, 0);
+        Assert.assertEquals(((GenericRowData) output.get(0).getRow(3, 
3)).getLong(1), 1713243918000L);
+        Assert.assertEquals(((GenericRowData) output.get(0).getRow(3, 
3)).getMap(2).size(), 0);
+
+        Assert.assertEquals(output.get(0).getBinary(4).length, 78);
+        Assert.assertEquals(output.get(0).getBinary(5).length, 7);
+        Assert.assertEquals(output.get(0).getBinary(6).length, 60);
+    }
 }


Reply via email to