[
https://issues.apache.org/jira/browse/FLINK-31905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dian Fu updated FLINK-31905:
----------------------------
Description:
For the following job:
{code}
import logging
import sys

from pyflink.common import Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (Schema, DataTypes, TableDescriptor,
                           StreamTableEnvironment)
from pyflink.table.expressions import col, row
from pyflink.table.udf import ACC, T, udaf, AggregateFunction, udf

# Log only ERROR-level records, as bare message text, to stdout.
logging.basicConfig(stream=sys.stdout, level=logging.ERROR,
                    format="%(message)s")
class EmitLastState(AggregateFunction):
    """
    Aggregator that emits the latest state for the purpose of
    enabling parallelism on CDC tables.

    The accumulator is a two-field Row ``(key, obj)``: ``key`` is the
    largest ordering key seen so far and ``obj`` is the value that
    arrived together with it. ``get_value`` returns ``obj``.
    """

    def create_accumulator(self) -> ACC:
        # (largest key seen so far, value associated with that key)
        return Row(None, None)

    def accumulate(self, accumulator: ACC, *args):
        # Exactly two arguments are expected: the ordering key and the value;
        # any other arity raises ValueError on unpacking.
        key, obj = args
        # Replace the stored value only for a strictly larger key, so on a
        # tie the value that arrived first is kept.
        if accumulator[0] is None or key > accumulator[0]:
            accumulator[0] = key
            accumulator[1] = obj

    def retract(self, accumulator: ACC, *args):
        # Retraction messages are ignored; the accumulator is left unchanged.
        pass

    def get_value(self, accumulator: ACC) -> T:
        # Emit the value associated with the largest key.
        return accumulator[1]
# Element type of the three array fields of ``some_complex_type``.
some_complex_inner_type = DataTypes.ROW(
    [
        DataTypes.FIELD("f0", DataTypes.STRING()),
        DataTypes.FIELD("f1", DataTypes.STRING()),
    ]
)

# Result type of ``complex_udf``: three ARRAY<ROW<f0, f1>> fields (f0-f2)
# followed by a DATE field (f3) and two VARCHAR fields (f4, f5).
some_complex_type = DataTypes.ROW(
    [
        DataTypes.FIELD(k, DataTypes.ARRAY(some_complex_inner_type))
        for k in ("f0", "f1", "f2")
    ]
    + [
        DataTypes.FIELD("f3", DataTypes.DATE()),
        DataTypes.FIELD("f4", DataTypes.VARCHAR(32)),
        DataTypes.FIELD("f5", DataTypes.VARCHAR(2)),
    ]
)


@udf(input_types=DataTypes.STRING(), result_type=some_complex_type)
def complex_udf(s):
    """Return a Row of ``some_complex_type`` with every field set to None.

    The input string ``s`` is ignored; the reproducer only needs a Python
    UDF whose result type is a nested ROW.
    """
    return Row(f0=None, f1=None, f2=None, f3=None, f4=None, f5=None)
if __name__ == "__main__":
    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(env)
    # Local file URL of the Postgres CDC connector jar; adjust for your machine.
    table_env.get_config().set(
        'pipeline.classpaths',
        'file:///Users/dianfu/code/src/workspace/pyflink-examples/flink-sql-connector-postgres-cdc-2.1.1.jar')

    # Create schema
    _schema = {
        "p_key": DataTypes.INT(False),
        "modified_id": DataTypes.INT(False),
        "content": DataTypes.STRING(),
    }
    # zip(*pairs) splits the (name, type) pairs into a names tuple and a
    # types tuple, passed as the two arguments of from_fields.
    schema = (
        Schema.new_builder()
        .from_fields(*zip(*[(k, v) for k, v in _schema.items()]))
        .primary_key("p_key")
        .build()
    )

    # Create table descriptor
    descriptor = (
        TableDescriptor.for_connector("postgres-cdc")
        .option("hostname", "host.docker.internal")
        .option("port", "5432")
        .option("database-name", "flink_issue")
        .option("username", "root")
        .option("password", "root")
        .option("debezium.plugin.name", "pgoutput")
        .option("schema-name", "flink_schema")
        .option("table-name", "flink_table")
        .option("slot.name", "flink_slot")
        .schema(schema)
        .build()
    )
    table_env.create_temporary_table("flink_table", descriptor)

    # Create changelog stream
    stream = table_env.from_path("flink_table")

    # Define UDAF
    accumulator_type = DataTypes.ROW(
        [
            DataTypes.FIELD("f0", DataTypes.INT(False)),
            DataTypes.FIELD("f1", DataTypes.ROW(
                [DataTypes.FIELD(k, v) for k, v in _schema.items()])),
        ]
    )
    result_type = DataTypes.ROW(
        [DataTypes.FIELD(k, v) for k, v in _schema.items()])
    emit_last = udaf(EmitLastState(), accumulator_type=accumulator_type,
                     result_type=result_type)

    # Emit last state based on modified_id to enable parallel processing
    stream = (
        stream
        .group_by(col("p_key"))
        .select(
            col("p_key"),
            emit_last(
                col("modified_id"),
                row(*(col(k) for k in _schema.keys())).cast(result_type),
            ).alias("tmp_obj"),
        )
    )

    # Select the elements of the objects
    stream = stream.select(
        *(col("tmp_obj").get(k).alias(k) for k in _schema.keys()))

    # We apply a UDF which parses the xml and returns a complex nested structure
    stream = stream.select(col("p_key"),
                           complex_udf(col("content")).alias("nested_obj"))

    # We select an element from the nested structure in order to flatten it.
    # The next line is the line causing issues; commenting out the next line
    # makes the pipeline work.
    stream = stream.select(col("p_key"), col("nested_obj").get("f0"))
    # Interestingly, the below part does work...
    # stream = stream.select(col("nested_obj").get("f0"))

    table_env.to_changelog_stream(stream).print()

    # Execute
    env.execute_async()
{code}
{code}
py4j.protocol.Py4JJavaError: An error occurred while calling
o8.toChangelogStream.
: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
at
java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
at java.base/java.util.Objects.checkIndex(Unknown Source)
at java.base/java.util.ArrayList.get(Unknown Source)
at
org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:975)
at
org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:924)
at org.apache.calcite.rex.RexLocalRef.accept(RexLocalRef.java:75)
at
org.apache.calcite.rex.RexShuttle.visitFieldAccess(RexShuttle.java:198)
at
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:904)
at
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:887)
at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
at
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
at
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
at
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
at
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at
org.apache.calcite.rex.RexProgramBuilder.registerInput(RexProgramBuilder.java:295)
at
org.apache.calcite.rex.RexProgramBuilder.addProject(RexProgramBuilder.java:206)
at org.apache.calcite.rex.RexProgram.create(RexProgram.java:224)
at org.apache.calcite.rex.RexProgram.create(RexProgram.java:193)
at
org.apache.flink.table.planner.plan.rules.logical.PythonCalcSplitRuleBase.onMatch(PythonCalcSplitRule.scala:98)
at
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
at
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
at
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.immutable.Range.foreach(Range.scala:155)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
at
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
at
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
at
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
at
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
at
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
at
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
at
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
at
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.java:263)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Unknown Source)
{code}
PS: This issue was reported in https://apache-flink.slack.com/archives/C03G7LJTS2G/p1681918854457699
was:
For the following job:
{code}
import logging
import sys

