[https://issues.apache.org/jira/browse/FLINK-31905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]

Dian Fu updated FLINK-31905:
----------------------------
    Description: 
For the following job:
{code}
import logging
import sys

from pyflink.common import Row
from pyflink.datastream import StreamExecutionEnvironment
# Parenthesized so the multi-line import is valid Python (a bare trailing
# comma followed by a line break is a SyntaxError).
from pyflink.table import (
    Schema,
    DataTypes,
    TableDescriptor,
    StreamTableEnvironment,
)
from pyflink.table.expressions import col, row
from pyflink.table.udf import ACC, T, udaf, AggregateFunction, udf

# Print only ERROR-level log records, as bare messages, to stdout.
logging.basicConfig(
    stream=sys.stdout, level=logging.ERROR, format="%(message)s"
)


class EmitLastState(AggregateFunction):
    """
    Keep, per group, the object that arrived with the largest ordering key.

    The job uses this to emit the latest state of each CDC row, so that the
    rows can be processed in parallel downstream.
    """

    def create_accumulator(self) -> ACC:
        # Two slots: [0] the largest key seen so far, [1] the object paired
        # with it. Both start out empty.
        return Row(None, None)

    def accumulate(self, accumulator: ACC, *args):
        # Called with exactly two arguments: the ordering key and the object.
        new_key, new_obj = args
        best_key = accumulator[0]
        # Keep the current entry unless the slot is still empty or the new
        # key is strictly larger (ties keep the earlier object).
        if best_key is not None and not new_key > best_key:
            return
        accumulator[0] = new_key
        accumulator[1] = new_obj

    def retract(self, accumulator: ACC, *args):
        # Retraction messages are ignored: the accumulator is left untouched.
        pass

    def get_value(self, accumulator: ACC) -> T:
        # Emit the object stored alongside the largest key (None if nothing
        # has been accumulated yet).
        return accumulator[1]


# Element type of the array-valued fields below: a row of two strings.
some_complex_inner_type = DataTypes.ROW([
    DataTypes.FIELD("f0", DataTypes.STRING()),
    DataTypes.FIELD("f1", DataTypes.STRING()),
])

# Result type of complex_udf: three array-of-row fields (f0-f2) followed by a
# date and two length-limited strings (f3-f5).
some_complex_type = DataTypes.ROW(
    [DataTypes.FIELD(name, DataTypes.ARRAY(some_complex_inner_type))
     for name in ["f0", "f1", "f2"]]
    + [
        DataTypes.FIELD("f3", DataTypes.DATE()),
        DataTypes.FIELD("f4", DataTypes.VARCHAR(32)),
        DataTypes.FIELD("f5", DataTypes.VARCHAR(2)),
    ]
)

@udf(input_types=DataTypes.STRING(), result_type=some_complex_type)
def complex_udf(s):
    """Return a row of ``some_complex_type`` whose six fields are all None.

    The input string ``s`` is ignored; presumably this is a stub standing in
    for the XML parser mentioned where the UDF is applied.
    """
    all_fields_empty = dict.fromkeys(("f0", "f1", "f2", "f3", "f4", "f5"))
    return Row(**all_fields_empty)


if __name__ == "__main__":
    # Reproduction job for FLINK-31905. It reads a Postgres CDC table, keeps
    # the latest row per key with a Python UDAF, and applies a Python UDF with
    # a nested ROW result. It then accesses a field of that result. As
    # reported, to_changelog_stream() fails during planning with an
    # IndexOutOfBoundsException.
    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(env)
    # NOTE: machine-specific location of the Postgres CDC connector jar;
    # point it at a local copy before running.
    table_env.get_config().set(
        'pipeline.classpaths',
        'file:///Users/dianfu/code/src/workspace/pyflink-examples/'
        'flink-sql-connector-postgres-cdc-2.1.1.jar',
    )

    # Create schema (column name -> data type); p_key is the primary key.
    _schema = {
        "p_key": DataTypes.INT(False),
        "modified_id": DataTypes.INT(False),
        "content": DataTypes.STRING(),
    }
    # zip(*pairs) transposes the (name, type) pairs into a tuple of names and
    # a tuple of types, which become the two arguments of from_fields().
    schema = (
        Schema.new_builder()
        .from_fields(*zip(*[(k, v) for k, v in _schema.items()]))
        .primary_key("p_key")
        .build()
    )

    # Create table descriptor for the postgres-cdc source table
    descriptor = (
        TableDescriptor.for_connector("postgres-cdc")
        .option("hostname", "host.docker.internal")
        .option("port", "5432")
        .option("database-name", "flink_issue")
        .option("username", "root")
        .option("password", "root")
        .option("debezium.plugin.name", "pgoutput")
        .option("schema-name", "flink_schema")
        .option("table-name", "flink_table")
        .option("slot.name", "flink_slot")
        .schema(schema)
        .build()
    )

    table_env.create_temporary_table("flink_table", descriptor)

    # Create changelog stream
    stream = table_env.from_path("flink_table")

    # Define UDAF
    accumulator_type = DataTypes.ROW(
        [
            DataTypes.FIELD("f0", DataTypes.INT(False)),
            DataTypes.FIELD(
                "f1",
                DataTypes.ROW([DataTypes.FIELD(k, v) for k, v in _schema.items()]),
            ),
        ]
    )
    result_type = DataTypes.ROW(
        [DataTypes.FIELD(k, v) for k, v in _schema.items()]
    )
    emit_last = udaf(
        EmitLastState(),
        accumulator_type=accumulator_type,
        result_type=result_type,
    )

    # Emit last state based on modified_id to enable parallel processing
    stream = stream.group_by(col("p_key")).select(
        col("p_key"),
        emit_last(
            col("modified_id"),
            row(*(col(k) for k in _schema.keys())).cast(result_type),
        ).alias("tmp_obj"),
    )

    # Select the elements of the objects
    stream = stream.select(
        *(col("tmp_obj").get(k).alias(k) for k in _schema.keys())
    )

    # We apply a UDF which parses the xml and returns a complex nested structure
    stream = stream.select(
        col("p_key"), complex_udf(col("content")).alias("nested_obj")
    )

    # We select an element from the nested structure in order to flatten it.
    # The next line is the line causing issues; commenting it out makes the
    # pipeline work.
    stream = stream.select(col("p_key"), col("nested_obj").get("f0"))

    # Interestingly, selecting only the nested field (without p_key) does work:
    # stream = stream.select(col("nested_obj").get("f0"))

    table_env.to_changelog_stream(stream).print()

    # Execute
    env.execute_async()
{code}

{code}
py4j.protocol.Py4JJavaError: An error occurred while calling 
o8.toChangelogStream.
: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
        at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
        at 
java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
        at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
        at java.base/java.util.Objects.checkIndex(Unknown Source)
        at java.base/java.util.ArrayList.get(Unknown Source)
        at 
org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:975)
        at 
