chen0623-bak commented on issue #4939:
URL: https://github.com/apache/seatunnel/issues/4939#issuecomment-1624706017

   @ocean-zhc Here is my idea.
   This is my configuration, which I have already tested:
   ```
   env {
     execution.parallelism = 1
     job.mode = "BATCH"
   }
   source {
       FakeSource {
         result_table_name="t1"
         schema = {
           fields {
             c_int = "int"
             c_string = "string"
             c_array = "array<string>"
             c_map = "map<string,string>"
             c_row {
               c_int = "int"
               c_string = "string"
             }
           }
         }
         rows = [
           {
             kind = INSERT
             fields = [1, "zs", ["can","tiao","rap"], {"name":"zs","age":"18"}, [1,"shanghai"]]
           },
           {
             kind = INSERT
             fields = [2, "ls", ["can","tiao","rap"], {"name":"ls","age":"19"}, [2,"beijing"]]
           },
           {
             kind = INSERT
             fields = [3, "ww", ["can","tiao","rap"], {"name":"ww","age":"20"}, [1,"shenzheng"]]
           },
         ]
       }
   }
   transform {
     Sql {
       source_table_name = "t1"
       result_table_name = "t2"
       # Enable one query at a time:
       # query = "select get_item(c_row, 1) from t1"
       # query = "select get_item(c_map, 'name') from t1"
       # query = "select get_item(c_array, 0) from t1"
     }
   }
   sink {
     Console {
       source_table_name = "t2"
     }
   }
   ```
   
   
   This is my custom UDF:
   
   ```java
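   import com.google.auto.service.AutoService;
   import lombok.SneakyThrows;
   import org.apache.seatunnel.api.table.type.BasicType;
   import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
   import org.apache.seatunnel.api.table.type.SeaTunnelRow;
   import org.apache.seatunnel.transform.sql.zeta.ZetaUDF;

   import java.util.List;
   import java.util.Map;

   @AutoService(ZetaUDF.class)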
   public class GetItemUDF implements ZetaUDF {
   
       @Override
       public String functionName() {
           return "GET_ITEM";
       }
   
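       /**
        * evaluate is called before resultType, so the argument types are not
        * available while evaluate runs. Otherwise a row field could also be
        * looked up by key; for now a row value can only be fetched by index.
        *
        * @param argsType the data types of the function arguments
        * @return the result type, always STRING
        */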
       @Override
       public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> argsType) {
           return BasicType.STRING_TYPE;
       }
   
       @SneakyThrows(RuntimeException.class)
       @Override
       public Object evaluate(List<Object> args) {
           Object data = args.get(0);
           Object param = args.get(1);
   
           if (data instanceof Map) {
               return ((Map<?, ?>) data).get(param).toString();
           } else if (data instanceof Object[]) {
               int index = Integer.parseInt(param.toString());
               return ((Object[]) data)[index].toString();
           } else if (data instanceof SeaTunnelRow) {
               int index = Integer.parseInt(param.toString());
               return ((SeaTunnelRow) data).getField(index).toString();
           }
           throw new RuntimeException("this get_item function only supports [array,row,map] types ...");
       }
   }
   ```
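
To try the dispatch logic without a SeaTunnel job, here is a minimal standalone sketch. It is not part of the UDF above: `GetItemSketch` and its nested `Row` class are hypothetical names, and `Row` is only a stand-in for `SeaTunnelRow` that exposes fields by index. Unlike the UDF, the sketch returns `null` for a missing map key or a null element rather than failing on `toString()`, which is an assumption about the desired behavior.

```java
import java.util.Map;

public class GetItemSketch {

    /** Hypothetical stand-in for SeaTunnelRow: fields are addressed by position only. */
    public static final class Row {
        private final Object[] fields;

        public Row(Object... fields) {
            this.fields = fields;
        }

        public Object getField(int index) {
            return fields[index];
        }
    }

    /**
     * Same type dispatch as the UDF's evaluate method (map by key, array and
     * row by index), but a missing or null item yields null instead of an NPE.
     */
    public static String getItem(Object data, Object param) {
        Object item;
        if (data instanceof Map) {
            item = ((Map<?, ?>) data).get(param);
        } else if (data instanceof Object[]) {
            item = ((Object[]) data)[Integer.parseInt(param.toString())];
        } else if (data instanceof Row) {
            item = ((Row) data).getField(Integer.parseInt(param.toString()));
        } else {
            throw new IllegalArgumentException(
                    "get_item only supports [array, row, map] types");
        }
        return item == null ? null : item.toString();
    }

    public static void main(String[] args) {
        // Values taken from the first FakeSource row in the config above.
        Object[] cArray = {"can", "tiao", "rap"};
        Map<String, String> cMap = Map.of("name", "zs", "age", "18");
        Row cRow = new Row(1, "shanghai");

        System.out.println(getItem(cArray, 0));    // can
        System.out.println(getItem(cMap, "name")); // zs
        System.out.println(getItem(cRow, 1));      // shanghai
    }
}
```

Whether the real UDF should return `null` or throw on a missing key is a design choice; the sketch only shows one option.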
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.