from pyflink.common import Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (Schema, DataTypes, TableDescriptor,
                           StreamTableEnvironment)
from pyflink.table.expressions import col, row
from pyflink.table.udf import ACC, T, udaf, AggregateFunction, udf

# Log only ERROR-level records, as bare message text, to stdout.
logging.basicConfig(stream=sys.stdout, level=logging.ERROR,
                    format="%(message)s")
class EmitLastState(AggregateFunction):
    """
    Aggregator that emits the latest state for the purpose of
    enabling parallelism on CDC tables.

    The accumulator is a two-field Row ``(key, obj)``: ``key`` is the
    largest ordering key seen so far and ``obj`` is the value that
    arrived together with it. ``get_value`` returns ``obj``.
    """

    def create_accumulator(self) -> ACC:
        # (largest key seen so far, value associated with that key)
        return Row(None, None)

    def accumulate(self, accumulator: ACC, *args):
        # Exactly two arguments are expected: the ordering key and the value;
        # any other arity raises ValueError on unpacking.
        key, obj = args
        # Replace the stored value only for a strictly larger key, so on a
        # tie the value that arrived first is kept.
        if accumulator[0] is None or key > accumulator[0]:
            accumulator[0] = key
            accumulator[1] = obj

    def retract(self, accumulator: ACC, *args):
        # Retraction messages are ignored; the accumulator is left unchanged.
        pass

    def get_value(self, accumulator: ACC) -> T:
        # Emit the value associated with the largest key.
        return accumulator[1]
