[
https://issues.apache.org/jira/browse/FLINK-31905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dian Fu updated FLINK-31905:
----------------------------
Summary: Exception thrown when accessing nested field of the result of
Python UDF with complex result type (was: Exception thrown when accessing
nested field of the result of Python UDF with complex type)
> Exception thrown when accessing nested field of the result of Python UDF with
> complex result type
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-31905
> URL: https://issues.apache.org/jira/browse/FLINK-31905
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Reporter: Dian Fu
> Priority: Major
>
> For the following job:
> {code}
> import logging, sys
> from pyflink.common import Row
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import Schema, DataTypes, TableDescriptor, StreamTableEnvironment
> from pyflink.table.expressions import col, row
> from pyflink.table.udf import ACC, T, udaf, AggregateFunction, udf
> logging.basicConfig(stream=sys.stdout, level=logging.ERROR,
> format="%(message)s")
> class EmitLastState(AggregateFunction):
> """
> Aggregator that emits the latest state for the purpose of
> enabling parallelism on CDC tables.
> """
> def create_accumulator(self) -> ACC:
> return Row(None, None)
> def accumulate(self, accumulator: ACC, *args):
> key, obj = args
> if (accumulator[0] is None) or (key > accumulator[0]):
> accumulator[0] = key
> accumulator[1] = obj
> def retract(self, accumulator: ACC, *args):
> pass
> def get_value(self, accumulator: ACC) -> T:
> return accumulator[1]
> some_complex_inner_type = DataTypes.ROW(
> [
> DataTypes.FIELD("f0", DataTypes.STRING()),
> DataTypes.FIELD("f1", DataTypes.STRING())
> ]
> )
> some_complex_type = DataTypes.ROW(
> [
> DataTypes.FIELD(k, DataTypes.ARRAY(some_complex_inner_type))
> for k in ("f0", "f1", "f2")
> ]
> + [
> DataTypes.FIELD("f3", DataTypes.DATE()),
> DataTypes.FIELD("f4", DataTypes.VARCHAR(32)),
> DataTypes.FIELD("f5", DataTypes.VARCHAR(2)),
> ]
> )
> @udf(input_types=DataTypes.STRING(), result_type=some_complex_type)
> def complex_udf(s):
> return Row(f0=None, f1=None, f2=None, f3=None, f4=None, f5=None)
> if __name__ == "__main__":
> env = StreamExecutionEnvironment.get_execution_environment()
> table_env = StreamTableEnvironment.create(env)
> table_env.get_config().set('pipeline.classpaths',
> 'file:///Users/dianfu/code/src/workspace/pyflink-examples/flink-sql-connector-postgres-cdc-2.1.1.jar')
> # Create schema
> _schema = {
> "p_key": DataTypes.INT(False),
> "modified_id": DataTypes.INT(False),
> "content": DataTypes.STRING()
> }
> schema = Schema.new_builder().from_fields(
> *zip(*[(k, v) for k, v in _schema.items()])
> ).\
> primary_key("p_key").\
> build()
> # Create table descriptor
> descriptor = TableDescriptor.for_connector("postgres-cdc").\
> option("hostname", "host.docker.internal").\
> option("port", "5432").\
> option("database-name", "flink_issue").\
> option("username", "root").\
> option("password", "root").\
> option("debezium.plugin.name", "pgoutput").\
> option("schema-name", "flink_schema").\
> option("table-name", "flink_table").\
> option("slot.name", "flink_slot").\
> schema(schema).\
> build()
> table_env.create_temporary_table("flink_table", descriptor)
> # Create changelog stream
> stream = table_env.from_path("flink_table")\
> # Define UDAF
> accumulator_type = DataTypes.ROW(
> [
> DataTypes.FIELD("f0", DataTypes.INT(False)),
> DataTypes.FIELD("f1", DataTypes.ROW([DataTypes.FIELD(k, v) for k,
> v in _schema.items()])),
> ]
> )
> result_type = DataTypes.ROW([DataTypes.FIELD(k, v) for k, v in
> _schema.items()])
> emit_last = udaf(EmitLastState(), accumulator_type=accumulator_type,
> result_type=result_type)
> # Emit last state based on modified_id to enable parallel processing
> stream = stream.\
> group_by(col("p_key")).\
> select(
> col("p_key"),
> emit_last(col("modified_id"),row(*(col(k) for k in
> _schema.keys())).cast(result_type)).alias("tmp_obj")
> )
> # Select the elements of the objects
> stream = stream.select(*(col("tmp_obj").get(k).alias(k) for k in
> _schema.keys()))
> # We apply a UDF which parses the xml and returns a complex nested
> # structure
> stream = stream.select(col("p_key"),
> complex_udf(col("content")).alias("nested_obj"))
> # We select an element from the nested structure in order to flatten it
> # The next line is the line causing issues; commenting it out
> # makes the pipeline work
> stream = stream.select(col("p_key"), col("nested_obj").get("f0"))
> # Interestingly, the below part does work...
> # stream = stream.select(col("nested_obj").get("f0"))
> table_env.to_changelog_stream(stream).print()
> # Execute
> env.execute_async()
> {code}
> {code}
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o8.toChangelogStream.
> : java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
> at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown
> Source)
> at
> java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown
> Source)
> at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown
> Source)
> at java.base/java.util.Objects.checkIndex(Unknown Source)
> at java.base/java.util.ArrayList.get(Unknown Source)
> at
> org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:975)
> at
> org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:924)
> at org.apache.calcite.rex.RexLocalRef.accept(RexLocalRef.java:75)
> at
> org.apache.calcite.rex.RexShuttle.visitFieldAccess(RexShuttle.java:198)
> at
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:904)
> at
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:887)
> at
> org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
> at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
> at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
> at
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
> at
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
> at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
> at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
> at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
> at
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
> at
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
> at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
> at
> org.apache.calcite.rex.RexProgramBuilder.registerInput(RexProgramBuilder.java:295)
> at
> org.apache.calcite.rex.RexProgramBuilder.addProject(RexProgramBuilder.java:206)
> at org.apache.calcite.rex.RexProgram.create(RexProgram.java:224)
> at org.apache.calcite.rex.RexProgram.create(RexProgram.java:193)
> at
> org.apache.flink.table.planner.plan.rules.logical.PythonCalcSplitRuleBase.onMatch(PythonCalcSplitRule.scala:98)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
> at
> org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
> at
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
> at
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
> at
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
> at
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
> at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
> at scala.collection.immutable.Range.foreach(Range.scala:155)
> at
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
> at
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
> at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
> at
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
> at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
> at
> org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
> at
> org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
> at
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.java:263)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> Source)
> at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.base/java.lang.Thread.run(Unknown Source)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)