[ https://issues.apache.org/jira/browse/FLINK-33963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17811845#comment-17811845 ]

xuyang commented on FLINK-33963:
--------------------------------

Hi, [~lifengchao].

This is because, in the current implementation, whether two UDX instances can 
be reused is determined by comparing the byte arrays produced by serializing 
the UDX instances. In `UdfSerializeFunc`, the `cache` field is still null 
during the codegen phase, so the two POJOs serialize to identical bytes and 
codegen generates a single shared variable for both calls. In 
`UdfSerializeFunc2`, getTypeInference is called before codegen and mutates 
`cache`, so the two instances serialize to different bytes and codegen 
maintains two separate variables.
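
To illustrate, here is a minimal standalone sketch (plain Java serialization 
stands in for the planner's UDX serialization; the class and method names are 
illustrative, not planner code) of why bytes-keyed deduplication collapses the 
two calls into one instance:

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

public class ReuseByBytesSketch {
    // Stand-in for UdfSerializeFunc: `cache` is still null during codegen.
    static class Udf implements Serializable {
        String cache;
    }

    static byte[] serialize(Serializable o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        Udf forCallA = new Udf(); // conceptually udf_ser(name, 'a')
        Udf forCallB = new Udf(); // conceptually udf_ser(name, 'b')
        // Identical bytes, so a bytes-keyed cache hands both calls one shared instance.
        System.out.println(Arrays.equals(serialize(forCallA), serialize(forCallB))); // prints true
    }
}
{code}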

To solve this issue, I think it is unreasonable to decide whether UDX POJOs 
can be reused based on whether their serialized forms are identical. Perhaps 
we could instead determine reuse based on the digest of the RexCall. cc [~lsy] 
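
A rough sketch of that alternative (the class, method, and digest strings here 
are hypothetical, not existing planner APIs): keep one deserialized instance 
per RexCall digest, so two calls with different digests get two instances even 
when their serialized bytes are identical:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch: reuse is keyed on the RexCall digest instead of on
// the serialized bytes of the UDX instance.
public class ReuseByDigestSketch {
    private final Map<String, Object> instanceByDigest = new HashMap<>();

    // Distinct digests, e.g. "udf_ser($0, 'a')" vs "udf_ser($0, 'b')",
    // yield distinct instances even if their serialized forms are identical.
    public Object instanceFor(String callDigest, Supplier<Object> freshInstance) {
        return instanceByDigest.computeIfAbsent(callDigest, d -> freshInstance.get());
    }

    public static void main(String[] args) {
        ReuseByDigestSketch cache = new ReuseByDigestSketch();
        Object a = cache.instanceFor("udf_ser($0, 'a')", Object::new);
        Object b = cache.instanceFor("udf_ser($0, 'b')", Object::new);
        System.out.println(a == b); // prints false: two calls, two instances
    }
}
{code}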

> There is only one UDF instance after serializing the same task
> --------------------------------------------------------------
>
>                 Key: FLINK-33963
>                 URL: https://issues.apache.org/jira/browse/FLINK-33963
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.0
>         Environment: local env in idea test.
> java 8
>            Reporter: lifengchao
>            Priority: Major
>
> I define this UDF and expect the following SQL to return 'a', 'b', but it
> returns 'a', 'a'.
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
>     String cache;
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>     public String eval(String a, String b){
>         if(cache == null){
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> }
> {code}
> SQL:
> {code:sql}
>     select
>         udf_ser(name, 'a') name1,
>         udf_ser(name, 'b') name2
>     from heros
> {code}
> Changing the UDF to the following achieves the expected result.
> {code:java}
> public class UdfSerializeFunc2 extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
>     String cache;
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>     public String eval(String a, String b){
>         if(cache == null){
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
>     @Override
>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>         return TypeInference.newBuilder()
>                 .outputTypeStrategy(new TypeStrategy() {
>                     @Override
>                     public Optional<DataType> inferType(CallContext callContext) {
>                         List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
>                         if (argumentDataTypes.size() != 2) {
>                             throw callContext.newValidationError("arg size error");
>                         }
>                         if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
>                             throw callContext.newValidationError("Literal expected for second argument.");
>                         }
>                         cache = callContext.getArgumentValue(1, String.class).get();
>                         return Optional.of(DataTypes.STRING());
>                     }
>                 })
>         .build();
>     }
> }
> {code}
>  
> My complete test code:
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
>     String cache;
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>     public String eval(String a, String b){
>         if(cache == null){
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> }
> public class UdfSerializeFunc2 extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
>     String cache;
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>     public String eval(String a, String b){
>         if(cache == null){
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
>     @Override
>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>         return TypeInference.newBuilder()
>                 .outputTypeStrategy(new TypeStrategy() {
>                     @Override
>                     public Optional<DataType> inferType(CallContext callContext) {
>                         List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
>                         if (argumentDataTypes.size() != 2) {
>                             throw callContext.newValidationError("arg size error");
>                         }
>                         if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
>                             throw callContext.newValidationError("Literal expected for second argument.");
>                         }
>                         cache = callContext.getArgumentValue(1, String.class).get();
>                         return Optional.of(DataTypes.STRING());
>                     }
>                 })
>         .build();
>     }
> }
> class UdfSerializeSuite extends AnyFunSuite with BeforeAndAfterAll{
>   var env: StreamExecutionEnvironment = _
>   var tEnv: StreamTableEnvironment = _
>   override protected def beforeAll(): Unit = {
>     val conf = new Configuration()
>     env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
>     env.setParallelism(2)
>     env.getConfig.enableObjectReuse()
>     tEnv = StreamTableEnvironment.create(env)
>   }
>   /**
>    * 2 tasks, but each task holds only one UDF instance: udf_ser(name, 'a') and udf_ser(name, 'b') are not distinguished.
>    * Because of how the function is serialized, the two udf_ser calls in a single task deserialize to the same object, not two.
>    * Mutating a UDF field in getTypeInference yields two distinct objects.
>    */
>   test("UdfSerializeFunc"){
>     tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc])
>     var sql = """
>     CREATE TEMPORARY TABLE heros (
>       `name` STRING,
>       `power` STRING,
>       `age` INT
>     ) WITH (
>       'connector' = 'faker',
>       'fields.name.expression' = '#{superhero.name}',
>       'fields.power.expression' = '#{superhero.power}',
>       'fields.power.null-rate' = '0.05',
>       'rows-per-second' = '1',
>       'fields.age.expression' = '#{number.numberBetween ''0'',''1000''}'
>     )
>     """
>     tEnv.executeSql(sql)
>     sql = """
>     select
>         udf_ser(name, 'a') name1,
>         udf_ser(name, 'b') name2
>     from heros
>     """
>     val rstTable = tEnv.sqlQuery(sql)
>     rstTable.printSchema()
>     rstTable.execute().print()
>   }
>   /**
>    * Mutating a ScalarFunction field makes the instances deserialize to distinct objects.
>    */
>   test("UdfSerializeFunc2"){
>     tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc2])
>     var sql = """
>     CREATE TEMPORARY TABLE heros (
>       `name` STRING,
>       `power` STRING,
>       `age` INT
>     ) WITH (
>       'connector' = 'faker',
>       'fields.name.expression' = '#{superhero.name}',
>       'fields.power.expression' = '#{superhero.power}',
>       'fields.power.null-rate' = '0.05',
>       'rows-per-second' = '1',
>       'fields.age.expression' = '#{number.numberBetween ''0'',''1000''}'
>     )
>     """
>     tEnv.executeSql(sql)
>     sql = """
>     select
>         udf_ser(name, 'a') name1,
>         udf_ser(name, 'b') name2
>     from heros
>     """
>     val rstTable = tEnv.sqlQuery(sql)
>     rstTable.printSchema()
>     rstTable.execute().print()
>   }
>   override protected def afterAll(): Unit = {
>     env.execute()
>   }
> }
> {code}
> test UdfSerializeFunc log output:
> {code}
> (
>   `name1` STRING,
>   `name2` STRING
> )
> 14:44:41,183  DEBUG org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ [ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator Code:
> StreamExecCalc
> 14:44:42,819  WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping notifications
> 14:44:43,092  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
> 14:44:43,240  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 'log.file' is not set.
> 14:44:43,240  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
> 14:44:43,711  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at localhost:8081
> 14:44:43,716  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at http://localhost:8081.
> 14:44:43,717  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership with leaderSessionID=18ab2e30-a83a-4ec0-be98-7d49b7628565
> 14:44:43,789  WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping notifications
> 14:44:43,790  WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date
> 14:44:44,576  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:969139468.
> 14:44:44,576  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:1737783673.
> 14:44:44,607  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - cache_null.cache:a
> 14:44:44,607  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - cache_null.cache:a
> +----+--------------------------------+--------------------------------+
> | op |                          name1 |                          name2 |
> +----+--------------------------------+--------------------------------+
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> {code}
> test UdfSerializeFunc2 log output:
> {code}
> (
>   `name1` STRING,
>   `name2` STRING
> )
> 14:45:18,786  DEBUG org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ [ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator Code:
> StreamExecCalc
> 14:45:20,296  WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping notifications
> 14:45:20,518  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
> 14:45:20,635  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 'log.file' is not set.
> 14:45:20,635  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
> 14:45:21,032  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at localhost:8081
> 14:45:21,034  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at http://localhost:8081.
> 14:45:21,035  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership with leaderSessionID=2fcfdba0-0e36-4e8b-9f3c-88f2c242458f
> 14:45:21,089  WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping notifications
> 14:45:21,089  WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date
> 14:45:21,741  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1439144392.
> 14:45:21,741  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:381953409.
> 14:45:21,742  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1162638327.
> 14:45:21,742  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:391248806.
> +----+--------------------------------+--------------------------------+
> | op |                          name1 |                          name2 |
> +----+--------------------------------+--------------------------------+
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> {code}
>  
> *This is an issue caused by UDF serialization.*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
