[ 
https://issues.apache.org/jira/browse/FLINK-33963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lifengchao updated FLINK-33963:
-------------------------------
    Description: 
I defined this UDF and expected the following SQL to return 'a', 'b', but it
returns 'a', 'a'.


{code:java}
public class UdfSerializeFunc extends ScalarFunction {
    static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
    String cache;
    @Override
    public void open(FunctionContext context) throws Exception {
        LOG.warn("open:{}.", this.hashCode());
    }

    public String eval(String a, String b){
        if(cache == null){
            LOG.warn("cache_null.cache:{}", b);
            cache = b;
        }
        return cache;
    }
}
{code}

sql:

{code:sql}
    select
        udf_ser(name, 'a') name1,
        udf_ser(name, 'b') name2
    from heros
{code}


Changing the UDF as follows produces the expected results.

{code:java}
public class UdfSerializeFunc2 extends ScalarFunction {
    static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
    String cache;
    @Override
    public void open(FunctionContext context) throws Exception {
        LOG.warn("open:{}.", this.hashCode());
    }

    public String eval(String a, String b){
        if(cache == null){
            LOG.warn("cache_null.cache:{}", b);
            cache = b;
        }
        return cache;
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                .outputTypeStrategy(new TypeStrategy() {
                    @Override
                    public Optional<DataType> inferType(CallContext 
callContext) {
                        List<DataType> argumentDataTypes = 
callContext.getArgumentDataTypes();
                        if (argumentDataTypes.size() != 2) {
                            throw callContext.newValidationError("arg size 
error");
                        }
                        if (!callContext.isArgumentLiteral(1) || 
callContext.isArgumentNull(1)) {
                            throw callContext.newValidationError("Literal 
expected for second argument.");
                        }
                        cache = callContext.getArgumentValue(1, 
String.class).get();
                        return Optional.of(DataTypes.STRING());
                    }
                })
        .build();
    }
}
{code}

 

My complete test code:


{code:java}
public class UdfSerializeFunc extends ScalarFunction {
    static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
    String cache;
    @Override
    public void open(FunctionContext context) throws Exception {
        LOG.warn("open:{}.", this.hashCode());
    }

    public String eval(String a, String b){
        if(cache == null){
            LOG.warn("cache_null.cache:{}", b);
            cache = b;
        }
        return cache;
    }
}

public class UdfSerializeFunc2 extends ScalarFunction {
    static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
    String cache;
    @Override
    public void open(FunctionContext context) throws Exception {
        LOG.warn("open:{}.", this.hashCode());
    }

    public String eval(String a, String b){
        if(cache == null){
            LOG.warn("cache_null.cache:{}", b);
            cache = b;
        }
        return cache;
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                .outputTypeStrategy(new TypeStrategy() {
                    @Override
                    public Optional<DataType> inferType(CallContext 
callContext) {
                        List<DataType> argumentDataTypes = 
callContext.getArgumentDataTypes();
                        if (argumentDataTypes.size() != 2) {
                            throw callContext.newValidationError("arg size 
error");
                        }
                        if (!callContext.isArgumentLiteral(1) || 
callContext.isArgumentNull(1)) {
                            throw callContext.newValidationError("Literal 
expected for second argument.");
                        }
                        cache = callContext.getArgumentValue(1, 
String.class).get();
                        return Optional.of(DataTypes.STRING());
                    }
                })
        .build();
    }
}

class UdfSerializeSuite extends AnyFunSuite with BeforeAndAfterAll{
  var env: StreamExecutionEnvironment = _
  var tEnv: StreamTableEnvironment = _

  override protected def beforeAll(): Unit = {
    val conf = new Configuration()
    env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setParallelism(2)
    env.getConfig.enableObjectReuse()

    tEnv = StreamTableEnvironment.create(env)
  }

  /**
   * 2个task,只是每个task有一个udf,udf_ser(name, 'a')和udf_ser(name, 'b')没区分开
   * 它这函数的序列化真傻屌,单个task的2个udf_ser序列化后还是同一个对象,不是2个
   * getTypeInference中修改udf的属性可以实现2个不同的对象
   * 都1.18了还是这样,难道被人都没遇到这个问题反馈?
   */
  test("UdfSerializeFunc"){
    tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc])

    var sql = """
    CREATE TEMPORARY TABLE heros (
      `name` STRING,
      `power` STRING,
      `age` INT
    ) WITH (
      'connector' = 'faker',
      'fields.name.expression' = '#{superhero.name}',
      'fields.power.expression' = '#{superhero.power}',
      'fields.power.null-rate' = '0.05',
      'rows-per-second' = '1',
      'fields.age.expression' = '#{number.numberBetween ''0'',''1000''}'
    )
    """
    tEnv.executeSql(sql)

    sql = """
    select
        udf_ser(name, 'a') name1,
        udf_ser(name, 'b') name2
    from heros
    """
    val rstTable = tEnv.sqlQuery(sql)
    rstTable.printSchema()

    rstTable.execute().print()
  }

  /**
   * 修改ScalarFunction的属性,能使之序列化后是不同的对象
   */
  test("UdfSerializeFunc2"){
    tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc2])

    var sql = """
    CREATE TEMPORARY TABLE heros (
      `name` STRING,
      `power` STRING,
      `age` INT
    ) WITH (
      'connector' = 'faker',
      'fields.name.expression' = '#{superhero.name}',
      'fields.power.expression' = '#{superhero.power}',
      'fields.power.null-rate' = '0.05',
      'rows-per-second' = '1',
      'fields.age.expression' = '#{number.numberBetween ''0'',''1000''}'
    )
    """
    tEnv.executeSql(sql)

    sql = """
    select
        udf_ser(name, 'a') name1,
        udf_ser(name, 'b') name2
    from heros
    """
    val rstTable = tEnv.sqlQuery(sql)
    rstTable.printSchema()

    rstTable.execute().print()
  }

  override protected def afterAll(): Unit = {
    env.execute()
  }

}
{code}


Log output of test UdfSerializeFunc:

{code:java}
(
  `name1` STRING,
  `name2` STRING
)
14:44:41,183  DEBUG 
org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ 
[ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator 
Code:
StreamExecCalc
14:44:42,819  WARN  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
[ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping 
notifications
14:44:43,092  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
14:44:43,240  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils 
[ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 
'log.file' is not set.
14:44:43,240  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils 
[ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are 
unavailable in the web dashboard. Log file location not found in environment 
variable 'log.file' or configuration key 'web.log.path'.
14:44:43,711  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at 
localhost:8081
14:44:43,716  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at 
http://localhost:8081.
14:44:43,717  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership 
with leaderSessionID=18ab2e30-a83a-4ec0-be98-7d49b7628565
14:44:43,789  WARN  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
[flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping 
notifications
14:44:43,790  WARN  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
[flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started 
because either no tokens obtained or none of the tokens specified its renewal 
date
14:44:44,576  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: 
Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:969139468.
14:44:44,576  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: 
Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:1737783673.
14:44:44,607  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: 
Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - cache_null.cache:a
14:44:44,607  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: 
Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - cache_null.cache:a
+----+--------------------------------+--------------------------------+
| op |                          name1 |                          name2 |
+----+--------------------------------+--------------------------------+
| +I |                              a |                              a |
| +I |                              a |                              a |
| +I |                              a |                              a |
| +I |                              a |                              a |
| +I |                              a |                              a |
| +I |                              a |                              a |
| +I |                              a |                              a |
| +I |                              a |                              a |
{code}



Log output of test UdfSerializeFunc2:

{code:java}
(
  `name1` STRING,
  `name2` STRING
)
14:45:18,786  DEBUG 
org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ 
[ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator 
Code:
StreamExecCalc
14:45:20,296  WARN  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
[ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping 
notifications
14:45:20,518  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
14:45:20,635  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils 
[ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 
'log.file' is not set.
14:45:20,635  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils 
[ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are 
unavailable in the web dashboard. Log file location not found in environment 
variable 'log.file' or configuration key 'web.log.path'.
14:45:21,032  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at 
localhost:8081
14:45:21,034  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at 
http://localhost:8081.
14:45:21,035  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership 
with leaderSessionID=2fcfdba0-0e36-4e8b-9f3c-88f2c242458f
14:45:21,089  WARN  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
[flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping 
notifications
14:45:21,089  WARN  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
[flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started 
because either no tokens obtained or none of the tokens specified its renewal 
date
14:45:21,741  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: 
Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1439144392.
14:45:21,741  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: 
Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:381953409.
14:45:21,742  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: 
Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1162638327.
14:45:21,742  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: 
Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:391248806.
+----+--------------------------------+--------------------------------+
| op |                          name1 |                          name2 |
+----+--------------------------------+--------------------------------+
| +I |                              a |                              b |
| +I |                              a |                              b |
| +I |                              a |                              b |
| +I |                              a |                              b |

{code}

 

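*This issue is caused by UDF serialization: within each subtask, both `udf_ser` calls share a single UDF instance. The first log shows only one `open` per subtask, while the second shows two. As a result, the `cache` value set by the first call ('a') is also returned by the second call.*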

  was:
I defined this UDF and expected the following SQL to return 'a', 'b', but it
returns 'a', 'a'.

```java
public class UdfSerializeFunc extends ScalarFunction {
static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
String cache;
@Override
public void open(FunctionContext context) throws Exception {
LOG.warn("open:{}.", this.hashCode());
}

public String eval(String a, String b){
if(cache == null){
LOG.warn("cache_null.cache:{}", b);
cache = b;
}
return cache;
}
}
```

```
select
name,
udf_ser(name, 'a') name1,
udf_ser(name, 'b') name2
from heros
```

Changing the UDF as follows produces the expected results.

```java
public class UdfSerializeFunc2 extends ScalarFunction {
static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
String cache;
@Override
public void open(FunctionContext context) throws Exception {
LOG.warn("open:{}.", this.hashCode());
}

public String eval(String a, String b){
if(cache == null){
LOG.warn("cache_null.cache:{}", b);
cache = b;
}
return cache;
}

@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.outputTypeStrategy(new TypeStrategy() {
@Override
public Optional<DataType> inferType(CallContext callContext) {
List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
if (argumentDataTypes.size() != 2) {
throw callContext.newValidationError("arg size error");
}
if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
throw callContext.newValidationError("Literal expected for second argument.");
}
cache = callContext.getArgumentValue(1, String.class).get();
return Optional.of(DataTypes.STRING());
}
})
.build();
}
}
```

 

My complete test code:

```
public class UdfSerializeFunc extends ScalarFunction {
static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
String cache;
@Override
public void open(FunctionContext context) throws Exception {
LOG.warn("open:{}.", this.hashCode());
}

public String eval(String a, String b){
if(cache == null){
LOG.warn("cache_null.cache:{}", b);
cache = b;
}
return cache;
}
}
 
public class UdfSerializeFunc2 extends ScalarFunction {
static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
String cache;
@Override
public void open(FunctionContext context) throws Exception {
LOG.warn("open:{}.", this.hashCode());
}

public String eval(String a, String b){
if(cache == null){
LOG.warn("cache_null.cache:{}", b);
cache = b;
}
return cache;
}

@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.outputTypeStrategy(new TypeStrategy() {
@Override
public Optional<DataType> inferType(CallContext callContext) {
List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
if (argumentDataTypes.size() != 2) {
throw callContext.newValidationError("arg size error");
}
if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
throw callContext.newValidationError("Literal expected for second argument.");
}
cache = callContext.getArgumentValue(1, String.class).get();
return Optional.of(DataTypes.STRING());
}
})
.build();
}
}
 
class UdfSerializeSuite extends AnyFunSuite with BeforeAndAfterAll{
var env: StreamExecutionEnvironment = _
var tEnv: StreamTableEnvironment = _

override protected def beforeAll(): Unit = {
val conf = new Configuration()
env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
env.setParallelism(2)
env.getConfig.enableObjectReuse()

tEnv = StreamTableEnvironment.create(env)
}

/**
* 2个task,只是每个task有一个udf,udf_ser(name, 'a')和udf_ser(name, 'b')没区分开
* 它这函数的序列化真傻屌,单个task的2个udf_ser序列化后还是同一个对象,不是2个
* getTypeInference中修改udf的属性可以实现2个不同的对象
*/
test("UdfSerializeFunc"){
tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc])

var sql = """
CREATE TEMPORARY TABLE heros (
`name` STRING,
`power` STRING,
`age` INT
) WITH (
'connector' = 'faker',
'fields.name.expression' = '#\{superhero.name}',
'fields.power.expression' = '#\{superhero.power}',
'fields.power.null-rate' = '0.05',
'rows-per-second' = '1',
'fields.age.expression' = '#\{number.numberBetween ''0'',''1000''}'
)
"""
tEnv.executeSql(sql)

sql = """
select
udf_ser(name, 'a') name1,
udf_ser(name, 'b') name2
from heros
"""
val rstTable = tEnv.sqlQuery(sql)
rstTable.printSchema()

rstTable.execute().print()
}

/**
* 修改ScalarFunction的属性,能使之序列化后是不同的对象
*/
test("UdfSerializeFunc2"){
tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc2])

var sql = """
CREATE TEMPORARY TABLE heros (
`name` STRING,
`power` STRING,
`age` INT
) WITH (
'connector' = 'faker',
'fields.name.expression' = '#\{superhero.name}',
'fields.power.expression' = '#\{superhero.power}',
'fields.power.null-rate' = '0.05',
'rows-per-second' = '1',
'fields.age.expression' = '#\{number.numberBetween ''0'',''1000''}'
)
"""
tEnv.executeSql(sql)

sql = """
select
udf_ser(name, 'a') name1,
udf_ser(name, 'b') name2
from heros
"""
val rstTable = tEnv.sqlQuery(sql)
rstTable.printSchema()

rstTable.execute().print()
}

override protected def afterAll(): Unit = {
env.execute()
}

}
```

Log output of test UdfSerializeFunc:

```

(
  `name1` STRING,
  `name2` STRING
)
14:44:41,183  DEBUG 
org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ 
[ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator 
Code:
StreamExecCalc
14:44:42,819  WARN  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
[ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping 
notifications
14:44:43,092  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
14:44:43,240  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils 
[ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 
'log.file' is not set.
14:44:43,240  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils 
[ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are 
unavailable in the web dashboard. Log file location not found in environment 
variable 'log.file' or configuration key 'web.log.path'.
14:44:43,711  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at 
localhost:8081
14:44:43,716  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at 
http://localhost:8081.
14:44:43,717  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership 
with leaderSessionID=18ab2e30-a83a-4ec0-be98-7d49b7628565
14:44:43,789  WARN  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
[flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping 
notifications
14:44:43,790  WARN  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
[flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started 
because either no tokens obtained or none of the tokens specified its renewal 
date
14:44:44,576  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: 
Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:969139468.
14:44:44,576  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: 
Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:1737783673.
14:44:44,607  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: 
Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - cache_null.cache:a
14:44:44,607  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: 
Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - cache_null.cache:a
+----+--------------------------------+--------------------------------+
| op |                          name1 |                          name2 |
+----+--------------------------------+--------------------------------+
| +I |                              a |                              a |
| +I |                              a |                              a |
| +I |                              a |                              a |
| +I |                              a |                              a |
| +I |                              a |                              a |
| +I |                              a |                              a |
| +I |                              a |                              a |
| +I |                              a |                              a |

```

Log output of test UdfSerializeFunc2:

```

(
  `name1` STRING,
  `name2` STRING
)
14:45:18,786  DEBUG 
org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ 
[ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator 
Code:
StreamExecCalc
14:45:20,296  WARN  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
[ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping 
notifications
14:45:20,518  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
14:45:20,635  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils 
[ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 
'log.file' is not set.
14:45:20,635  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils 
[ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are 
unavailable in the web dashboard. Log file location not found in environment 
variable 'log.file' or configuration key 'web.log.path'.
14:45:21,032  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at 
localhost:8081
14:45:21,034  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at 
http://localhost:8081.
14:45:21,035  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership 
with leaderSessionID=2fcfdba0-0e36-4e8b-9f3c-88f2c242458f
14:45:21,089  WARN  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
[flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping 
notifications
14:45:21,089  WARN  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
[flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started 
because either no tokens obtained or none of the tokens specified its renewal 
date
14:45:21,741  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: 
Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1439144392.
14:45:21,741  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: 
Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:381953409.
14:45:21,742  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: 
Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1162638327.
14:45:21,742  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: 
Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:391248806.
+----+--------------------------------+--------------------------------+
| op |                          name1 |                          name2 |
+----+--------------------------------+--------------------------------+
| +I |                              a |                              b |
| +I |                              a |                              b |
| +I |                              a |                              b |
| +I |                              a |                              b |

```

 

*This is an issue caused by UDF function serialization.*


> There is only one UDF instance after serializing the same task
> --------------------------------------------------------------
>
>                 Key: FLINK-33963
>                 URL: https://issues.apache.org/jira/browse/FLINK-33963
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.0
>         Environment: local env in idea test.
> java 8
>            Reporter: lifengchao
>            Priority: Major
>             Fix For: 1.18.0
>
>
> I defined this UDF and expected the following SQL to return 'a', 'b', but it 
> returns 'a', 'a'.
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
>     String cache;
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>     public String eval(String a, String b){
>         if(cache == null){
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> }
> {code}
> sql:
> {code:sql}
>     select
>         udf_ser(name, 'a') name1,
>         udf_ser(name, 'b') name2
>     from heros
> {code}
> Changing the UDF as follows produces the expected results.
> {code:java}
> public class UdfSerializeFunc2 extends ScalarFunction {
>     static final Logger LOG = 
> LoggerFactory.getLogger(UdfSerializeFunc2.class);
>     String cache;
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>     public String eval(String a, String b){
>         if(cache == null){
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
>     @Override
>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>         return TypeInference.newBuilder()
>                 .outputTypeStrategy(new TypeStrategy() {
>                     @Override
>                     public Optional<DataType> inferType(CallContext 
> callContext) {
>                         List<DataType> argumentDataTypes = 
> callContext.getArgumentDataTypes();
>                         if (argumentDataTypes.size() != 2) {
>                             throw callContext.newValidationError("arg size 
> error");
>                         }
>                         if (!callContext.isArgumentLiteral(1) || 
> callContext.isArgumentNull(1)) {
>                             throw callContext.newValidationError("Literal 
> expected for second argument.");
>                         }
>                         cache = callContext.getArgumentValue(1, 
> String.class).get();
>                         return Optional.of(DataTypes.STRING());
>                     }
>                 })
>         .build();
>     }
> }
> {code}
>  
> My complete test code:
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
>     String cache;
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>     public String eval(String a, String b){
>         if(cache == null){
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> }
> public class UdfSerializeFunc2 extends ScalarFunction {
>     static final Logger LOG = 
> LoggerFactory.getLogger(UdfSerializeFunc2.class);
>     String cache;
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>     public String eval(String a, String b){
>         if(cache == null){
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
>     @Override
>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>         return TypeInference.newBuilder()
>                 .outputTypeStrategy(new TypeStrategy() {
>                     @Override
>                     public Optional<DataType> inferType(CallContext 
> callContext) {
>                         List<DataType> argumentDataTypes = 
> callContext.getArgumentDataTypes();
>                         if (argumentDataTypes.size() != 2) {
>                             throw callContext.newValidationError("arg size 
> error");
>                         }
>                         if (!callContext.isArgumentLiteral(1) || 
> callContext.isArgumentNull(1)) {
>                             throw callContext.newValidationError("Literal 
> expected for second argument.");
>                         }
>                         cache = callContext.getArgumentValue(1, 
> String.class).get();
>                         return Optional.of(DataTypes.STRING());
>                     }
>                 })
>         .build();
>     }
> }
> /**
>  * Reproducer for FLINK-33963: two calls of the same ScalarFunction in one
>  * query, differing only in a literal argument, should behave independently.
>  */
> class UdfSerializeSuite extends AnyFunSuite with BeforeAndAfterAll{
>   var env: StreamExecutionEnvironment = _
>   var tEnv: StreamTableEnvironment = _
>   /**
>    * Faker-backed source table shared by both tests. 'number-of-rows' makes the
>    * source bounded so `execute().print()` returns and the next test can run;
>    * IF NOT EXISTS lets the second test reuse the table created by the first.
>    */
>   private val herosDdl = """
>     CREATE TEMPORARY TABLE IF NOT EXISTS heros (
>       `name` STRING,
>       `power` STRING,
>       `age` INT
>     ) WITH (
>       'connector' = 'faker',
>       'fields.name.expression' = '#{superhero.name}',
>       'fields.power.expression' = '#{superhero.power}',
>       'fields.power.null-rate' = '0.05',
>       'rows-per-second' = '1',
>       'number-of-rows' = '10',
>       'fields.age.expression' = '#{number.numberBetween ''0'',''1000''}'
>     )
>     """
>   /** Two calls of `udf_ser` that differ only in the literal second argument. */
>   private val udfQuery = """
>     select
>         udf_ser(name, 'a') name1,
>         udf_ser(name, 'b') name2
>     from heros
>     """
>   override protected def beforeAll(): Unit = {
>     val conf = new Configuration()
>     env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
>     env.setParallelism(2)
>     env.getConfig.enableObjectReuse()
>     tEnv = StreamTableEnvironment.create(env)
>   }
>   /**
>    * Creates the source table (if needed), runs `udfQuery`, and prints its
>    * schema and rows. The caller must register `udf_ser` beforehand.
>    * Each query is submitted by `Table.execute()`, so no `env.execute()` is
>    * needed (env has no DataStream operators of its own).
>    */
>   private def runUdfQuery(): Unit = {
>     tEnv.executeSql(herosDdl)
>     val rstTable = tEnv.sqlQuery(udfQuery)
>     rstTable.printSchema()
>     rstTable.execute().print()
>   }
>   /**
>    * Expected 'a', 'b' but prints 'a', 'a'. With parallelism 2 there are 2 Calc
>    * tasks, and open() is logged only once per task: udf_ser(name, 'a') and
>    * udf_ser(name, 'b') share a single function instance per task after
>    * serialization, so the second call returns the value cached by the first.
>    * Still reproducible on 1.18.
>    */
>   test("UdfSerializeFunc"){
>     // createTemporarySystemFunction fails if the name is already registered
>     // (e.g. by another test in this suite), so drop any earlier registration.
>     tEnv.dropTemporarySystemFunction("udf_ser")
>     tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc])
>     runUdfQuery()
>   }
>   /**
>    * Workaround: UdfSerializeFunc2 assigns its field inside getTypeInference,
>    * after which each call gets its own instance after serialization (open()
>    * is logged twice per task) and the query prints 'a', 'b'.
>    */
>   test("UdfSerializeFunc2"){
>     // Replace the registration left behind by the previous test, if any.
>     tEnv.dropTemporarySystemFunction("udf_ser")
>     tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc2])
>     runUdfQuery()
>   }
> }
> {code}
> Log output of test UdfSerializeFunc:
> {code:java}
> (
>   `name1` STRING,
>   `name2` STRING
> )
> 14:44:41,183  DEBUG 
> org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ 
> [ScalaTest-run-running-UdfSerializeSuite] [] - Compiling 
> OneInputStreamOperator Code:
> StreamExecCalc
> 14:44:42,819  WARN  
> org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
> [ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping 
> notifications
> 14:44:43,092  INFO  
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
> [ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
> 14:44:43,240  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils 
> [ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 
> 'log.file' is not set.
> 14:44:43,240  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils 
> [ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are 
> unavailable in the web dashboard. Log file location not found in environment 
> variable 'log.file' or configuration key 'web.log.path'.
> 14:44:43,711  INFO  
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
> [ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at 
> localhost:8081
> 14:44:43,716  INFO  
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
> [ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at 
> http://localhost:8081.
> 14:44:43,717  INFO  
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
> [mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership 
> with leaderSessionID=18ab2e30-a83a-4ec0-be98-7d49b7628565
> 14:44:43,789  WARN  
> org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
> [flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping 
> notifications
> 14:44:43,790  WARN  
> org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
> [flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started 
> because either no tokens obtained or none of the tokens specified its renewal 
> date
> 14:44:44,576  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc 
> [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:969139468.
> 14:44:44,576  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc 
> [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - 
> open:1737783673.
> 14:44:44,607  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc 
> [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - 
> cache_null.cache:a
> 14:44:44,607  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc 
> [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - 
> cache_null.cache:a
> +----+--------------------------------+--------------------------------+
> | op |                          name1 |                          name2 |
> +----+--------------------------------+--------------------------------+
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> | +I |                              a |                              a |
> {code}
> Log output of test UdfSerializeFunc2:
> {code:java}
> (
>   `name1` STRING,
>   `name2` STRING
> )
> 14:45:18,786  DEBUG 
> org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ 
> [ScalaTest-run-running-UdfSerializeSuite] [] - Compiling 
> OneInputStreamOperator Code:
> StreamExecCalc
> 14:45:20,296  WARN  
> org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
> [ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping 
> notifications
> 14:45:20,518  INFO  
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
> [ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
> 14:45:20,635  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils 
> [ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 
> 'log.file' is not set.
> 14:45:20,635  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils 
> [ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are 
> unavailable in the web dashboard. Log file location not found in environment 
> variable 'log.file' or configuration key 'web.log.path'.
> 14:45:21,032  INFO  
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
> [ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at 
> localhost:8081
> 14:45:21,034  INFO  
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
> [ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at 
> http://localhost:8081.
> 14:45:21,035  INFO  
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
> [mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership 
> with leaderSessionID=2fcfdba0-0e36-4e8b-9f3c-88f2c242458f
> 14:45:21,089  WARN  
> org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
> [flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping 
> notifications
> 14:45:21,089  WARN  
> org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
> [flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started 
> because either no tokens obtained or none of the tokens specified its renewal 
> date
> 14:45:21,741  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 
> [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - 
> open:1439144392.
> 14:45:21,741  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 
> [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:381953409.
> 14:45:21,742  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 
> [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - 
> open:1162638327.
> 14:45:21,742  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 
> [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:391248806.
> +----+--------------------------------+--------------------------------+
> | op |                          name1 |                          name2 |
> +----+--------------------------------+--------------------------------+
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> | +I |                              a |                              b |
> {code}
>  
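> *This issue is caused by UDF serialization. In the first test, open() is logged only once per subtask, so both udf_ser calls in the same Calc task share one function instance, and the second call returns the value cached by the first. In the second test, open() is logged twice per subtask, i.e. each call gets its own instance.*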



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to