Dian Fu created FLINK-27598: ------------------------------- Summary: Improve the exception message when mixing the use of Python UDFs and Pandas UDFs Key: FLINK-27598 URL: https://issues.apache.org/jira/browse/FLINK-27598 Project: Flink Issue Type: Bug Components: API / Python Reporter: Dian Fu
For the following job: {code} import argparse from decimal import Decimal from pyflink.common import Row from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, DataTypes from pyflink.table.udf import AggregateFunction, udaf class DeduplicatedSum(AggregateFunction): def create_accumulator(self): return {int(0), float(0)} def get_value(self, accumulator) -> float: sum(accumulator.values()) def accumulate(self, accumulator, k: int, v: float): if k not in accumulator: accumulator[k] = v def retract(self, accumulator, k: int, v: float): if k in accumulator: del accumulator[k] deduplicated_sum = udaf(f=DeduplicatedSum(), func_type="pandas", result_type=DataTypes.DOUBLE(), input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()]) class FirstValue(AggregateFunction): def create_accumulator(self): return [int(-1), float(0)] def get_value(self, accumulator) -> float: return accumulator[1] def accumulate(self, accumulator, k: int, v: float): ck = accumulator[0] if ck > k: accumulator[0] = k accumulator[1] = v first_value = udaf(f=FirstValue(), result_type=DataTypes.DOUBLE(), func_type="pandas", input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()]) class LastValue(AggregateFunction): def create_accumulator(self): return [int(-1), float(0)] def get_value(self, accumulator: Row) -> float: return accumulator[1] def accumulate(self, accumulator: Row, k: int, v: float): ck = accumulator[0] if ck < k: accumulator[0] = k accumulator[1] = v last_value = udaf(f=LastValue(), func_type="pandas", result_type=DataTypes.DOUBLE(), input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()]) def create_source_table_trades(table_env): source = f""" CREATE TABLE src_trade ( `id` VARCHAR ,`timestamp` BIGINT ,`side` VARCHAR ,`price` DOUBLE ,`size` DOUBLE ,`uniqueId` BIGINT ,ts_micro AS `timestamp` ,ts_milli AS `timestamp` / 1000 ,ts AS TO_TIMESTAMP_LTZ(`timestamp` / 1000, 3) ,WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND ) WITH ( 'connector' = 
'datagen') """ table_env.execute_sql(source) def create_sink_table(table_env): sink = f""" CREATE TABLE dst_kline ( wst TIMESTAMP_LTZ(3) ,wet TIMESTAMP_LTZ(3) ,otm BIGINT ,ot TIMESTAMP_LTZ(3) ,ctm BIGINT ,ct TIMESTAMP_LTZ(3) ,ptm BIGINT ,pt TIMESTAMP_LTZ(3) ,`open` DOUBLE ,`close` DOUBLE ,`high` DOUBLE ,`low` DOUBLE ,`vol` DOUBLE -- total trade volume ,`to` DOUBLE -- total turnover value ,`rev` INT -- revision, something we might use for versioning ,`gap` INT -- if this value is reliable ,PRIMARY KEY(wst) NOT ENFORCED ) WITH ( 'connector' = 'print' ) """ table_env.execute_sql(sink) def kafka_src_topic(value): if not len(value.split('-')) == 5: raise argparse.ArgumentTypeError("{} is not a valid kafka topic".format(value)) return value def interval(value): i = [] prev_num = [] for character in value: if character.isalpha(): if prev_num: num = Decimal(''.join(prev_num)) if character == 'd': i.append(f"'{num}' DAYS") elif character == 'h': i.append(f"'{num}' HOURS") elif character == 'm': i.append(f"'{num}' MINUTES") elif character == 's': i.append(f"'{num}' SECONDS") prev_num = [] elif character.isnumeric() or character == '.': prev_num.append(character) return " ".join(i) def fetch_arguments_flink_kline(): import argparse parser = argparse.ArgumentParser() parser.add_argument('--bootstrap-servers', type=str, required=True) parser.add_argument('--src-topic', type=kafka_src_topic) parser.add_argument('--consume-mode', type=str, default='group-offsets', choices=['group-offsets', 'latest-offset'], help='scan.startup.mode for kafka') parser.add_argument('--interval', type=str, default='20s', help='output interval e.g. 
5d4h3m1s, default to 20s') parser.add_argument('--force-test', action='store_true') parser.add_argument('--consumer-group-hint', type=str, default='1') args = parser.parse_args() if args.force_test and args.consumer_group_hint == '1': parser.error("With --force-test, should not use default '1' for --consumer-group-hint") return args def main(): # args = fetch_arguments_flink_kline() # parts = args.src_topic.split('-') # _, e, p, s, _ = parts # dst_topic = f'{e}-{p}-{s}-Kline{args.interval}' env = StreamExecutionEnvironment.get_execution_environment() table_env = StreamTableEnvironment.create(env) # table_env.get_config().get_configuration().set_boolean("table.exec.emit.early-fire.enabled", True) # table_env.get_config().get_configuration().set_string("table.exec.emit.early-fire.delay", "0 s") table_env.get_config().get_configuration().set_string("table.exec.emit.allow-lateness", "1 h") # table_env.get_config().get_configuration().set_boolean("table.exec.emit.late-fire.enabled", True) # table_env.get_config().get_configuration().set_string("table.exec.emit.late-fire.delay", "0 s") table_env.create_temporary_function("deduplicated_sum", deduplicated_sum) table_env.create_temporary_function("first_value", first_value) table_env.create_temporary_function("last_value", last_value) create_source_table_trades(table_env) create_sink_table(table_env) stmt = f""" INSERT INTO dst_kline SELECT TUMBLE_START(ts, INTERVAL '1' DAY) ,TUMBLE_END(ts, INTERVAL '1' DAY) ,MIN(ts_milli) ,MIN(ts) AS st ,MAX(ts_milli) ,MAX(ts) AS et ,EXTRACT(MILLISECOND FROM CURRENT_TIMESTAMP) + UNIX_TIMESTAMP() * 1000 ,CURRENT_TIMESTAMP ,first_value(ts_micro, price) ,last_value(ts_micro, price) ,MAX(price) ,MIN(price) ,deduplicated_sum(uniqueId, `size`) ,deduplicated_sum(uniqueId, price * `size`) ,1 ,CAST((MAX(ts_milli) - MIN(ts_milli)) / 1000 AS INT) FROM src_trade GROUP BY TUMBLE(ts, INTERVAL '1' DAY) """ table_env.execute_sql(stmt) if __name__ == '__main__': main() {code} It throws the following 
exception: {code} Traceback (most recent call last): File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/kl(3).py", line 207, in <module> main() File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/kl(3).py", line 203, in main table_env.execute_sql(stmt) File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 876, in execute_sql return TableResult(self._j_tenv.executeSql(stmt)) File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco return f(*a, **kw) File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o10.executeSql. : java.lang.NullPointerException at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59) at org.apache.flink.table.functions.python.PythonFunctionInfo.<init>(PythonFunctionInfo.java:45) at org.apache.flink.table.functions.python.PythonAggregateFunctionInfo.<init>(PythonAggregateFunctionInfo.java:36) at org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.extractPythonAggregateFunctionInfosFromAggregateCall(CommonPythonUtil.java:236) at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate.createPandasPythonStreamWindowGroupOneInputTransformation(StreamExecPythonGroupWindowAggregate.java:365) at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate.translateToPlanInternal(StreamExecPythonGroupWindowAggregate.java:264) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) at 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250) at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250) at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:114) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:71) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:70) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:872) at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:742) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) {code} The exception message is confusing and should be improved. -- This message was sent by Atlassian Jira (v8.20.7#820007)