[ 
https://issues.apache.org/jira/browse/FLINK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang reassigned FLINK-27598:
------------------------------------

    Assignee: Xingbo Huang

> Improve the exception message when mixing use Python UDF and Pandas UDF
> -----------------------------------------------------------------------
>
>                 Key: FLINK-27598
>                 URL: https://issues.apache.org/jira/browse/FLINK-27598
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>            Reporter: Dian Fu
>            Assignee: Xingbo Huang
>            Priority: Major
>
> For the following job:
> {code}
> import argparse
> from decimal import Decimal
> from pyflink.common import Row
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, DataTypes
> from pyflink.table.udf import AggregateFunction, udaf
> class DeduplicatedSum(AggregateFunction):
> def create_accumulator(self):
> return \{int(0), float(0)}
> def get_value(self, accumulator) -> float:
> sum(accumulator.values())
> def accumulate(self, accumulator, k: int, v: float):
> if k not in accumulator:
> accumulator[k] = v
> def retract(self, accumulator, k: int, v: float):
> if k in accumulator:
> del accumulator[k]
> deduplicated_sum = udaf(f=DeduplicatedSum(),
> func_type="pandas",
> result_type=DataTypes.DOUBLE(),
> input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()])
> class FirstValue(AggregateFunction):
> def create_accumulator(self):
> return [int(-1), float(0)]
> def get_value(self, accumulator) -> float:
> return accumulator[1]
> def accumulate(self, accumulator, k: int, v: float):
> ck = accumulator[0]
> if ck > k:
> accumulator[0] = k
> accumulator[1] = v
> first_value = udaf(f=FirstValue(),
> result_type=DataTypes.DOUBLE(),
> func_type="pandas",
> input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()])
> class LastValue(AggregateFunction):
> def create_accumulator(self):
> return [int(-1), float(0)]
> def get_value(self, accumulator: Row) -> float:
> return accumulator[1]
> def accumulate(self, accumulator: Row, k: int, v: float):
> ck = accumulator[0]
> if ck < k:
> accumulator[0] = k
> accumulator[1] = v
> last_value = udaf(f=LastValue(),
> func_type="pandas",
> result_type=DataTypes.DOUBLE(),
> input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()])
> def create_source_table_trades(table_env):
> source = f"""
> CREATE TABLE src_trade (
> `id` VARCHAR
> ,`timestamp` BIGINT
> ,`side` VARCHAR
> ,`price` DOUBLE
> ,`size` DOUBLE
> ,`uniqueId` BIGINT
> ,ts_micro AS `timestamp`
> ,ts_milli AS `timestamp` / 1000
> ,ts AS TO_TIMESTAMP_LTZ(`timestamp` / 1000, 3)
> ,WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
> ) WITH (
> 'connector' = 'datagen')
> """
> table_env.execute_sql(source)
> def create_sink_table(table_env):
> sink = f"""
> CREATE TABLE dst_kline (
> wst TIMESTAMP_LTZ(3)
> ,wet TIMESTAMP_LTZ(3)
> ,otm BIGINT
> ,ot TIMESTAMP_LTZ(3)
> ,ctm BIGINT
> ,ct TIMESTAMP_LTZ(3)
> ,ptm BIGINT
> ,pt TIMESTAMP_LTZ(3)
> ,`open` DOUBLE
> ,`close` DOUBLE
> ,`high` DOUBLE
> ,`low` DOUBLE
> ,`vol` DOUBLE -- total trade volume
> ,`to` DOUBLE -- total turnover value
> ,`rev` INT -- revision, something we might use for versioning
> ,`gap` INT -- if this value is reliable
> ,PRIMARY KEY(wst) NOT ENFORCED
> ) WITH (
> 'connector' = 'print'
> )
> """
> table_env.execute_sql(sink)
> def kafka_src_topic(value):
> if not len(value.split('-')) == 5:
> raise argparse.ArgumentTypeError("{} is not a valid kafka 
> topic".format(value))
> return value
> def interval(value):
> i = []
> prev_num = []
> for character in value:
> if character.isalpha():
> if prev_num:
> num = Decimal(''.join(prev_num))
> if character == 'd':
> i.append(f"'\{num}' DAYS")
> elif character == 'h':
> i.append(f"'\{num}' HOURS")
> elif character == 'm':
> i.append(f"'\{num}' MINUTES")
> elif character == 's':
> i.append(f"'\{num}' SECONDS")
> prev_num = []
> elif character.isnumeric() or character == '.':
> prev_num.append(character)
> return " ".join(i)
> def fetch_arguments_flink_kline():
> import argparse
> parser = argparse.ArgumentParser()
> parser.add_argument('--bootstrap-servers', type=str, required=True)
> parser.add_argument('--src-topic', type=kafka_src_topic)
> parser.add_argument('--consume-mode', type=str, default='group-offsets',
> choices=['group-offsets', 'latest-offset'],
> help='scan.startup.mode for kafka')
> parser.add_argument('--interval', type=str, default='20s',
> help='output interval e.g. 5d4h3m1s, default to 20s')
> parser.add_argument('--force-test', action='store_true')
> parser.add_argument('--consumer-group-hint', type=str, default='1')
> args = parser.parse_args()
> if args.force_test and args.consumer_group_hint == '1':
> parser.error("With --force-test, should not use default '1' for 
> --consumer-group-hint")
> return args
> def main():
> # args = fetch_arguments_flink_kline()
> # parts = args.src_topic.split('-')
> # _, e, p, s, _ = parts
> # dst_topic = f'\{e}-\{p}-\{s}-Kline\{args.interval}'
> env = StreamExecutionEnvironment.get_execution_environment()
> table_env = StreamTableEnvironment.create(env)
> # 
> table_env.get_config().get_configuration().set_boolean("table.exec.emit.early-fire.enabled",
>  True)
> # 
> table_env.get_config().get_configuration().set_string("table.exec.emit.early-fire.delay",
>  "0 s")
> table_env.get_config().get_configuration().set_string("table.exec.emit.allow-lateness",
>  "1 h")
> # 
> table_env.get_config().get_configuration().set_boolean("table.exec.emit.late-fire.enabled",
>  True)
> # 
> table_env.get_config().get_configuration().set_string("table.exec.emit.late-fire.delay",
>  "0 s")
> table_env.create_temporary_function("deduplicated_sum", deduplicated_sum)
> table_env.create_temporary_function("first_value", first_value)
> table_env.create_temporary_function("last_value", last_value)
> create_source_table_trades(table_env)
> create_sink_table(table_env)
> stmt = f"""
> INSERT INTO dst_kline
> SELECT TUMBLE_START(ts, INTERVAL '1' DAY)
> ,TUMBLE_END(ts, INTERVAL '1' DAY)
> ,MIN(ts_milli)
> ,MIN(ts) AS st
> ,MAX(ts_milli)
> ,MAX(ts) AS et
> ,EXTRACT(MILLISECOND FROM CURRENT_TIMESTAMP) + UNIX_TIMESTAMP() * 1000
> ,CURRENT_TIMESTAMP
> ,first_value(ts_micro, price)
> ,last_value(ts_micro, price)
> ,MAX(price)
> ,MIN(price)
> ,deduplicated_sum(uniqueId, `size`)
> ,deduplicated_sum(uniqueId, price * `size`)
> ,1
> ,CAST((MAX(ts_milli) - MIN(ts_milli)) / 1000 AS INT)
> FROM src_trade
> GROUP BY TUMBLE(ts, INTERVAL '1' DAY)
> """
> table_env.execute_sql(stmt)
> if __name__ == '__main__':
>     main()
> {code}
> It throws the following exception:
> {code}
> Traceback (most recent call last):
>   File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/kl(3).py", 
> line 207, in <module>
>     main()
>   File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/kl(3).py", 
> line 203, in main
>     table_env.execute_sql(stmt)
>   File 
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/table/table_environment.py",
>  line 876, in execute_sql
>     return TableResult(self._j_tenv.executeSql(stmt))
>   File 
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/py4j/java_gateway.py",
>  line 1286, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File 
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>  line 146, in deco
>     return f(*a, **kw)
>   File 
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/py4j/protocol.py",
>  line 328, in get_return_value
>     format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o10.executeSql.
> : java.lang.NullPointerException
>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
>     at 
> org.apache.flink.table.functions.python.PythonFunctionInfo.<init>(PythonFunctionInfo.java:45)
>     at 
> org.apache.flink.table.functions.python.PythonAggregateFunctionInfo.<init>(PythonAggregateFunctionInfo.java:36)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.extractPythonAggregateFunctionInfosFromAggregateCall(CommonPythonUtil.java:236)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate.createPandasPythonStreamWindowGroupOneInputTransformation(StreamExecPythonGroupWindowAggregate.java:365)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate.translateToPlanInternal(StreamExecPythonGroupWindowAggregate.java:264)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:114)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:71)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:70)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:872)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:742)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>     at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> The exception message is confusing and should be improved.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to