damccorm opened a new issue, #21138:
URL: https://github.com/apache/beam/issues/21138
kafka.ReadFromKafka throws *IndexError: tuple index out of range* due to the
unimplemented *_get_named_tuple_instance* function of the class
*SchemaBasedPayloadBuilder(PayloadBuilder)*.
*Stacktrace:*
Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/code/src/beam_example/beamKafkaRedis.py", line 36, in <module>
    notifications = pipeline | "Reading messages from Kafka" >> kafka.ReadFromKafka(
  File "/usr/local/lib/python3.9/dist-packages/apache_beam/io/kafka.py", line 166, in __init__
    super(ReadFromKafka, self).__init__(
  File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 217, in __init__
    payload.payload() if isinstance(payload, PayloadBuilder) else payload)
  File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 93, in payload
    return self.build().SerializeToString()
  File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 106, in build
    schema = named_tuple_to_schema(type(row))
  File "/usr/local/lib/python3.9/dist-packages/apache_beam/typehints/schemas.py", line 276, in named_tuple_to_schema
    return typing_to_runner_api(named_tuple).row_type.schema
  File "/usr/local/lib/python3.9/dist-packages/apache_beam/typehints/schemas.py", line 184, in typing_to_runner_api
    element_type = typing_to_runner_api(_get_args(type_)[0])
IndexError: tuple index out of range
args: ['--runner=PortableRunner', '--streaming', '--sdk_worker_parallelism=2', '--job_name=beam-readKafkaTopic', '--environment_type=PROCESS', '--environment_config={"command": "/opt/apache/beam/boot"}', '--job_name=beam-kafkaConnect', '--job_endpoint=localhost:39295']
    at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.runDriverProgram(FlinkPortableClientEntryPoint.java:192) ~[?:?]
    at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main(FlinkPortableClientEntryPoint.java:100) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
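
For readers unfamiliar with this failure mode, here is a minimal sketch of how a lookup shaped like the last Python frame can raise this error, assuming the type hint that reaches it carries no type arguments. It uses the standard library's typing.get_args as a stand-in for Beam's internal _get_args, and first_type_arg is a hypothetical helper. Neither is taken from the Beam source, and the sketch does not establish the actual root cause of this report.

```python
import typing


def first_type_arg(type_hint):
    """Return the first type argument of a hint (hypothetical helper).

    Mirrors the shape of `_get_args(type_)[0]` from the traceback, with
    typing.get_args standing in for Beam's internal `_get_args` (assumption).
    """
    return typing.get_args(type_hint)[0]


# A parameterized hint has arguments, so the lookup succeeds.
element_type = first_type_arg(typing.List[str])
print(element_type)

# A bare hint has no arguments: get_args returns an empty tuple,
# and indexing it raises IndexError.
message = None
try:
    first_type_arg(typing.List)
except IndexError as exc:
    message = str(exc)
print(message)
```

If the cause is similar, checking that the argument tuple is non-empty before indexing would turn the IndexError into a clearer "unsupported type hint" error.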
Imported from Jira
[BEAM-12848](https://issues.apache.org/jira/browse/BEAM-12848). Original Jira
may contain additional context.
Reported by: Harsh_99.