org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:924)
        at org.apache.calcite.rex.RexLocalRef.accept(RexLocalRef.java:75)
        at 
org.apache.calcite.rex.RexShuttle.visitFieldAccess(RexShuttle.java:198)
        at 
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:904)
        at 
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:887)
        at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
        at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
        at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
        at 
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
        at 
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
        at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
        at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
        at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
        at 
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
        at 
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
        at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
        at 
org.apache.calcite.rex.RexProgramBuilder.registerInput(RexProgramBuilder.java:295)
        at 
org.apache.calcite.rex.RexProgramBuilder.addProject(RexProgramBuilder.java:206)
        at org.apache.calcite.rex.RexProgram.create(RexProgram.java:224)
        at org.apache.calcite.rex.RexProgram.create(RexProgram.java:193)
        at 
org.apache.flink.table.planner.plan.rules.logical.PythonCalcSplitRuleBase.onMatch(PythonCalcSplitRule.scala:98)
        at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
        at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
        at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
        at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
        at scala.collection.immutable.Range.foreach(Range.scala:155)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
        at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
        at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
        at 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.java:263)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Unknown Source)
{code}

PS: This issue was reported on the Apache Flink Slack: https://apache-flink.slack.com/archives/C03G7LJTS2G/p1681918854457699

  was:
For the following job:
{code}
import logging
import sys

from pyflink.common import Row
from pyflink.datastream import StreamExecutionEnvironment
# Parenthesized so the multi-line import is valid Python (a bare trailing
# comma followed by a line break is a SyntaxError).
from pyflink.table import (
    Schema,
    DataTypes,
    TableDescriptor,
    StreamTableEnvironment,
)
from pyflink.table.expressions import col, row
from pyflink.table.udf import ACC, T, udaf, AggregateFunction, udf

# Print only ERROR-level log records, as bare messages, to stdout.
logging.basicConfig(
    stream=sys.stdout, level=logging.ERROR, format="%(message)s"
)


class EmitLastState(AggregateFunction):
    """
    Keep, per group, the object that arrived with the largest ordering key.

    The job uses this to emit the latest state of each CDC row, so that the
    rows can be processed in parallel downstream.
    """

    def create_accumulator(self) -> ACC:
        # Two slots: [0] the largest key seen so far, [1] the object paired
        # with it. Both start out empty.
        return Row(None, None)

    def accumulate(self, accumulator: ACC, *args):
        # Called with exactly two arguments: the ordering key and the object.
        new_key, new_obj = args
        best_key = accumulator[0]
        # Keep the current entry unless the slot is still empty or the new
        # key is strictly larger (ties keep the earlier object).
        if best_key is not None and not new_key > best_key:
            return
        accumulator[0] = new_key
        accumulator[1] = new_obj

    def retract(self, accumulator: ACC, *args):
        # Retraction messages are ignored: the accumulator is left untouched.
        pass

    def get_value(self, accumulator: ACC) -> T:
        # Emit the object stored alongside the largest key (None if nothing
        # has been accumulated yet).
        return accumulator[1]


# Element type of the array-valued fields below: a row of two strings.
some_complex_inner_type = DataTypes.ROW([
    DataTypes.FIELD("f0", DataTypes.STRING()),
    DataTypes.FIELD("f1", DataTypes.STRING()),
])

# Result type of complex_udf: three array-of-row fields (f0-f2) followed by a
# date and two length-limited strings (f3-f5).
some_complex_type = DataTypes.ROW(
    [DataTypes.FIELD(name, DataTypes.ARRAY(some_complex_inner_type))
     for name in ["f0", "f1", "f2"]]
    + [
        DataTypes.FIELD("f3", DataTypes.DATE()),
        DataTypes.FIELD("f4", DataTypes.VARCHAR(32)),
        DataTypes.FIELD("f5", DataTypes.VARCHAR(2)),
    ]
)

@udf(input_types=DataTypes.STRING(), result_type=some_complex_type)
def complex_udf(s):
    """Return a row of ``some_complex_type`` whose six fields are all None.

    The input string ``s`` is ignored; presumably this is a stub standing in
    for the XML parser mentioned where the UDF is applied.
    """
    all_fields_empty = dict.fromkeys(("f0", "f1", "f2", "f3", "f4", "f5"))
    return Row(**all_fields_empty)


