[
https://issues.apache.org/jira/browse/FLINK-33963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17812120#comment-17812120
]
lifengchao commented on FLINK-33963:
------------------------------------
I think the cost of function serialization and its memory usage can be ignored,
since serialization only occurs once, during the task initialization phase. UDFs
in Spark SQL do not seem to have this issue.
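
For a sense of scale, here is a rough sketch (plain java.io serialization; Flink's
internal encoding may differ) that checks how large one serialized instance of the
UdfSerializeFunc class from the issue description actually is:

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

public class SerializedSizeCheck {
    public static void main(String[] args) throws Exception {
        // Serialize a single UDF instance and print the payload size. For a
        // small function like UdfSerializeFunc the payload is tiny, which
        // supports treating per-call-site serialization as negligible.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new UdfSerializeFunc());
        }
        System.out.println("serialized size: " + bytes.size() + " bytes");
    }
}
{code}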
> There is only one UDF instance after serializing the same task
> --------------------------------------------------------------
>
> Key: FLINK-33963
> URL: https://issues.apache.org/jira/browse/FLINK-33963
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.18.0
> Environment: local environment in an IDEA test,
> Java 8
> Reporter: lifengchao
> Priority: Major
>
> I defined this UDF and expected the following SQL to return 'a', 'b', but it
> returns 'a', 'a'.
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
>
>     String cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> }
> {code}
> SQL:
> {code:sql}
> select
>     udf_ser(name, 'a') name1,
>     udf_ser(name, 'b') name2
> from heros
> {code}
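> One plausible mechanism (my reading, not confirmed in this ticket): both call
> expressions hold the same function instance, and when that instance is
> serialized within one stream, Java serialization writes the second reference
> as a back-reference, so both call sites deserialize into a single shared
> object. A minimal JDK-level sketch of that behavior, not Flink planner code:
> {code:java}
> import java.io.*;
>
> public class SharedInstanceDemo {
>     static class Udf implements Serializable {
>         String cache; // mutable state that both call sites would then share
>     }
>
>     public static void main(String[] args) throws Exception {
>         Udf udf = new Udf();
>         // Two call sites referencing one instance, like udf_ser(name, 'a') and udf_ser(name, 'b').
>         Udf[] callSites = {udf, udf};
>
>         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
>         try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
>             out.writeObject(callSites); // second element is written as a back-reference
>         }
>         try (ObjectInputStream in = new ObjectInputStream(
>                 new ByteArrayInputStream(bytes.toByteArray()))) {
>             Udf[] restored = (Udf[]) in.readObject();
>             System.out.println(restored[0] == restored[1]); // true: still one object
>         }
>     }
> }
> {code}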
> Changing the UDF to the following achieves the expected result.
> {code:java}
> public class UdfSerializeFunc2 extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
>
>     String cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
>
>     @Override
>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>         return TypeInference.newBuilder()
>                 .outputTypeStrategy(new TypeStrategy() {
>                     @Override
>                     public Optional<DataType> inferType(CallContext callContext) {
>                         List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
>                         if (argumentDataTypes.size() != 2) {
>                             throw callContext.newValidationError("arg size error");
>                         }
>                         if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
>                             throw callContext.newValidationError("Literal expected for second argument.");
>                         }
>                         cache = callContext.getArgumentValue(1, String.class).get();
>                         return Optional.of(DataTypes.STRING());
>                     }
>                 })
>                 .build();
>     }
> }
> {code}
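> Presumably the field mutation in getTypeInference makes the two call-site
> instances no longer identical when serialized, so they stop collapsing into
> one object. A defensive alternative (my own sketch; the class name
> UdfSerializeFunc3 is hypothetical and not part of this report) is to key the
> cached value on the literal argument, so even a shared instance keeps the
> 'a' and 'b' call sites apart:
> {code:java}
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
> import org.apache.flink.table.functions.ScalarFunction;
>
> public class UdfSerializeFunc3 extends ScalarFunction {
>     // Keyed by the second argument instead of a single mutable field, so a
>     // shared instance cannot mix up the 'a' and 'b' call sites.
>     private final Map<String, String> cache = new ConcurrentHashMap<>();
>
>     public String eval(String a, String b) {
>         // One cached value per distinct literal; here it simply caches b itself.
>         return cache.computeIfAbsent(b, k -> k);
>     }
> }
> {code}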
>
> My complete test code:
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
>
>     String cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> }
>
> public class UdfSerializeFunc2 extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
>
>     String cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
>
>     @Override
>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>         return TypeInference.newBuilder()
>                 .outputTypeStrategy(new TypeStrategy() {
>                     @Override
>                     public Optional<DataType> inferType(CallContext callContext) {
>                         List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
>                         if (argumentDataTypes.size() != 2) {
>                             throw callContext.newValidationError("arg size error");
>                         }
>                         if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
>                             throw callContext.newValidationError("Literal expected for second argument.");
>                         }
>                         cache = callContext.getArgumentValue(1, String.class).get();
>                         return Optional.of(DataTypes.STRING());
>                     }
>                 })
>                 .build();
>     }
> }
>
> class UdfSerializeSuite extends AnyFunSuite with BeforeAndAfterAll {
>   var env: StreamExecutionEnvironment = _
>   var tEnv: StreamTableEnvironment = _
>
>   override protected def beforeAll(): Unit = {
>     val conf = new Configuration()
>     env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
>     env.setParallelism(2)
>     env.getConfig.enableObjectReuse()
>     tEnv = StreamTableEnvironment.create(env)
>   }
>
>   /**
>    * 2 tasks, but each task holds only one UDF: udf_ser(name, 'a') and
>    * udf_ser(name, 'b') are not kept apart. After the function is serialized,
>    * the two udf_ser calls within a single task are still one object, not two.
>    * Mutating a UDF field in getTypeInference yields two distinct objects.
>    */
>   test("UdfSerializeFunc") {
>     tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc])
>     var sql = """
>       CREATE TEMPORARY TABLE heros (
>         `name` STRING,
>         `power` STRING,
>         `age` INT
>       ) WITH (
>         'connector' = 'faker',
>         'fields.name.expression' = '#{superhero.name}',
>         'fields.power.expression' = '#{superhero.power}',
>         'fields.power.null-rate' = '0.05',
>         'rows-per-second' = '1',
>         'fields.age.expression' = '#{number.numberBetween ''0'',''1000''}'
>       )
>     """
>     tEnv.executeSql(sql)
>     sql = """
>       select
>         udf_ser(name, 'a') name1,
>         udf_ser(name, 'b') name2
>       from heros
>     """
>     val rstTable = tEnv.sqlQuery(sql)
>     rstTable.printSchema()
>     rstTable.execute().print()
>   }
>
>   /**
>    * Mutating a field of the ScalarFunction makes the deserialized instances
>    * distinct objects.
>    */
>   test("UdfSerializeFunc2") {
>     tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc2])
>     var sql = """
>       CREATE TEMPORARY TABLE heros (
>         `name` STRING,
>         `power` STRING,
>         `age` INT
>       ) WITH (
>         'connector' = 'faker',
>         'fields.name.expression' = '#{superhero.name}',
>         'fields.power.expression' = '#{superhero.power}',
>         'fields.power.null-rate' = '0.05',
>         'rows-per-second' = '1',
>         'fields.age.expression' = '#{number.numberBetween ''0'',''1000''}'
>       )
>     """
>     tEnv.executeSql(sql)
>     sql = """
>       select
>         udf_ser(name, 'a') name1,
>         udf_ser(name, 'b') name2
>       from heros
>     """
>     val rstTable = tEnv.sqlQuery(sql)
>     rstTable.printSchema()
>     rstTable.execute().print()
>   }
>
>   override protected def afterAll(): Unit = {
>     env.execute()
>   }
> }
> {code}
> Log output of the UdfSerializeFunc test:
> {code:java}
> (
>   `name1` STRING,
>   `name2` STRING
> )
> 14:44:41,183 DEBUG org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ [ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator Code:
> StreamExecCalc
> 14:44:42,819 WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping notifications
> 14:44:43,092 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
> 14:44:43,240 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 'log.file' is not set.
> 14:44:43,240 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
> 14:44:43,711 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at localhost:8081
> 14:44:43,716 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at http://localhost:8081.
> 14:44:43,717 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership with leaderSessionID=18ab2e30-a83a-4ec0-be98-7d49b7628565
> 14:44:43,789 WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping notifications
> 14:44:43,790 WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date
> 14:44:44,576 WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:969139468.
> 14:44:44,576 WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:1737783673.
> 14:44:44,607 WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - cache_null.cache:a
> 14:44:44,607 WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - cache_null.cache:a
> +----+--------------------------------+--------------------------------+
> | op |                          name1 |                          name2 |
> +----+--------------------------------+--------------------------------+
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> {code}
> Log output of the UdfSerializeFunc2 test:
> {code:java}
> (
>   `name1` STRING,
>   `name2` STRING
> )
> 14:45:18,786 DEBUG org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ [ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator Code:
> StreamExecCalc
> 14:45:20,296 WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping notifications
> 14:45:20,518 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
> 14:45:20,635 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 'log.file' is not set.
> 14:45:20,635 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
> 14:45:21,032 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at localhost:8081
> 14:45:21,034 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at http://localhost:8081.
> 14:45:21,035 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership with leaderSessionID=2fcfdba0-0e36-4e8b-9f3c-88f2c242458f
> 14:45:21,089 WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping notifications
> 14:45:21,089 WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date
> 14:45:21,741 WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1439144392.
> 14:45:21,741 WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:381953409.
> 14:45:21,742 WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1162638327.
> 14:45:21,742 WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:391248806.
> +----+--------------------------------+--------------------------------+
> | op |                          name1 |                          name2 |
> +----+--------------------------------+--------------------------------+
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> {code}
>
> *This is an issue caused by UDF function serialization: both call sites end up with the same deserialized object.*