[ https://issues.apache.org/jira/browse/FLINK-33963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814014#comment-17814014 ]

xuyang edited comment on FLINK-33963 at 2/4/24 2:34 AM:
--------------------------------------------------------

I agree that we should be more rigorous when deciding whether a scalar 
function instance should be reused.

However, in the scenario you mentioned, if the path can be extracted once and 
reused from then on, that implies the path is constant across all elements. In 
such a case, using two scalar functions might not be the best choice. That's 
why I said this is not an uncommon scenario.
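
For illustration only, a hypothetical reuse-safe shape for such a function 
would key its cache by the literal argument instead of using a single mutable 
field, so even one shared instance returns the right value for each call site. 
The class and field names below are made up, not from this issue:

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical sketch: cache per literal argument rather than in one mutable
// field, so one shared instance can serve udf(name, 'a') and udf(name, 'b').
public class ReuseSafePathFunc extends ScalarFunction {
    // ConcurrentHashMap is Serializable, so the function stays serializable.
    private final Map<String, String> cacheByLiteral = new ConcurrentHashMap<>();

    public String eval(String value, String literal) {
        // computeIfAbsent stands in for an expensive one-time extraction,
        // mirroring the "extract the path once" scenario discussed above.
        return cacheByLiteral.computeIfAbsent(literal, l -> l);
    }
}
{code}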



> There is only one UDF instance after serializing the same task
> --------------------------------------------------------------
>
>                 Key: FLINK-33963
>                 URL: https://issues.apache.org/jira/browse/FLINK-33963
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.0
>         Environment: local environment in an IntelliJ IDEA test.
> Java 8
>            Reporter: lifengchao
>            Priority: Major
>
> I defined this UDF and expected the following SQL to return 'a', 'b', but it 
> returns 'a', 'a'.
> {code:java}
> import org.apache.flink.table.functions.FunctionContext;
> import org.apache.flink.table.functions.ScalarFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class UdfSerializeFunc extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
>     // Mutable instance state: remembers the first literal it sees.
>     String cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> }
> {code}
> SQL:
> {code:sql}
>     select
>         udf_ser(name, 'a') name1,
>         udf_ser(name, 'b') name2
>     from heros
> {code}
> Changing the UDF to the following achieves the expected result: mutating a 
> field of the UDF inside getTypeInference yields two distinct objects after 
> serialization.
> {code:java}
> import java.util.List;
> import java.util.Optional;
>
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.catalog.DataTypeFactory;
> import org.apache.flink.table.functions.FunctionContext;
> import org.apache.flink.table.functions.ScalarFunction;
> import org.apache.flink.table.types.DataType;
> import org.apache.flink.table.types.inference.CallContext;
> import org.apache.flink.table.types.inference.TypeInference;
> import org.apache.flink.table.types.inference.TypeStrategy;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class UdfSerializeFunc2 extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
>     String cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
>
>     @Override
>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>         return TypeInference.newBuilder()
>                 .outputTypeStrategy(new TypeStrategy() {
>                     @Override
>                     public Optional<DataType> inferType(CallContext callContext) {
>                         List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
>                         if (argumentDataTypes.size() != 2) {
>                             throw callContext.newValidationError("arg size error");
>                         }
>                         if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
>                             throw callContext.newValidationError("Literal expected for second argument.");
>                         }
>                         // Mutating instance state during planning makes the two
>                         // call-site instances differ before serialization.
>                         cache = callContext.getArgumentValue(1, String.class).get();
>                         return Optional.of(DataTypes.STRING());
>                     }
>                 })
>                 .build();
>     }
> }
> {code}
>  
> My complete test code:
> {code:java}
> // UdfSerializeFunc.java (imports as in the first snippet above)
> public class UdfSerializeFunc extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
>     String cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> }
>
> // UdfSerializeFunc2.java (imports as in the second snippet above)
> public class UdfSerializeFunc2 extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
>     String cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
>
>     @Override
>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>         return TypeInference.newBuilder()
>                 .outputTypeStrategy(new TypeStrategy() {
>                     @Override
>                     public Optional<DataType> inferType(CallContext callContext) {
>                         List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
>                         if (argumentDataTypes.size() != 2) {
>                             throw callContext.newValidationError("arg size error");
>                         }
>                         if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
>                             throw callContext.newValidationError("Literal expected for second argument.");
>                         }
>                         cache = callContext.getArgumentValue(1, String.class).get();
>                         return Optional.of(DataTypes.STRING());
>                     }
>                 })
>                 .build();
>     }
> }
>
> // UdfSerializeSuite.scala (test harness; the source table uses the flink-faker connector)
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.scalatest.BeforeAndAfterAll
> import org.scalatest.funsuite.AnyFunSuite
>
> class UdfSerializeSuite extends AnyFunSuite with BeforeAndAfterAll {
>   var env: StreamExecutionEnvironment = _
>   var tEnv: StreamTableEnvironment = _
>
>   override protected def beforeAll(): Unit = {
>     val conf = new Configuration()
>     env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
>     env.setParallelism(2)
>     env.getConfig.enableObjectReuse()
>     tEnv = StreamTableEnvironment.create(env)
>   }
>
>   /**
>    * There are 2 subtasks, but each subtask holds only ONE UDF instance:
>    * udf_ser(name, 'a') and udf_ser(name, 'b') are not kept apart. After the
>    * task is serialized, the two udf_ser calls still point to the same object,
>    * not two. Mutating a UDF field in getTypeInference yields two distinct objects.
>    */
>   test("UdfSerializeFunc") {
>     tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc])
>     var sql = """
>     CREATE TEMPORARY TABLE heros (
>       `name` STRING,
>       `power` STRING,
>       `age` INT
>     ) WITH (
>       'connector' = 'faker',
>       'fields.name.expression' = '#{superhero.name}',
>       'fields.power.expression' = '#{superhero.power}',
>       'fields.power.null-rate' = '0.05',
>       'rows-per-second' = '1',
>       'fields.age.expression' = '#{number.numberBetween ''0'',''1000''}'
>     )
>     """
>     tEnv.executeSql(sql)
>     sql = """
>     select
>         udf_ser(name, 'a') name1,
>         udf_ser(name, 'b') name2
>     from heros
>     """
>     val rstTable = tEnv.sqlQuery(sql)
>     rstTable.printSchema()
>     rstTable.execute().print()
>   }
>
>   /**
>    * Mutating a field of the ScalarFunction makes the serialized call-site
>    * instances distinct objects.
>    */
>   test("UdfSerializeFunc2") {
>     tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc2])
>     var sql = """
>     CREATE TEMPORARY TABLE heros (
>       `name` STRING,
>       `power` STRING,
>       `age` INT
>     ) WITH (
>       'connector' = 'faker',
>       'fields.name.expression' = '#{superhero.name}',
>       'fields.power.expression' = '#{superhero.power}',
>       'fields.power.null-rate' = '0.05',
>       'rows-per-second' = '1',
>       'fields.age.expression' = '#{number.numberBetween ''0'',''1000''}'
>     )
>     """
>     tEnv.executeSql(sql)
>     sql = """
>     select
>         udf_ser(name, 'a') name1,
>         udf_ser(name, 'b') name2
>     from heros
>     """
>     val rstTable = tEnv.sqlQuery(sql)
>     rstTable.printSchema()
>     rstTable.execute().print()
>   }
>
>   override protected def afterAll(): Unit = {
>     env.execute()
>   }
> }
> {code}
> Log output of the UdfSerializeFunc test (only one open() per subtask, i.e. one shared instance serving both calls):
> {code:java}
> (
>   `name1` STRING,
>   `name2` STRING
> )
> 14:44:41,183  DEBUG org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ [ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator Code:
> StreamExecCalc
> 14:44:42,819  WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping notifications
> 14:44:43,092  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
> 14:44:43,240  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 'log.file' is not set.
> 14:44:43,240  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
> 14:44:43,711  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at localhost:8081
> 14:44:43,716  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at http://localhost:8081.
> 14:44:43,717  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership with leaderSessionID=18ab2e30-a83a-4ec0-be98-7d49b7628565
> 14:44:43,789  WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping notifications
> 14:44:43,790  WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date
> 14:44:44,576  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:969139468.
> 14:44:44,576  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:1737783673.
> 14:44:44,607  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - cache_null.cache:a
> 14:44:44,607  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - cache_null.cache:a
> +----+--------------------------------+--------------------------------+
> | op |                          name1 |                          name2 |
> +----+--------------------------------+--------------------------------+
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> {code}
> Log output of the UdfSerializeFunc2 test (two open() calls per subtask, i.e. two distinct instances):
> {code:java}
> (
>   `name1` STRING,
>   `name2` STRING
> )
> 14:45:18,786  DEBUG org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ [ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator Code:
> StreamExecCalc
> 14:45:20,296  WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping notifications
> 14:45:20,518  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
> 14:45:20,635  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 'log.file' is not set.
> 14:45:20,635  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
> 14:45:21,032  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at localhost:8081
> 14:45:21,034  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at http://localhost:8081.
> 14:45:21,035  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership with leaderSessionID=2fcfdba0-0e36-4e8b-9f3c-88f2c242458f
> 14:45:21,089  WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping notifications
> 14:45:21,089  WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date
> 14:45:21,741  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1439144392.
> 14:45:21,741  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:381953409.
> 14:45:21,742  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1162638327.
> 14:45:21,742  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:391248806.
> +----+--------------------------------+--------------------------------+
> | op |                          name1 |                          name2 |
> +----+--------------------------------+--------------------------------+
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> {code}
>  
> *This is an issue caused by UDF instance serialization: after the task is serialized and deserialized, both calls end up sharing a single UDF instance.*
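>
> A minimal standalone sketch of the suspected mechanism (plain Java, no Flink dependencies; this is an assumption based on the issue title, not confirmed planner internals): Java serialization preserves object identity within a single stream, so if the serialized task references one UDF object from both call sites, deserialization on the task manager restores a single shared instance.
> {code:java}
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.ObjectInputStream;
> import java.io.ObjectOutputStream;
> import java.io.Serializable;
>
> public class SharedInstanceDemo {
>     static class Udf implements Serializable {
>         String cache; // mutable state, shared by accident if the instance is shared
>     }
>
>     public static void main(String[] args) throws Exception {
>         Udf f = new Udf();
>         // Both "call sites" reference the same object, like udf_ser(name, 'a')
>         // and udf_ser(name, 'b') ending up on one instance inside a task.
>         Udf[] task = {f, f};
>
>         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
>         try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
>             out.writeObject(task);
>         }
>         Udf[] restored;
>         try (ObjectInputStream in = new ObjectInputStream(
>                 new ByteArrayInputStream(bytes.toByteArray()))) {
>             restored = (Udf[]) in.readObject();
>         }
>         // Prints "true": the deserialized task still holds ONE instance,
>         // so a value cached for the first call leaks into the second.
>         System.out.println(restored[0] == restored[1]);
>     }
> }
> {code}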



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
