[ 
https://issues.apache.org/jira/browse/FLINK-33963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17812115#comment-17812115
 ] 

xuyang commented on FLINK-33963:
--------------------------------

From my personal perspective, adding class member variables to scalar functions is a rather uncommon pattern. If the optimizer were instructed to never reuse UDX objects under any circumstances, it would incur additional memory overhead as well as serialization/deserialization costs.

A feasible short-term approach would be to add a similar example to our documentation, guiding advanced UDX developers on how to prevent UDX reuse when they need to (as in the example in this issue: override getTypeInference and assign a different value to a class member variable per call site).
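
For reference, here is a minimal sketch of what such a documentation example could look like. The class and field names are illustrative only (not part of this issue); the pattern mirrors the UdfSerializeFunc2 example quoted below.

{code:java}
import java.util.Optional;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;

/** Illustrative UDF whose instances are not reused across call sites. */
public class NonReusableTagFunc extends ScalarFunction {

    // Per-call-site state: getTypeInference assigns a different value for
    // udf(x, 'a') and udf(x, 'b'), so the two serialized function instances
    // differ and the planner cannot collapse them into a single object.
    private String tag;

    public String eval(String value, String literalTag) {
        return value + "-" + tag;
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                .outputTypeStrategy(callContext -> {
                    if (callContext.getArgumentDataTypes().size() != 2
                            || !callContext.isArgumentLiteral(1)
                            || callContext.isArgumentNull(1)) {
                        throw callContext.newValidationError(
                                "Literal expected for the second argument.");
                    }
                    // Copying the literal into a member field makes this
                    // instance unique to its call site.
                    tag = callContext.getArgumentValue(1, String.class).get();
                    return Optional.of(DataTypes.STRING());
                })
                .build();
    }
}
{code}

With such a function, udf(name, 'a') and udf(name, 'b') in the same query are backed by two distinct deserialized objects rather than one shared instance.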

In the long term, we could treat this Jira as an improvement item: enable codegen to prevent UDX reuse based on the RexCall, or expose this through an API.

WDYT?

> There is only one UDF instance after serializing the same task
> --------------------------------------------------------------
>
>                 Key: FLINK-33963
>                 URL: https://issues.apache.org/jira/browse/FLINK-33963
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.0
>         Environment: local env in idea test.
> java 8
>            Reporter: lifengchao
>            Priority: Major
>
> I defined this UDF and expected the following SQL to return 'a', 'b', but it returns 'a', 'a'.
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
>     String cache;
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>     public String eval(String a, String b){
>         if(cache == null){
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> }
> {code}
> sql:
> {code:sql}
>     select
>         udf_ser(name, 'a') name1,
>         udf_ser(name, 'b') name2
>     from heros
> {code}
> Changing the UDF to the following achieves the expected result.
> {code:java}
> public class UdfSerializeFunc2 extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
>     String cache;
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>     public String eval(String a, String b){
>         if(cache == null){
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
>     @Override
>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>         return TypeInference.newBuilder()
>                 .outputTypeStrategy(new TypeStrategy() {
>                     @Override
>                     public Optional<DataType> inferType(CallContext callContext) {
>                         List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
>                         if (argumentDataTypes.size() != 2) {
>                             throw callContext.newValidationError("arg size error");
>                         }
>                         if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
>                             throw callContext.newValidationError("Literal expected for second argument.");
>                         }
>                         cache = callContext.getArgumentValue(1, String.class).get();
>                         return Optional.of(DataTypes.STRING());
>                     }
>                 })
>         .build();
>     }
> }
> {code}
>  
> My complete test code:
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
>     String cache;
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>     public String eval(String a, String b){
>         if(cache == null){
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> }
> public class UdfSerializeFunc2 extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
>     String cache;
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>     public String eval(String a, String b){
>         if(cache == null){
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
>     @Override
>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>         return TypeInference.newBuilder()
>                 .outputTypeStrategy(new TypeStrategy() {
>                     @Override
>                     public Optional<DataType> inferType(CallContext callContext) {
>                         List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
>                         if (argumentDataTypes.size() != 2) {
>                             throw callContext.newValidationError("arg size error");
>                         }
>                         if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
>                             throw callContext.newValidationError("Literal expected for second argument.");
>                         }
>                         cache = callContext.getArgumentValue(1, String.class).get();
>                         return Optional.of(DataTypes.STRING());
>                     }
>                 })
>         .build();
>     }
> }
> class UdfSerializeSuite extends AnyFunSuite with BeforeAndAfterAll{
>   var env: StreamExecutionEnvironment = _
>   var tEnv: StreamTableEnvironment = _
>   override protected def beforeAll(): Unit = {
>     val conf = new Configuration()
>     env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
>     env.setParallelism(2)
>     env.getConfig.enableObjectReuse()
>     tEnv = StreamTableEnvironment.create(env)
>   }
>   /**
>    * There are 2 tasks, but each task ends up with only one UDF instance; udf_ser(name, 'a') and udf_ser(name, 'b') are not distinguished.
>    * After the function is serialized, the two udf_ser call sites within a single task are still the same object, not two.
>    * Modifying a UDF field in getTypeInference yields two different objects.
>    */
>   test("UdfSerializeFunc"){
>     tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc])
>     var sql = """
>     CREATE TEMPORARY TABLE heros (
>       `name` STRING,
>       `power` STRING,
>       `age` INT
>     ) WITH (
>       'connector' = 'faker',
>       'fields.name.expression' = '#{superhero.name}',
>       'fields.power.expression' = '#{superhero.power}',
>       'fields.power.null-rate' = '0.05',
>       'rows-per-second' = '1',
>       'fields.age.expression' = '#{number.numberBetween ''0'',''1000''}'
>     )
>     """
>     tEnv.executeSql(sql)
>     sql = """
>     select
>         udf_ser(name, 'a') name1,
>         udf_ser(name, 'b') name2
>     from heros
>     """
>     val rstTable = tEnv.sqlQuery(sql)
>     rstTable.printSchema()
>     rstTable.execute().print()
>   }
>   /**
>    * Modifying a field of the ScalarFunction makes the serialized instances different objects.
>    */
>   test("UdfSerializeFunc2"){
>     tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc2])
>     var sql = """
>     CREATE TEMPORARY TABLE heros (
>       `name` STRING,
>       `power` STRING,
>       `age` INT
>     ) WITH (
>       'connector' = 'faker',
>       'fields.name.expression' = '#{superhero.name}',
>       'fields.power.expression' = '#{superhero.power}',
>       'fields.power.null-rate' = '0.05',
>       'rows-per-second' = '1',
>       'fields.age.expression' = '#{number.numberBetween ''0'',''1000''}'
>     )
>     """
>     tEnv.executeSql(sql)
>     sql = """
>     select
>         udf_ser(name, 'a') name1,
>         udf_ser(name, 'b') name2
>     from heros
>     """
>     val rstTable = tEnv.sqlQuery(sql)
>     rstTable.printSchema()
>     rstTable.execute().print()
>   }
>   override protected def afterAll(): Unit = {
>     env.execute()
>   }
> }
> {code}
> Test UdfSerializeFunc log output:
> {code:java}
> (
>   `name1` STRING,
>   `name2` STRING
> )
> 14:44:41,183  DEBUG org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ [ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator Code:
> StreamExecCalc
> 14:44:42,819  WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping notifications
> 14:44:43,092  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
> 14:44:43,240  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 'log.file' is not set.
> 14:44:43,240  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
> 14:44:43,711  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at localhost:8081
> 14:44:43,716  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at http://localhost:8081.
> 14:44:43,717  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership with leaderSessionID=18ab2e30-a83a-4ec0-be98-7d49b7628565
> 14:44:43,789  WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping notifications
> 14:44:43,790  WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date
> 14:44:44,576  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:969139468.
> 14:44:44,576  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:1737783673.
> 14:44:44,607  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - cache_null.cache:a
> 14:44:44,607  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - cache_null.cache:a
> +----+--------------------------------+--------------------------------+
> | op |                          name1 |                          name2 |
> +----+--------------------------------+--------------------------------+
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> {code}
> Test UdfSerializeFunc2 log output:
> {code:java}
> (
>   `name1` STRING,
>   `name2` STRING
> )
> 14:45:18,786  DEBUG org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ [ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator Code:
> StreamExecCalc
> 14:45:20,296  WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping notifications
> 14:45:20,518  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
> 14:45:20,635  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 'log.file' is not set.
> 14:45:20,635  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
> 14:45:21,032  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at localhost:8081
> 14:45:21,034  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at http://localhost:8081.
> 14:45:21,035  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership with leaderSessionID=2fcfdba0-0e36-4e8b-9f3c-88f2c242458f
> 14:45:21,089  WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping notifications
> 14:45:21,089  WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date
> 14:45:21,741  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1439144392.
> 14:45:21,741  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:381953409.
> 14:45:21,742  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1162638327.
> 14:45:21,742  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:391248806.
> +----+--------------------------------+--------------------------------+
> | op |                          name1 |                          name2 |
> +----+--------------------------------+--------------------------------+
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> {code}
>  
> *This is an issue caused by UDF instance serialization: within a single task, both udf_ser call sites end up backed by the same deserialized object.*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
