[
https://issues.apache.org/jira/browse/FLINK-33963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814014#comment-17814014
]
xuyang commented on FLINK-33963:
--------------------------------
I agree that we should be more rigorous in considering whether a scalar
function instance should be reused.
However, in the scenario you mentioned, if the path can be extracted once and
used permanently, that implies the path is a constant across all elements. In
such a case, a scalar function might not be the best choice.
That's why I said this is not an uncommon scenario.
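For that constant case, one alternative is to pass the value through the job
parameters and read it once in open(), instead of caching a call argument in
instance state. A minimal sketch, assuming a parameter key "udf.path" (the key
and the class name here are made up for illustration):
{code:java}
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class PathExtractFunc extends ScalarFunction {

    private transient String path;

    @Override
    public void open(FunctionContext context) throws Exception {
        // Read the constant once per task; getJobParameter falls back to
        // the default value when the key is not set.
        path = context.getJobParameter("udf.path", "");
    }

    public String eval(String input) {
        // Illustrative use of the constant; real extraction logic goes here.
        return path + ":" + input;
    }
}
{code}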
> There is only one UDF instance after serializing the same task
> --------------------------------------------------------------
>
> Key: FLINK-33963
> URL: https://issues.apache.org/jira/browse/FLINK-33963
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.18.0
> Environment: local environment in an IntelliJ IDEA test.
> Java 8
> Reporter: lifengchao
> Priority: Major
>
> I defined this UDF and expected the following SQL to return 'a', 'b', but it
> returns 'a', 'a'.
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
>     String cache;
> 
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
> 
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> }
> {code}
> SQL:
> {code:sql}
> select
>   udf_ser(name, 'a') name1,
>   udf_ser(name, 'b') name2
> from heros
> {code}
> Changing the UDF to the following achieves the expected results.
> {code:java}
> public class UdfSerializeFunc2 extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
>     String cache;
> 
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
> 
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> 
>     @Override
>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>         return TypeInference.newBuilder()
>                 .outputTypeStrategy(new TypeStrategy() {
>                     @Override
>                     public Optional<DataType> inferType(CallContext callContext) {
>                         List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
>                         if (argumentDataTypes.size() != 2) {
>                             throw callContext.newValidationError("arg size error");
>                         }
>                         if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
>                             throw callContext.newValidationError("Literal expected for second argument.");
>                         }
>                         // Mutating this field during planning makes the serialized
>                         // form of each call site's instance differ, so each call
>                         // site deserializes to its own object.
>                         cache = callContext.getArgumentValue(1, String.class).get();
>                         return Optional.of(DataTypes.STRING());
>                     }
>                 })
>                 .build();
>     }
> }
> {code}
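> A cache keyed by the literal argument also stays correct when one instance is
> shared between call sites; a minimal sketch (the class name UdfSerializeFunc3
> is hypothetical, not part of the tested code):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import org.apache.flink.table.functions.FunctionContext;
> import org.apache.flink.table.functions.ScalarFunction;
> 
> public class UdfSerializeFunc3 extends ScalarFunction {
> 
>     private transient Map<String, String> cacheByLiteral;
> 
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         cacheByLiteral = new HashMap<>();
>     }
> 
>     public String eval(String a, String b) {
>         // Keyed by the second argument, so udf_ser(name, 'a') and
>         // udf_ser(name, 'b') return 'a' and 'b' even with one shared instance.
>         return cacheByLiteral.computeIfAbsent(b, k -> k);
>     }
> }
> {code}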
>
> My complete test code:
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
>     String cache;
> 
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
> 
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> }
> 
> public class UdfSerializeFunc2 extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
>     String cache;
> 
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
> 
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> 
>     @Override
>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>         return TypeInference.newBuilder()
>                 .outputTypeStrategy(new TypeStrategy() {
>                     @Override
>                     public Optional<DataType> inferType(CallContext callContext) {
>                         List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
>                         if (argumentDataTypes.size() != 2) {
>                             throw callContext.newValidationError("arg size error");
>                         }
>                         if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
>                             throw callContext.newValidationError("Literal expected for second argument.");
>                         }
>                         cache = callContext.getArgumentValue(1, String.class).get();
>                         return Optional.of(DataTypes.STRING());
>                     }
>                 })
>                 .build();
>     }
> }
> 
> class UdfSerializeSuite extends AnyFunSuite with BeforeAndAfterAll {
>   var env: StreamExecutionEnvironment = _
>   var tEnv: StreamTableEnvironment = _
> 
>   override protected def beforeAll(): Unit = {
>     val conf = new Configuration()
>     env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
>     env.setParallelism(2)
>     env.getConfig.enableObjectReuse()
>     tEnv = StreamTableEnvironment.create(env)
>   }
> 
>   /**
>    * There are 2 tasks, but each task ends up with a single UDF instance:
>    * udf_ser(name, 'a') and udf_ser(name, 'b') are not kept apart.
>    * After function serialization, the two udf_ser calls within one task
>    * still deserialize to the same object, not two.
>    * Mutating a field of the UDF in getTypeInference produces two distinct objects.
>    */
>   test("UdfSerializeFunc") {
>     tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc])
>     var sql = """
>       CREATE TEMPORARY TABLE heros (
>         `name` STRING,
>         `power` STRING,
>         `age` INT
>       ) WITH (
>         'connector' = 'faker',
>         'fields.name.expression' = '#{superhero.name}',
>         'fields.power.expression' = '#{superhero.power}',
>         'fields.power.null-rate' = '0.05',
>         'rows-per-second' = '1',
>         'fields.age.expression' = '#{number.numberBetween ''0'',''1000''}'
>       )
>     """
>     tEnv.executeSql(sql)
>     sql = """
>       select
>         udf_ser(name, 'a') name1,
>         udf_ser(name, 'b') name2
>       from heros
>     """
>     val rstTable = tEnv.sqlQuery(sql)
>     rstTable.printSchema()
>     rstTable.execute().print()
>   }
> 
>   /**
>    * Mutating a field of the ScalarFunction makes the serialized copies
>    * deserialize into different objects.
>    */
>   test("UdfSerializeFunc2") {
>     tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc2])
>     var sql = """
>       CREATE TEMPORARY TABLE heros (
>         `name` STRING,
>         `power` STRING,
>         `age` INT
>       ) WITH (
>         'connector' = 'faker',
>         'fields.name.expression' = '#{superhero.name}',
>         'fields.power.expression' = '#{superhero.power}',
>         'fields.power.null-rate' = '0.05',
>         'rows-per-second' = '1',
>         'fields.age.expression' = '#{number.numberBetween ''0'',''1000''}'
>       )
>     """
>     tEnv.executeSql(sql)
>     sql = """
>       select
>         udf_ser(name, 'a') name1,
>         udf_ser(name, 'b') name2
>       from heros
>     """
>     val rstTable = tEnv.sqlQuery(sql)
>     rstTable.printSchema()
>     rstTable.execute().print()
>   }
> 
>   override protected def afterAll(): Unit = {
>     env.execute()
>   }
> }
> {code}
> Test UdfSerializeFunc log output:
> {code:java}
> (
>   `name1` STRING,
>   `name2` STRING
> )
> 14:44:41,183 DEBUG org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ [ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator Code:
> StreamExecCalc
> 14:44:42,819 WARN org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping notifications
> 14:44:43,092 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
> 14:44:43,240 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 'log.file' is not set.
> 14:44:43,240 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
> 14:44:43,711 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at localhost:8081
> 14:44:43,716 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at http://localhost:8081.
> 14:44:43,717 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership with leaderSessionID=18ab2e30-a83a-4ec0-be98-7d49b7628565
> 14:44:43,789 WARN org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping notifications
> 14:44:43,790 WARN org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date
> 14:44:44,576 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:969139468.
> 14:44:44,576 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:1737783673.
> 14:44:44,607 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - cache_null.cache:a
> 14:44:44,607 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - cache_null.cache:a
> +----+--------------------------------+--------------------------------+
> | op |                          name1 |                          name2 |
> +----+--------------------------------+--------------------------------+
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> {code}
> Test UdfSerializeFunc2 log output:
> {code:java}
> (
>   `name1` STRING,
>   `name2` STRING
> )
> 14:45:18,786 DEBUG org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ [ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator Code:
> StreamExecCalc
> 14:45:20,296 WARN org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping notifications
> 14:45:20,518 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
> 14:45:20,635 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 'log.file' is not set.
> 14:45:20,635 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
> 14:45:21,032 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at localhost:8081
> 14:45:21,034 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at http://localhost:8081.
> 14:45:21,035 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership with leaderSessionID=2fcfdba0-0e36-4e8b-9f3c-88f2c242458f
> 14:45:21,089 WARN org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping notifications
> 14:45:21,089 WARN org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date
> 14:45:21,741 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1439144392.
> 14:45:21,741 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:381953409.
> 14:45:21,742 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1162638327.
> 14:45:21,742 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:391248806.
> +----+--------------------------------+--------------------------------+
> | op |                          name1 |                          name2 |
> +----+--------------------------------+--------------------------------+
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> {code}
>
> *This is an issue caused by UDF function serialization: after the task is serialized and deserialized, the two call sites within a task end up sharing a single UDF instance.*