# Element type of the three array fields of ``some_complex_type``.
some_complex_inner_type = DataTypes.ROW(
    [
        DataTypes.FIELD("f0", DataTypes.STRING()),
        DataTypes.FIELD("f1", DataTypes.STRING()),
    ]
)

# Result type of ``complex_udf``: three ARRAY<ROW<f0, f1>> fields (f0-f2)
# followed by a DATE field (f3) and two VARCHAR fields (f4, f5).
some_complex_type = DataTypes.ROW(
    [
        DataTypes.FIELD(k, DataTypes.ARRAY(some_complex_inner_type))
        for k in ("f0", "f1", "f2")
    ]
    + [
        DataTypes.FIELD("f3", DataTypes.DATE()),
        DataTypes.FIELD("f4", DataTypes.VARCHAR(32)),
        DataTypes.FIELD("f5", DataTypes.VARCHAR(2)),
    ]
)


@udf(input_types=DataTypes.STRING(), result_type=some_complex_type)
def complex_udf(s):
    """Return a Row of ``some_complex_type`` with every field set to None.

    The input string ``s`` is ignored; the reproducer only needs a Python
    UDF whose result type is a nested ROW.
    """
    return Row(f0=None, f1=None, f2=None, f3=None, f4=None, f5=None)
if __name__ == "__main__":
    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(env)
    # Local file URL of the Postgres CDC connector jar; adjust for your machine.
    table_env.get_config().set(
        'pipeline.classpaths',
        'file:///Users/dianfu/code/src/workspace/pyflink-examples/flink-sql-connector-postgres-cdc-2.1.1.jar')

    # Create schema
    _schema = {
        "p_key": DataTypes.INT(False),
        "modified_id": DataTypes.INT(False),
        "content": DataTypes.STRING(),
    }
    # zip(*pairs) splits the (name, type) pairs into a names tuple and a
    # types tuple, passed as the two arguments of from_fields.
    schema = (
        Schema.new_builder()
        .from_fields(*zip(*[(k, v) for k, v in _schema.items()]))
        .primary_key("p_key")
        .build()
    )

    # Create table descriptor
    descriptor = (
        TableDescriptor.for_connector("postgres-cdc")
        .option("hostname", "host.docker.internal")
        .option("port", "5432")
        .option("database-name", "flink_issue")
        .option("username", "root")
        .option("password", "root")
        .option("debezium.plugin.name", "pgoutput")
        .option("schema-name", "flink_schema")
        .option("table-name", "flink_table")
        .option("slot.name", "flink_slot")
        .schema(schema)
        .build()
    )
    table_env.create_temporary_table("flink_table", descriptor)

    # Create changelog stream
    stream = table_env.from_path("flink_table")

    # Define UDAF
    accumulator_type = DataTypes.ROW(
        [
            DataTypes.FIELD("f0", DataTypes.INT(False)),
            DataTypes.FIELD("f1", DataTypes.ROW(
                [DataTypes.FIELD(k, v) for k, v in _schema.items()])),
        ]
    )
    result_type = DataTypes.ROW(
        [DataTypes.FIELD(k, v) for k, v in _schema.items()])
    emit_last = udaf(EmitLastState(), accumulator_type=accumulator_type,
                     result_type=result_type)

    # Emit last state based on modified_id to enable parallel processing
    stream = (
        stream
        .group_by(col("p_key"))
        .select(
            col("p_key"),
            emit_last(
                col("modified_id"),
                row(*(col(k) for k in _schema.keys())).cast(result_type),
            ).alias("tmp_obj"),
        )
    )

    # Select the elements of the objects
    stream = stream.select(
        *(col("tmp_obj").get(k).alias(k) for k in _schema.keys()))

    # We apply a UDF which parses the xml and returns a complex nested structure
    stream = stream.select(col("p_key"),
                           complex_udf(col("content")).alias("nested_obj"))

    # We select an element from the nested structure in order to flatten it.
    # The next line is the line causing issues; commenting out the next line
    # makes the pipeline work.
    stream = stream.select(col("p_key"), col("nested_obj").get("f0"))
    # Interestingly, the below part does work...
    # stream = stream.select(col("nested_obj").get("f0"))

    table_env.to_changelog_stream(stream).print()

    # Execute
    env.execute_async()
{code}
{code}
py4j.protocol.Py4JJavaError: An error occurred while calling
o8.toChangelogStream.
: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
at
java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
at java.base/java.util.Objects.checkIndex(Unknown Source)
at java.base/java.util.ArrayList.get(Unknown Source)
at
org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:975)
at
org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:924)
at org.apache.calcite.rex.RexLocalRef.accept(RexLocalRef.java:75)
at
org.apache.calcite.rex.RexShuttle.visitFieldAccess(RexShuttle.java:198)
at
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:904)
at
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:887)
at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
at
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
at
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
at
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
at
org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at
org.apache.calcite.rex.RexProgramBuilder.registerInput(RexProgramBuilder.java:295)
at
org.apache.calcite.rex.RexProgramBuilder.addProject(RexProgramBuilder.java:206)
at org.apache.calcite.rex.RexProgram.create(RexProgram.java:224)
at org.apache.calcite.rex.RexProgram.create(RexProgram.java:193)
at
org.apache.flink.table.planner.plan.rules.logical.PythonCalcSplitRuleBase.onMatch(PythonCalcSplitRule.scala:98)
at
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
at
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
at
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.immutable.Range.foreach(Range.scala:155)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
at
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
at
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
at
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
at
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
at
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
at
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
at
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
at
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.java:263)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Unknown Source)
{code}
> Exception thrown when accessing nested field of the result of Python UDF with complex result type
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-31905
> URL: https://issues.apache.org/jira/browse/FLINK-31905
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Reporter: Dian Fu
> Priority: Major
>
> For the following job:
> {code}
> import logging, sys
> from pyflink.common import Row
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import Schema, DataTypes, TableDescriptor,
> StreamTableEnvironment
> from pyflink.table.expressions import col, row
> from pyflink.table.udf import ACC, T, udaf, AggregateFunction, udf
> logging.basicConfig(stream=sys.stdout, level=logging.ERROR,
> format="%(message)s")
> class EmitLastState(AggregateFunction):
> """
> Aggregator that emits the latest state for the purpose of
> enabling parallelism on CDC tables.
> """
> def create_accumulator(self) -> ACC:
> return Row(None, None)
> def accumulate(self, accumulator: ACC, *args):
> key, obj = args
> if (accumulator[0] is None) or (key > accumulator[0]):
> accumulator[0] = key
> accumulator[1] = obj
> def retract(self, accumulator: ACC, *args):
> pass
> def get_value(self, accumulator: ACC) -> T:
> return accumulator[1]
> some_complex_inner_type = DataTypes.ROW(
> [
> DataTypes.FIELD("f0", DataTypes.STRING()),
> DataTypes.FIELD("f1", DataTypes.STRING())
> ]
> )
> some_complex_type = DataTypes.ROW(
> [
> DataTypes.FIELD(k, DataTypes.ARRAY(some_complex_inner_type))
> for k in ("f0", "f1", "f2")
> ]
> + [
> DataTypes.FIELD("f3", DataTypes.DATE()),
> DataTypes.FIELD("f4", DataTypes.VARCHAR(32)),
> DataTypes.FIELD("f5", DataTypes.VARCHAR(2)),
> ]
> )
> @udf(input_types=DataTypes.STRING(), result_type=some_complex_type)
> def complex_udf(s):
> return Row(f0=None, f1=None, f2=None, f3=None, f4=None, f5=None)
> if __name__ == "__main__":
> env = StreamExecutionEnvironment.get_execution_environment()
> table_env = StreamTableEnvironment.create(env)
> table_env.get_config().set('pipeline.classpaths',
> 'file:///Users/dianfu/code/src/workspace/pyflink-examples/flink-sql-connector-postgres-cdc-2.1.1.jar')
> # Create schema
> _schema = {
> "p_key": DataTypes.INT(False),
> "modified_id": DataTypes.INT(False),
> "content": DataTypes.STRING()
> }
> schema = Schema.new_builder().from_fields(
> *zip(*[(k, v) for k, v in _schema.items()])
> ).\
> primary_key("p_key").\
> build()
> # Create table descriptor
> descriptor = TableDescriptor.for_connector("postgres-cdc").\
> option("hostname", "host.docker.internal").\
> option("port", "5432").\
> option("database-name", "flink_issue").\
> option("username", "root").\
> option("password", "root").\
> option("debezium.plugin.name", "pgoutput").\
> option("schema-name", "flink_schema").\
> option("table-name", "flink_table").\
> option("slot.name", "flink_slot").\
> schema(schema).\
> build()
> table_env.create_temporary_table("flink_table", descriptor)
> # Create changelog stream
> stream = table_env.from_path("flink_table")\
> # Define UDAF
> accumulator_type = DataTypes.ROW(
> [
> DataTypes.FIELD("f0", DataTypes.INT(False)),
> DataTypes.FIELD("f1", DataTypes.ROW([DataTypes.FIELD(k, v) for k,
> v in _schema.items()])),
> ]
> )
> result_type = DataTypes.ROW([DataTypes.FIELD(k, v) for k, v in
> _schema.items()])
> emit_last = udaf(EmitLastState(), accumulator_type=accumulator_type,
> result_type=result_type)
> # Emit last state based on modified_id to enable parallel processing
> stream = stream.\
> group_by(col("p_key")).\
> select(
> col("p_key"),
> emit_last(col("modified_id"),row(*(col(k) for k in
> _schema.keys())).cast(result_type)).alias("tmp_obj")
> )
> # Select the elements of the objects
> stream = stream.select(*(col("tmp_obj").get(k).alias(k) for k in
> _schema.keys()))
> # We apply a UDF which parses the xml and returns a complex nested
> structure
> stream = stream.select(col("p_key"),
> complex_udf(col("content")).alias("nested_obj"))
> # We select an element from the nested structure in order to flatten it
> # The next line is the line causing issues, commenting the next line will
> make the pipeline work
> stream = stream.select(col("p_key"), col("nested_obj").get("f0"))
> # Interestingly, the below part does work...
> # stream = stream.select(col("nested_obj").get("f0"))
> table_env.to_changelog_stream(stream).print()
> # Execute
> env.execute_async()
> {code}
> {code}
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o8.toChangelogStream.
> : java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
> at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown
> Source)
> at
> java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown
> Source)
> at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown
> Source)
> at java.base/java.util.Objects.checkIndex(Unknown Source)
> at java.base/java.util.ArrayList.get(Unknown Source)
> at
> org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:975)
> at
> org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:924)
> at org.apache.calcite.rex.RexLocalRef.accept(RexLocalRef.java:75)
> at
> org.apache.calcite.rex.RexShuttle.visitFieldAccess(RexShuttle.java:198)
> at
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:904)
> at
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:887)
> at
> org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
> at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
> at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
> at
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
> at
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
> at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
> at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
> at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
> at
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
> at
> org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
> at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
> at
> org.apache.calcite.rex.RexProgramBuilder.registerInput(RexProgramBuilder.java:295)
> at
> org.apache.calcite.rex.RexProgramBuilder.addProject(RexProgramBuilder.java:206)
> at org.apache.calcite.rex.RexProgram.create(RexProgram.java:224)
> at org.apache.calcite.rex.RexProgram.create(RexProgram.java:193)
> at
> org.apache.flink.table.planner.plan.rules.logical.PythonCalcSplitRuleBase.onMatch(PythonCalcSplitRule.scala:98)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
> at
> org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
> at
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
> at
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
> at
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
> at
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
> at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
> at scala.collection.immutable.Range.foreach(Range.scala:155)
> at
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
> at
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
> at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
> at
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
> at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
> at
> org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
> at
> org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
> at
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.java:263)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.base/java.lang.Thread.run(Unknown Source)
> {code}
> PS: This issue was reported in
> https://apache-flink.slack.com/archives/C03G7LJTS2G/p1681918854457699
--
This message was sent by Atlassian Jira
(v8.20.10#820010)