chen0623-bak opened a new issue, #5040: URL: https://github.com/apache/seatunnel/issues/5040
### Search before asking

- [X] I had searched in the [feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.

### Description

I want to define a custom function that retrieves a field from a value of a given type by key or index. However, the `resultType` method has no access to the argument values passed to the UDF, so I cannot handle nested row types properly.

Currently the function can only handle:

1. array -> returns string
2. row -> returns string
3. map -> can return the nested value

Here is my configuration:

```
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    result_table_name = "t1"
    schema = {
      fields {
        c_int = "int"
        c_string = "string"
        c_array = "array<string>"
        c_map = "map<string,map<string,string>>"
        c_row {
          c_int = "int"
          c_string = "string"
        }
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "zs", ["can","tiao","rap"],{"student":{"name":"zhangsan","age":"18"}},[1,"shanghai"]]
      },
      {
        kind = INSERT
        fields = [2, "ls", ["can","tiao","rap"],{"student":{"name":"lisi","age":"18"}},[2,"beijing"]]
      },
      {
        kind = INSERT
        fields = [3, "ww", ["can","tiao","rap"],{"student":{"name":"wangwu","age":"18"}},[3,"shenzheng"]]
      },
    ]
  }
}

transform {
  Sql {
    source_table_name = "t1"
    result_table_name = "t2"
    # get a nested map item
    query = "select get_item(c_map,'student') from t1"
    # get an array item
    query = "select get_item(c_array,2) from t1"
    # get a row item; can only return string
    query = "select get_item(c_row,'c_string') from t1"
  }
}

sink {
  Console {
    source_table_name = "t2"
  }
}
```

Here is my UDF:

```java
import com.google.auto.service.AutoService;
import lombok.SneakyThrows;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.transform.sql.zeta.ZetaUDF;

import java.util.List;
import java.util.Map;

@AutoService(ZetaUDF.class)
public class GetItemUDF implements ZetaUDF {

    // Type of the first argument, captured in resultType() and reused in evaluate()
    private static SeaTunnelDataType<?> type;

    @Override
    public String functionName() {
        return "GET_ITEM";
    }

    @Override
    public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> argsType) {
        type = argsType.get(0);
        if (type instanceof ArrayType) {
            // array only supports basic types, so always return string
            return BasicType.STRING_TYPE;
        } else if (type instanceof MapType) {
            // for a map, return the value type directly
            return ((MapType<?, ?>) type).getValueType();
        } else if (type instanceof SeaTunnelRowType) {
            // for a row, the type of the selected field should be returned
            /**
             * TODO: the argument passed to the UDF is not available here, so only string can be returned
             * SeaTunnelRowType seaTunnelRowType = ((SeaTunnelRowType) type);
             * int fieldNameIndex = seaTunnelRowType.indexOf(param.toString());
             * return seaTunnelRowType.getFieldType(fieldNameIndex);
             */
            return BasicType.STRING_TYPE;
        }
        return BasicType.STRING_TYPE;
    }

    @SneakyThrows(RuntimeException.class)
    @Override
    public Object evaluate(List<Object> args) {
        if (args.size() != 2) {
            throw new RuntimeException("this get_item function needs 2 arguments but got " + args.size());
        }
        Object data = args.get(0);
        Object param = args.get(1);
        if (type instanceof ArrayType) {
            int idx = Integer.parseInt(param.toString());
            return ((Object[]) data)[idx].toString();
        } else if (type instanceof MapType) {
            return ((Map<?, ?>) data).get(param);
        } else if (type instanceof SeaTunnelRowType) {
            int fieldNameIndex = ((SeaTunnelRowType) type).indexOf(param.toString());
            return ((SeaTunnelRow) data).getField(fieldNameIndex);
        }
        throw new RuntimeException("this get_item function only supports [array,row,map] types ...");
    }
}
```

### Usage Scenario

_No response_

### Related issues

_No response_

### Are you willing to submit a PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
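To make the request in the Description concrete, here is a minimal, self-contained sketch of the behavior being asked for. It does not use SeaTunnel's API: `Type`, `Basic`, `ArrayOf`, `MapOf`, `RowOf`, and the two-argument `resultType(argTypes, constantArgs)` are all hypothetical stand-ins, and the assumption is that the engine could hand constant (literal) argument values to the type-resolution step alongside the argument types.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GetItemTypeSketch {

    // Toy stand-ins for a type system; these are NOT SeaTunnel classes.
    interface Type {}

    enum Basic implements Type { INT, STRING }

    static final class ArrayOf implements Type {
        final Type element;
        ArrayOf(Type element) { this.element = element; }
    }

    static final class MapOf implements Type {
        final Type key;
        final Type value;
        MapOf(Type key, Type value) { this.key = key; this.value = value; }
    }

    static final class RowOf implements Type {
        final Map<String, Type> fields;
        RowOf(Map<String, Type> fields) { this.fields = fields; }
    }

    /**
     * Hypothetical type resolution for get_item(container, selector).
     * constantArgs holds the literal value of each argument, or null when
     * the argument is not a constant (for example, a column reference).
     */
    static Type resultType(List<Type> argTypes, List<Object> constantArgs) {
        Type container = argTypes.get(0);
        if (container instanceof ArrayOf) {
            return ((ArrayOf) container).element;
        }
        if (container instanceof MapOf) {
            return ((MapOf) container).value;
        }
        if (container instanceof RowOf) {
            // The field type depends on WHICH field is selected, so the
            // selector's value is needed, not just its type.
            Object selector = constantArgs.get(1);
            if (selector == null) {
                throw new IllegalArgumentException("row field selector must be a constant");
            }
            Type fieldType = ((RowOf) container).fields.get(selector.toString());
            if (fieldType == null) {
                throw new IllegalArgumentException("unknown row field: " + selector);
            }
            return fieldType;
        }
        throw new IllegalArgumentException("get_item supports only array, map and row");
    }

    public static void main(String[] args) {
        // Mirrors the c_row column from the configuration in the issue.
        Map<String, Type> fields = new LinkedHashMap<>();
        fields.put("c_int", Basic.INT);
        fields.put("c_string", Basic.STRING);
        Type cRow = new RowOf(fields);

        Type resolved = resultType(
                Arrays.asList(cRow, Basic.STRING),
                Arrays.<Object>asList(null, "c_int"));
        System.out.println(resolved); // prints INT
    }
}
```

With only the argument types available, the row branch cannot tell `c_int` from `c_string`, which is why the UDF above falls back to `BasicType.STRING_TYPE`.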
