[ https://issues.apache.org/jira/browse/FLINK-33963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814014#comment-17814014 ]
xuyang edited comment on FLINK-33963 at 2/4/24 2:34 AM:
--------------------------------------------------------
I agree that we should be more rigorous about when a scalar function may be reused. However, in the scenario you mentioned, if the path can be extracted once and used permanently, that implies the path is constant across all elements. In such a case, using two scalar function instances might not be the best choice. That is why I said this is not an uncommon scenario.

> There is only one UDF instance after serializing the same task
> --------------------------------------------------------------
>
>                 Key: FLINK-33963
>                 URL: https://issues.apache.org/jira/browse/FLINK-33963
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.0
>        Environment: local env in IDEA test, Java 8
>            Reporter: lifengchao
>            Priority: Major
>
> I define this UDF and expect the following SQL to return 'a', 'b', but it returns 'a', 'a'.
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
>     String cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> }
> {code}
> sql:
> {code:sql}
> select
>     udf_ser(name, 'a') name1,
>     udf_ser(name, 'b') name2
> from heros
> {code}
> Changing the UDF to the following achieves the expected result:
> {code:java}
> public class UdfSerializeFunc2 extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
>     String cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
>
>     @Override
>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>         return TypeInference.newBuilder()
>                 .outputTypeStrategy(new TypeStrategy() {
>                     @Override
>                     public Optional<DataType> inferType(CallContext callContext) {
>                         List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
>                         if (argumentDataTypes.size() != 2) {
>                             throw callContext.newValidationError("arg size error");
>                         }
>                         if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
>                             throw callContext.newValidationError("Literal expected for second argument.");
>                         }
>                         cache = callContext.getArgumentValue(1, String.class).get();
>                         return Optional.of(DataTypes.STRING());
>                     }
>                 })
>                 .build();
>     }
> }
> {code}
>
> My complete test code:
> {code:java}
> // UdfSerializeFunc and UdfSerializeFunc2 exactly as defined above.
>
> class UdfSerializeSuite extends AnyFunSuite with BeforeAndAfterAll {
>   var env: StreamExecutionEnvironment = _
>   var tEnv: StreamTableEnvironment = _
>
>   override protected def beforeAll(): Unit = {
>     val conf = new Configuration()
>     env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
>     env.setParallelism(2)
>     env.getConfig.enableObjectReuse()
>     tEnv = StreamTableEnvironment.create(env)
>   }
>
>   /**
>    * Two tasks, but each task holds only one UDF instance: udf_ser(name, 'a')
>    * and udf_ser(name, 'b') are not kept apart. After serialization, the two
>    * udf_ser calls within a single task still share one object, not two.
>    * Mutating a UDF field in getTypeInference yields two distinct objects.
>    */
>   test("UdfSerializeFunc") {
>     tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc])
>     var sql = """
>       CREATE TEMPORARY TABLE heros (
>         `name` STRING,
>         `power` STRING,
>         `age` INT
>       ) WITH (
>         'connector' = 'faker',
>         'fields.name.expression' = '#{superhero.name}',
>         'fields.power.expression' = '#{superhero.power}',
>         'fields.power.null-rate' = '0.05',
>         'rows-per-second' = '1',
>         'fields.age.expression' = '#{number.numberBetween ''0'',''1000''}'
>       )
>     """
>     tEnv.executeSql(sql)
>     sql = """
>       select
>         udf_ser(name, 'a') name1,
>         udf_ser(name, 'b') name2
>       from heros
>     """
>     val rstTable = tEnv.sqlQuery(sql)
>     rstTable.printSchema()
>     rstTable.execute().print()
>   }
>
>   /**
>    * Mutating a field of the ScalarFunction makes the deserialized copies
>    * distinct objects.
>    */
>   test("UdfSerializeFunc2") {
>     tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc2])
>     var sql = """
>       CREATE TEMPORARY TABLE heros (
>         `name` STRING,
>         `power` STRING,
>         `age` INT
>       ) WITH (
>         'connector' = 'faker',
>         'fields.name.expression' = '#{superhero.name}',
>         'fields.power.expression' = '#{superhero.power}',
>         'fields.power.null-rate' = '0.05',
>         'rows-per-second' = '1',
>         'fields.age.expression' = '#{number.numberBetween ''0'',''1000''}'
>       )
>     """
>     tEnv.executeSql(sql)
>     sql = """
>       select
>         udf_ser(name, 'a') name1,
>         udf_ser(name, 'b') name2
>       from heros
>     """
>     val rstTable = tEnv.sqlQuery(sql)
>     rstTable.printSchema()
>     rstTable.execute().print()
>   }
>
>   override protected def afterAll(): Unit = {
>     env.execute()
>   }
> }
> {code}
> test UdfSerializeFunc log out:
> {code:java}
> (
>   `name1` STRING,
>   `name2` STRING
> )
> 14:44:41,183 DEBUG org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ [ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator Code: StreamExecCalc
> 14:44:42,819 WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping notifications
> 14:44:43,092 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
> 14:44:43,240 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 'log.file' is not set.
> 14:44:43,240 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
> 14:44:43,711 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at localhost:8081
> 14:44:43,716 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at http://localhost:8081.
> 14:44:43,717 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership with leaderSessionID=18ab2e30-a83a-4ec0-be98-7d49b7628565
> 14:44:43,789 WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping notifications
> 14:44:43,790 WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date
> 14:44:44,576 WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:969139468.
> 14:44:44,576 WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:1737783673.
> 14:44:44,607 WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - cache_null.cache:a
> 14:44:44,607 WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - cache_null.cache:a
> +----+--------------------------------+--------------------------------+
> | op |                          name1 |                          name2 |
> +----+--------------------------------+--------------------------------+
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> {code}
> test UdfSerializeFunc2 log out:
> {code:java}
> (
>   `name1` STRING,
>   `name2` STRING
> )
> 14:45:18,786 DEBUG org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ [ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator Code: StreamExecCalc
> 14:45:20,296 WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping notifications
> 14:45:20,518 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
> 14:45:20,635 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 'log.file' is not set.
> 14:45:20,635 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
> 14:45:21,032 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at localhost:8081
> 14:45:21,034 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at http://localhost:8081.
> 14:45:21,035 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership with leaderSessionID=2fcfdba0-0e36-4e8b-9f3c-88f2c242458f
> 14:45:21,089 WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping notifications
> 14:45:21,089 WARN  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date
> 14:45:21,741 WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1439144392.
> 14:45:21,741 WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:381953409.
> 14:45:21,742 WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1162638327.
> 14:45:21,742 WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:391248806.
> +----+--------------------------------+--------------------------------+
> | op |                          name1 |                          name2 |
> +----+--------------------------------+--------------------------------+
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> {code}
>
> *This is an issue caused by UDF function serialization.*

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
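The shared-instance behavior the reporter describes can be reproduced with plain Java serialization, independent of Flink: within a single `ObjectOutputStream`, writing the same reference twice deserializes back to one shared object, while two distinct (even field-identical) objects stay distinct. A minimal sketch of that mechanism; class and method names here are illustrative stand-ins, not Flink code:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class SharedRefDemo {

    // Stand-in for a stateful scalar function (hypothetical, not Flink's API).
    static class Udf implements Serializable {
        String cache; // mutable state, like the `cache` field in this issue
    }

    // Serialize and deserialize a list through a single object stream.
    @SuppressWarnings("unchecked")
    static List<Udf> roundTrip(List<Udf> in) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(in);
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            return (List<Udf>) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Udf u = new Udf();

        // The same reference written twice comes back as ONE shared instance,
        // so whatever one call site caches is visible to the other.
        List<Udf> shared = roundTrip(Arrays.asList(u, u));
        System.out.println(shared.get(0) == shared.get(1)); // true

        // Two distinct instances remain distinct after the round trip.
        List<Udf> distinct = roundTrip(Arrays.asList(new Udf(), new Udf()));
        System.out.println(distinct.get(0) == distinct.get(1)); // false
    }
}
```

This also matches why mutating a field in `getTypeInference` works around the problem: once the two instances differ, the planner no longer treats them as one function, so serialization produces two separate objects.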