[ https://issues.apache.org/jira/browse/FLINK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xingbo Huang reassigned FLINK-27598:
------------------------------------

    Assignee: Xingbo Huang

Improve the exception message when mixing Python UDFs and Pandas UDFs
----------------------------------------------------------------------

                Key: FLINK-27598
                URL: https://issues.apache.org/jira/browse/FLINK-27598
            Project: Flink
         Issue Type: Bug
         Components: API / Python
           Reporter: Dian Fu
           Assignee: Xingbo Huang
           Priority: Major

For the following job:

{code}
import argparse
from decimal import Decimal

from pyflink.common import Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import AggregateFunction, udaf


class DeduplicatedSum(AggregateFunction):

    def create_accumulator(self):
        # dict keyed by uniqueId so that each trade is summed at most once
        return {int(0): float(0)}

    def get_value(self, accumulator) -> float:
        return sum(accumulator.values())

    def accumulate(self, accumulator, k: int, v: float):
        if k not in accumulator:
            accumulator[k] = v

    def retract(self, accumulator, k: int, v: float):
        if k in accumulator:
            del accumulator[k]


deduplicated_sum = udaf(f=DeduplicatedSum(),
                        func_type="pandas",
                        result_type=DataTypes.DOUBLE(),
                        input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()])


class FirstValue(AggregateFunction):

    def create_accumulator(self):
        return [int(-1), float(0)]

    def get_value(self, accumulator) -> float:
        return accumulator[1]

    def accumulate(self, accumulator, k: int, v: float):
        ck = accumulator[0]
        if ck > k:
            accumulator[0] = k
            accumulator[1] = v


first_value = udaf(f=FirstValue(),
                   result_type=DataTypes.DOUBLE(),
                   func_type="pandas",
                   input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()])


class LastValue(AggregateFunction):

    def create_accumulator(self):
        return [int(-1), float(0)]

    def get_value(self, accumulator: Row) -> float:
        return accumulator[1]

    def accumulate(self, accumulator: Row, k: int, v: float):
        ck = accumulator[0]
        if ck < k:
            accumulator[0] = k
            accumulator[1] = v


last_value = udaf(f=LastValue(),
                  func_type="pandas",
                  result_type=DataTypes.DOUBLE(),
                  input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()])


def create_source_table_trades(table_env):
    source = f"""
        CREATE TABLE src_trade (
            `id` VARCHAR
            ,`timestamp` BIGINT
            ,`side` VARCHAR
            ,`price` DOUBLE
            ,`size` DOUBLE
            ,`uniqueId` BIGINT
            ,ts_micro AS `timestamp`
            ,ts_milli AS `timestamp` / 1000
            ,ts AS TO_TIMESTAMP_LTZ(`timestamp` / 1000, 3)
            ,WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
        ) WITH (
            'connector' = 'datagen')
    """
    table_env.execute_sql(source)


def create_sink_table(table_env):
    sink = f"""
        CREATE TABLE dst_kline (
            wst TIMESTAMP_LTZ(3)
            ,wet TIMESTAMP_LTZ(3)
            ,otm BIGINT
            ,ot TIMESTAMP_LTZ(3)
            ,ctm BIGINT
            ,ct TIMESTAMP_LTZ(3)
            ,ptm BIGINT
            ,pt TIMESTAMP_LTZ(3)
            ,`open` DOUBLE
            ,`close` DOUBLE
            ,`high` DOUBLE
            ,`low` DOUBLE
            ,`vol` DOUBLE  -- total trade volume
            ,`to` DOUBLE   -- total turnover value
            ,`rev` INT     -- revision, something we might use for versioning
            ,`gap` INT     -- if this value is reliable
            ,PRIMARY KEY(wst) NOT ENFORCED
        ) WITH (
            'connector' = 'print'
        )
    """
    table_env.execute_sql(sink)


def kafka_src_topic(value):
    if not len(value.split('-')) == 5:
        raise argparse.ArgumentTypeError(
            "{} is not a valid kafka topic".format(value))
    return value


def interval(value):
    i = []
    prev_num = []
    for character in value:
        if character.isalpha():
            if prev_num:
                num = Decimal(''.join(prev_num))
                if character == 'd':
                    i.append(f"'{num}' DAYS")
                elif character == 'h':
                    i.append(f"'{num}' HOURS")
                elif character == 'm':
                    i.append(f"'{num}' MINUTES")
                elif character == 's':
                    i.append(f"'{num}' SECONDS")
                prev_num = []
        elif character.isnumeric() or character == '.':
            prev_num.append(character)
    return " ".join(i)


def fetch_arguments_flink_kline():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--bootstrap-servers', type=str, required=True)
    parser.add_argument('--src-topic', type=kafka_src_topic)
    parser.add_argument('--consume-mode', type=str, default='group-offsets',
                        choices=['group-offsets', 'latest-offset'],
                        help='scan.startup.mode for kafka')
    parser.add_argument('--interval', type=str, default='20s',
                        help='output interval e.g. 5d4h3m1s, default to 20s')
    parser.add_argument('--force-test', action='store_true')
    parser.add_argument('--consumer-group-hint', type=str, default='1')
    args = parser.parse_args()
    if args.force_test and args.consumer_group_hint == '1':
        parser.error("With --force-test, should not use default '1' for "
                     "--consumer-group-hint")
    return args


def main():
    # args = fetch_arguments_flink_kline()
    # parts = args.src_topic.split('-')
    # _, e, p, s, _ = parts
    # dst_topic = f'{e}-{p}-{s}-Kline{args.interval}'
    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(env)
    # table_env.get_config().get_configuration().set_boolean("table.exec.emit.early-fire.enabled", True)
    # table_env.get_config().get_configuration().set_string("table.exec.emit.early-fire.delay", "0 s")
    table_env.get_config().get_configuration().set_string("table.exec.emit.allow-lateness", "1 h")
    # table_env.get_config().get_configuration().set_boolean("table.exec.emit.late-fire.enabled", True)
    # table_env.get_config().get_configuration().set_string("table.exec.emit.late-fire.delay", "0 s")
    table_env.create_temporary_function("deduplicated_sum", deduplicated_sum)
    table_env.create_temporary_function("first_value", first_value)
    table_env.create_temporary_function("last_value", last_value)
    create_source_table_trades(table_env)
    create_sink_table(table_env)
    stmt = f"""
        INSERT INTO dst_kline
        SELECT TUMBLE_START(ts, INTERVAL '1' DAY)
            ,TUMBLE_END(ts, INTERVAL '1' DAY)
            ,MIN(ts_milli)
            ,MIN(ts) AS st
            ,MAX(ts_milli)
            ,MAX(ts) AS et
            ,EXTRACT(MILLISECOND FROM CURRENT_TIMESTAMP) + UNIX_TIMESTAMP() * 1000
            ,CURRENT_TIMESTAMP
            ,first_value(ts_micro, price)
            ,last_value(ts_micro, price)
            ,MAX(price)
            ,MIN(price)
            ,deduplicated_sum(uniqueId, `size`)
            ,deduplicated_sum(uniqueId, price * `size`)
            ,1
            ,CAST((MAX(ts_milli) - MIN(ts_milli)) / 1000 AS INT)
        FROM src_trade
        GROUP BY TUMBLE(ts, INTERVAL '1' DAY)
    """
    table_env.execute_sql(stmt)


if __name__ == '__main__':
    main()
{code}

It throws the following exception:

{code}
Traceback (most recent call last):
  File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/kl(3).py", line 207, in <module>
    main()
  File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/kl(3).py", line 203, in main
    table_env.execute_sql(stmt)
  File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 876, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o10.executeSql.
: java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
    at org.apache.flink.table.functions.python.PythonFunctionInfo.<init>(PythonFunctionInfo.java:45)
    at org.apache.flink.table.functions.python.PythonAggregateFunctionInfo.<init>(PythonAggregateFunctionInfo.java:36)
    at org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.extractPythonAggregateFunctionInfosFromAggregateCall(CommonPythonUtil.java:236)
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate.createPandasPythonStreamWindowGroupOneInputTransformation(StreamExecPythonGroupWindowAggregate.java:365)
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate.translateToPlanInternal(StreamExecPythonGroupWindowAggregate.java:264)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:114)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:71)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:70)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:872)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:742)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
{code}

The exception message is confusing and should be improved.
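For anyone triaging this, here is a minimal sketch of the kind of mix the issue title describes. It is not taken from the report: the `pandas_last`/`general_last` names, the `datagen` table, and the assumption that combining a Pandas UDAF with a general Python UDAF in one group window reproduces the same NullPointerException are all illustrative.

{code}
# Hypothetical minimal reproduction -- a sketch, not from the report above.
# Assumption: mixing a Pandas UDAF and a general Python UDAF in one
# group-window aggregation sends the planner into
# StreamExecPythonGroupWindowAggregate, where PythonFunctionInfo is built
# with a null function and the bare NullPointerException surfaces.
from pyflink.common import Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import AggregateFunction, udaf


class GeneralLast(AggregateFunction):
    """General (row-at-a-time) Python UDAF keeping the last seen value."""

    def create_accumulator(self):
        return Row(0.0)

    def get_value(self, accumulator):
        return accumulator[0]

    def accumulate(self, accumulator, v):
        accumulator[0] = v

    def get_accumulator_type(self):
        return DataTypes.ROW([DataTypes.FIELD("last", DataTypes.DOUBLE())])

    def get_result_type(self):
        return DataTypes.DOUBLE()


env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Pandas flavour: evaluated on a pandas.Series per window.
t_env.create_temporary_function(
    "pandas_last",
    udaf(lambda v: v.iloc[-1], result_type=DataTypes.DOUBLE(), func_type="pandas"))
# General flavour: row-at-a-time (func_type defaults to "general").
t_env.create_temporary_function("general_last", udaf(GeneralLast()))

t_env.execute_sql("""
    CREATE TABLE src (
        v DOUBLE,
        ts AS PROCTIME()
    ) WITH ('connector' = 'datagen')
""")

# Mixing the two flavours in the same window aggregation fails at plan
# translation time; per this issue it should raise a readable error that
# names the offending aggregate instead of a NullPointerException.
t_env.execute_sql("""
    SELECT pandas_last(v), general_last(v)
    FROM src
    GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)
""").print()
{code}

Judging from the stack trace, the improvement presumably belongs in CommonPythonUtil#extractPythonAggregateFunctionInfosFromAggregateCall (or the PythonFunctionInfo constructor): validate the extracted function before the checkNotNull and report which aggregate call cannot be executed in the chosen execution mode.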