[
https://issues.apache.org/jira/browse/FLINK-22982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Maciej Bryński updated FLINK-22982:
-----------------------------------
Description:
Hi,
I'm trying to use a Python UDF with a logical condition as an argument.
{code:java}
log = logging.getLogger()
@udf(result_type=DataTypes.BOOLEAN())
def trace(message, condition):
if condition:
log.warn(message)
return condition
table_env.create_temporary_function('trace', trace)
table_env.execute_sql("""
CREATE TABLE datagen (
n int
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10'
)
""")
result = table_env.sql_query("""
SELECT *
FROM datagen
WHERE trace(n, n < 0)
""")
for r in result.execute().collect():
print(r){code}
As a result, I'm getting an exception:
{code:java}
Py4JJavaError: An error occurred while calling o135.execute.
: java.lang.ClassCastException: class org.apache.calcite.rex.RexInputRef cannot
be cast to class org.apache.calcite.rex.RexCall
(org.apache.calcite.rex.RexInputRef and org.apache.calcite.rex.RexCall are in
unnamed module of loader 'app')
at
org.apache.flink.table.planner.plan.rules.logical.PythonMapMergeRule.matches(PythonMapMergeRule.java:70)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:538)
at
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
{code}
was:
Hi,
I'm trying to use Python UDF with logical condition as argument.
{code:java}
log = logging.getLogger()@udf(result_type=DataTypes.BOOLEAN())
def trace(message, condition):
if condition:
log.warn(message)
return condition
table_env.create_temporary_function('trace', trace)
table_env.execute_sql("""
CREATE TABLE datagen (
n int
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10'
)
""")
result = table_env.sql_query("""
SELECT *
FROM datagen
WHERE trace(n, n < 0)
""")
for r in result.execute().collect():
print(r){code}
As a result I'm getting exception:
{code:java}
Py4JJavaError: An error occurred while calling o135.execute.
: java.lang.ClassCastException: class org.apache.calcite.rex.RexInputRef cannot
be cast to class org.apache.calcite.rex.RexCall
(org.apache.calcite.rex.RexInputRef and org.apache.calcite.rex.RexCall are in
unnamed module of loader 'app')
at
org.apache.flink.table.planner.plan.rules.logical.PythonMapMergeRule.matches(PythonMapMergeRule.java:70)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:538)
at
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
{code}
> java.lang.ClassCastException when using Python UDF
> --------------------------------------------------
>
> Key: FLINK-22982
> URL: https://issues.apache.org/jira/browse/FLINK-22982
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.13.1
> Reporter: Maciej Bryński
> Priority: Major
>
> Hi,
> I'm trying to use a Python UDF with a logical condition as an argument.
>
> {code:java}
> log = logging.getLogger()
> @udf(result_type=DataTypes.BOOLEAN())
> def trace(message, condition):
> if condition:
> log.warn(message)
> return condition
> table_env.create_temporary_function('trace', trace)
> table_env.execute_sql("""
> CREATE TABLE datagen (
> n int
> ) WITH (
> 'connector' = 'datagen',
> 'number-of-rows' = '10'
> )
> """)
> result = table_env.sql_query("""
> SELECT *
> FROM datagen
> WHERE trace(n, n < 0)
> """)
> for r in result.execute().collect():
> print(r){code}
>
> As a result, I'm getting an exception:
> {code:java}
> Py4JJavaError: An error occurred while calling o135.execute.
> : java.lang.ClassCastException: class org.apache.calcite.rex.RexInputRef
> cannot be cast to class org.apache.calcite.rex.RexCall
> (org.apache.calcite.rex.RexInputRef and org.apache.calcite.rex.RexCall are in
> unnamed module of loader 'app')
> at
> org.apache.flink.table.planner.plan.rules.logical.PythonMapMergeRule.matches(PythonMapMergeRule.java:70)
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:538)
> at
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
> at
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
> at
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> {code}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)