if __name__ == "__main__":
    # Reproduction job for FLINK-31905. It reads a Postgres CDC table, keeps
    # the latest row per key with a Python UDAF, and applies a Python UDF with
    # a nested ROW result. It then accesses a field of that result. As
    # reported, to_changelog_stream() fails during planning with an
    # IndexOutOfBoundsException.
    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(env)
    # NOTE: machine-specific location of the Postgres CDC connector jar;
    # point it at a local copy before running.
    table_env.get_config().set(
        'pipeline.classpaths',
        'file:///Users/dianfu/code/src/workspace/pyflink-examples/'
        'flink-sql-connector-postgres-cdc-2.1.1.jar',
    )

    # Create schema (column name -> data type); p_key is the primary key.
    _schema = {
        "p_key": DataTypes.INT(False),
        "modified_id": DataTypes.INT(False),
        "content": DataTypes.STRING(),
    }
    # zip(*pairs) transposes the (name, type) pairs into a tuple of names and
    # a tuple of types, which become the two arguments of from_fields().
    schema = (
        Schema.new_builder()
        .from_fields(*zip(*[(k, v) for k, v in _schema.items()]))
        .primary_key("p_key")
        .build()
    )

    # Create table descriptor for the postgres-cdc source table
    descriptor = (
        TableDescriptor.for_connector("postgres-cdc")
        .option("hostname", "host.docker.internal")
        .option("port", "5432")
        .option("database-name", "flink_issue")
        .option("username", "root")
        .option("password", "root")
        .option("debezium.plugin.name", "pgoutput")
        .option("schema-name", "flink_schema")
        .option("table-name", "flink_table")
        .option("slot.name", "flink_slot")
        .schema(schema)
        .build()
    )

    table_env.create_temporary_table("flink_table", descriptor)

    # Create changelog stream
    stream = table_env.from_path("flink_table")

    # Define UDAF
    accumulator_type = DataTypes.ROW(
        [
            DataTypes.FIELD("f0", DataTypes.INT(False)),
            DataTypes.FIELD(
                "f1",
                DataTypes.ROW([DataTypes.FIELD(k, v) for k, v in _schema.items()]),
            ),
        ]
    )
    result_type = DataTypes.ROW(
        [DataTypes.FIELD(k, v) for k, v in _schema.items()]
    )
    emit_last = udaf(
        EmitLastState(),
        accumulator_type=accumulator_type,
        result_type=result_type,
    )

    # Emit last state based on modified_id to enable parallel processing
    stream = stream.group_by(col("p_key")).select(
        col("p_key"),
        emit_last(
            col("modified_id"),
            row(*(col(k) for k in _schema.keys())).cast(result_type),
        ).alias("tmp_obj"),
    )

    # Select the elements of the objects
    stream = stream.select(
        *(col("tmp_obj").get(k).alias(k) for k in _schema.keys())
    )

    # We apply a UDF which parses the xml and returns a complex nested structure
    stream = stream.select(
        col("p_key"), complex_udf(col("content")).alias("nested_obj")
    )

    # We select an element from the nested structure in order to flatten it.
    # The next line is the line causing issues; commenting it out makes the
    # pipeline work.
    stream = stream.select(col("p_key"), col("nested_obj").get("f0"))

    # Interestingly, selecting only the nested field (without p_key) does work:
    # stream = stream.select(col("nested_obj").get("f0"))

    table_env.to_changelog_stream(stream).print()

    # Execute
    env.execute_async()
{code}

{code}
py4j.protocol.Py4JJavaError: An error occurred while calling 
o8.toChangelogStream.
: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
        at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
        at 
java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
        at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
        at java.base/java.util.Objects.checkIndex(Unknown Source)
        at java.base/java.util.ArrayList.get(Unknown Source)
        at 
org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:975)
        at 
org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:924)
        at org.apache.calcite.rex.RexLocalRef.accept(RexLocalRef.java:75)
        at 
org.apache.calcite.rex.RexShuttle.visitFieldAccess(RexShuttle.java:198)
        at 
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:904)
        at 
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:887)
        at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
        at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
        at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
        at 
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
        at 
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
        at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
        at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
        at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
        at 
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
        at 
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
        at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
        at 
org.apache.calcite.rex.RexProgramBuilder.registerInput(RexProgramBuilder.java:295)
        at 
org.apache.calcite.rex.RexProgramBuilder.addProject(RexProgramBuilder.java:206)
        at org.apache.calcite.rex.RexProgram.create(RexProgram.java:224)
        at org.apache.calcite.rex.RexProgram.create(RexProgram.java:193)
        at 
