[ 
https://issues.apache.org/jira/browse/FLINK-31905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-31905:
----------------------------
    Summary: Exception thrown when accessing nested field of the result of 
Python UDF with complex result type  (was: Exception thrown when accessing 
nested field of the result of Python UDF with complex type)

> Exception thrown when accessing nested field of the result of Python UDF with 
> complex result type
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-31905
>                 URL: https://issues.apache.org/jira/browse/FLINK-31905
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>            Reporter: Dian Fu
>            Priority: Major
>
> For the following job:
> {code}
> import logging, sys
> from pyflink.common import Row
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import Schema, DataTypes, TableDescriptor, 
> StreamTableEnvironment
> from pyflink.table.expressions import col, row
> from pyflink.table.udf import ACC, T, udaf, AggregateFunction, udf
> logging.basicConfig(stream=sys.stdout, level=logging.ERROR, 
> format="%(message)s")
> class EmitLastState(AggregateFunction):
>     """
>     Aggregator that emits the latest state for the purpose of
>     enabling parallelism on CDC tables.
>     """
>     def create_accumulator(self) -> ACC:
>         return Row(None, None)
>     def accumulate(self, accumulator: ACC, *args):
>         key, obj = args
>         if (accumulator[0] is None) or (key > accumulator[0]):
>             accumulator[0] = key
>             accumulator[1] = obj
>     def retract(self, accumulator: ACC, *args):
>         pass
>     def get_value(self, accumulator: ACC) -> T:
>         return accumulator[1]
> some_complex_inner_type = DataTypes.ROW(
>     [
>         DataTypes.FIELD("f0", DataTypes.STRING()),
>         DataTypes.FIELD("f1", DataTypes.STRING())
>     ]
> )
> some_complex_type = DataTypes.ROW(
>     [
>         DataTypes.FIELD(k, DataTypes.ARRAY(some_complex_inner_type))
>         for k in ("f0", "f1", "f2")
>     ]
>     + [
>         DataTypes.FIELD("f3", DataTypes.DATE()),
>         DataTypes.FIELD("f4", DataTypes.VARCHAR(32)),
>         DataTypes.FIELD("f5", DataTypes.VARCHAR(2)),
>     ]
> )
> @udf(input_types=DataTypes.STRING(), result_type=some_complex_type)
> def complex_udf(s):
>     return Row(f0=None, f1=None, f2=None, f3=None, f4=None, f5=None)
> if __name__ == "__main__":
>     env = StreamExecutionEnvironment.get_execution_environment()
>     table_env = StreamTableEnvironment.create(env)
>     table_env.get_config().set('pipeline.classpaths', 
> 'file:///Users/dianfu/code/src/workspace/pyflink-examples/flink-sql-connector-postgres-cdc-2.1.1.jar')
>     # Create schema
>     _schema = {
>         "p_key": DataTypes.INT(False),
>         "modified_id": DataTypes.INT(False),
>         "content": DataTypes.STRING()
>     }
>     schema = Schema.new_builder().from_fields(
>         *zip(*[(k, v) for k, v in _schema.items()])
>     ).\
>         primary_key("p_key").\
>         build()
>     # Create table descriptor
>     descriptor = TableDescriptor.for_connector("postgres-cdc").\
>         option("hostname", "host.docker.internal").\
>         option("port", "5432").\
>         option("database-name", "flink_issue").\
>         option("username", "root").\
>         option("password", "root").\
>         option("debezium.plugin.name", "pgoutput").\
>         option("schema-name", "flink_schema").\
>         option("table-name", "flink_table").\
>         option("slot.name", "flink_slot").\
>         schema(schema).\
>         build()
>     table_env.create_temporary_table("flink_table", descriptor)
>     # Create changelog stream
>     stream = table_env.from_path("flink_table")\
>     # Define UDAF
>     accumulator_type = DataTypes.ROW(
>         [
>             DataTypes.FIELD("f0", DataTypes.INT(False)),
>             DataTypes.FIELD("f1", DataTypes.ROW([DataTypes.FIELD(k, v) for k, 
> v in _schema.items()])),
>         ]
>     )
>     result_type = DataTypes.ROW([DataTypes.FIELD(k, v) for k, v in 
> _schema.items()])
>     emit_last = udaf(EmitLastState(), accumulator_type=accumulator_type, 
> result_type=result_type)
>     # Emit last state based on modified_id to enable parallel processing
>     stream = stream.\
>         group_by(col("p_key")).\
>         select(
>         col("p_key"),
>         emit_last(col("modified_id"),row(*(col(k) for k in 
> _schema.keys())).cast(result_type)).alias("tmp_obj")
>     )
>     # Select the elements of the objects
>     stream = stream.select(*(col("tmp_obj").get(k).alias(k) for k in 
> _schema.keys()))
>     # We apply a UDF which parses the XML and returns a complex nested
>     # structure
>     stream = stream.select(col("p_key"), 
> complex_udf(col("content")).alias("nested_obj"))
>     # We select an element from the nested structure in order to flatten it
>     # The next line is the one causing the issue; commenting it out makes
>     # the pipeline work
>     stream = stream.select(col("p_key"), col("nested_obj").get("f0"))
>     # Interestingly, the below part does work...
>     # stream = stream.select(col("nested_obj").get("f0"))
>     table_env.to_changelog_stream(stream).print()
>     # Execute
>     env.execute_async()
> {code}
> The following exception is thrown when calling to_changelog_stream:
> {code}
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> o8.toChangelogStream.
> : java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
>         at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown 
> Source)
>         at 
> java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown 
> Source)
>         at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown 
> Source)
>         at java.base/java.util.Objects.checkIndex(Unknown Source)
>         at java.base/java.util.ArrayList.get(Unknown Source)
>         at 
> org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:975)
>         at 
> org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:924)
>         at org.apache.calcite.rex.RexLocalRef.accept(RexLocalRef.java:75)
>         at 
> org.apache.calcite.rex.RexShuttle.visitFieldAccess(RexShuttle.java:198)
>         at 
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:904)
>         at 
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:887)
>         at 
> org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
>         at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
>         at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
>         at 
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
>         at 
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
>         at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
>         at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
>         at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
>         at 
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
>         at 
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
>         at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
>         at 
> org.apache.calcite.rex.RexProgramBuilder.registerInput(RexProgramBuilder.java:295)
>         at 
> org.apache.calcite.rex.RexProgramBuilder.addProject(RexProgramBuilder.java:206)
>         at org.apache.calcite.rex.RexProgram.create(RexProgram.java:224)
>         at org.apache.calcite.rex.RexProgram.create(RexProgram.java:193)
>         at 
> org.apache.flink.table.planner.plan.rules.logical.PythonCalcSplitRuleBase.onMatch(PythonCalcSplitRule.scala:98)
>         at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>         at 
> org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>         at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>         at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>         at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>         at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>         at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>         at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
>         at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
>         at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
>         at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>         at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>         at scala.collection.Iterator.foreach(Iterator.scala:937)
>         at scala.collection.Iterator.foreach$(Iterator.scala:937)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>         at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>         at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at 
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>         at 
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>         at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>         at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
>         at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
>         at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>         at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>         at scala.collection.immutable.Range.foreach(Range.scala:155)
>         at 
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>         at 
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>         at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>         at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
>         at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>         at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>         at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>         at scala.collection.Iterator.foreach(Iterator.scala:937)
>         at scala.collection.Iterator.foreach$(Iterator.scala:937)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>         at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>         at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at 
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>         at 
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>         at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>         at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>         at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
>         at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>         at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>         at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
>         at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
>         at 
> org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
>         at 
> org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
>         at 
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.java:263)
>         at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>         at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
> Source)
>         at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>         at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>         at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>         at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>         at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>         at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>         at java.base/java.lang.Thread.run(Unknown Source)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to