org.apache.flink.table.planner.plan.rules.logical.PythonCalcSplitRuleBase.onMatch(PythonCalcSplitRule.scala:98)
        at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
        at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
        at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
        at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
        at scala.collection.immutable.Range.foreach(Range.scala:155)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
        at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
        at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
        at 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.java:263)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Unknown Source)
{code}


> Exception thrown when accessing nested field of the result of Python UDF with 
> complex result type
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-31905
>                 URL: https://issues.apache.org/jira/browse/FLINK-31905
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>            Reporter: Dian Fu
>            Priority: Major
>
> For the following job:
> {code}
> import logging, sys
> from pyflink.common import Row
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import Schema, DataTypes, TableDescriptor, 
> StreamTableEnvironment
> from pyflink.table.expressions import col, row
> from pyflink.table.udf import ACC, T, udaf, AggregateFunction, udf
> logging.basicConfig(stream=sys.stdout, level=logging.ERROR, 
> format="%(message)s")
> class EmitLastState(AggregateFunction):
>     """
>     Aggregator that emits the latest state for the purpose of
>     enabling parallelism on CDC tables.
>     """
>     def create_accumulator(self) -> ACC:
>         return Row(None, None)
>     def accumulate(self, accumulator: ACC, *args):
>         key, obj = args
>         if (accumulator[0] is None) or (key > accumulator[0]):
>             accumulator[0] = key
>             accumulator[1] = obj
>     def retract(self, accumulator: ACC, *args):
>         pass
>     def get_value(self, accumulator: ACC) -> T:
>         return accumulator[1]
> some_complex_inner_type = DataTypes.ROW(
>     [
>         DataTypes.FIELD("f0", DataTypes.STRING()),
>         DataTypes.FIELD("f1", DataTypes.STRING())
>     ]
> )
> some_complex_type = DataTypes.ROW(
>     [
>         DataTypes.FIELD(k, DataTypes.ARRAY(some_complex_inner_type))
>         for k in ("f0", "f1", "f2")
>     ]
>     + [
>         DataTypes.FIELD("f3", DataTypes.DATE()),
>         DataTypes.FIELD("f4", DataTypes.VARCHAR(32)),
>         DataTypes.FIELD("f5", DataTypes.VARCHAR(2)),
>     ]
> )
> @udf(input_types=DataTypes.STRING(), result_type=some_complex_type)
> def complex_udf(s):
>     return Row(f0=None, f1=None, f2=None, f3=None, f4=None, f5=None)
> if __name__ == "__main__":
>     env = StreamExecutionEnvironment.get_execution_environment()
>     table_env = StreamTableEnvironment.create(env)
>     table_env.get_config().set('pipeline.classpaths', 
> 'file:///Users/dianfu/code/src/workspace/pyflink-examples/flink-sql-connector-postgres-cdc-2.1.1.jar')
>     # Create schema
>     _schema = {
>         "p_key": DataTypes.INT(False),
>         "modified_id": DataTypes.INT(False),
>         "content": DataTypes.STRING()
>     }
>     schema = Schema.new_builder().from_fields(
>         *zip(*[(k, v) for k, v in _schema.items()])
>     ).\
>         primary_key("p_key").\
>         build()
>     # Create table descriptor
>     descriptor = TableDescriptor.for_connector("postgres-cdc").\
>         option("hostname", "host.docker.internal").\
>         option("port", "5432").\
>         option("database-name", "flink_issue").\
>         option("username", "root").\
>         option("password", "root").\
>         option("debezium.plugin.name", "pgoutput").\
>         option("schema-name", "flink_schema").\
>         option("table-name", "flink_table").\
>         option("slot.name", "flink_slot").\
>         schema(schema).\
>         build()
>     table_env.create_temporary_table("flink_table", descriptor)
>     # Create changelog stream
>     stream = table_env.from_path("flink_table")\
>     # Define UDAF
>     accumulator_type = DataTypes.ROW(
>         [
>             DataTypes.FIELD("f0", DataTypes.INT(False)),
>             DataTypes.FIELD("f1", DataTypes.ROW([DataTypes.FIELD(k, v) for k, 
> v in _schema.items()])),
>         ]
>     )
>     result_type = DataTypes.ROW([DataTypes.FIELD(k, v) for k, v in 
> _schema.items()])
>     emit_last = udaf(EmitLastState(), accumulator_type=accumulator_type, 
> result_type=result_type)
>     # Emit last state based on modified_id to enable parallel processing
>     stream = stream.\
>         group_by(col("p_key")).\
>         select(
>         col("p_key"),
>         emit_last(col("modified_id"),row(*(col(k) for k in 
> _schema.keys())).cast(result_type)).alias("tmp_obj")
>     )
>     # Select the elements of the objects
>     stream = stream.select(*(col("tmp_obj").get(k).alias(k) for k in 
> _schema.keys()))
>     # We apply a UDF which parses the xml and returns a complex nested 
> structure
>     stream = stream.select(col("p_key"), 
> complex_udf(col("content")).alias("nested_obj"))
>     # We select an element from the nested structure in order to flatten it
>     # The next line is the line causing issues, commenting the next line will 
> make the pipeline work
>     stream = stream.select(col("p_key"), col("nested_obj").get("f0"))
>     # Interestingly, the below part does work...
>     # stream = stream.select(col("nested_obj").get("f0"))
>     table_env.to_changelog_stream(stream).print()
>     # Execute
>     env.execute_async()
> {code}
> {code}
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> o8.toChangelogStream.
> : java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
>         at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown 
> Source)
>         at 
> java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown 
> Source)
>         at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown 
> Source)
>         at java.base/java.util.Objects.checkIndex(Unknown Source)
>         at java.base/java.util.ArrayList.get(Unknown Source)
>         at 
> org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:975)
>         at 
> org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:924)
>         at org.apache.calcite.rex.RexLocalRef.accept(RexLocalRef.java:75)
>         at 
> org.apache.calcite.rex.RexShuttle.visitFieldAccess(RexShuttle.java:198)
>         at 
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:904)
>         at 
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:887)
>         at 
> org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
>         at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
>         at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
>         at 
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
>         at 
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
>         at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
>         at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
>         at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
>         at 
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
>         at 
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
>         at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
>         at 
> org.apache.calcite.rex.RexProgramBuilder.registerInput(RexProgramBuilder.java:295)
>         at 
> org.apache.calcite.rex.RexProgramBuilder.addProject(RexProgramBuilder.java:206)
>         at org.apache.calcite.rex.RexProgram.create(RexProgram.java:224)
>         at org.apache.calcite.rex.RexProgram.create(RexProgram.java:193)
>         at 
> org.apache.flink.table.planner.plan.rules.logical.PythonCalcSplitRuleBase.onMatch(PythonCalcSplitRule.scala:98)
>         at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>         at 
> org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>         at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>         at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>         at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>         at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>         at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>         at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
>         at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
>         at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
>         at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>         at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>         at scala.collection.Iterator.foreach(Iterator.scala:937)
>         at scala.collection.Iterator.foreach$(Iterator.scala:937)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>         at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>         at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at 
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>         at 
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>         at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>         at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
>         at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
>         at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>         at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>         at scala.collection.immutable.Range.foreach(Range.scala:155)
>         at 
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>         at 
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>         at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>         at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
>         at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>         at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>         at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>         at scala.collection.Iterator.foreach(Iterator.scala:937)
>         at scala.collection.Iterator.foreach$(Iterator.scala:937)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>         at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>         at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at 
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>         at 
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>         at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>         at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>         at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
>         at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>         at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>         at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
>         at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
>         at 
> org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
>         at 
> org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
>         at 
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.java:263)
>         at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>         at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
> Source)
>         at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>         at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>         at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>         at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>         at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>         at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>         at java.base/java.lang.Thread.run(Unknown Source)
> {code}
> PS: This issue was reported in 
> https://apache-flink.slack.com/archives/C03G7LJTS2G/p1681918854457699



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to