Harshvardhan created BEAM-12848:
-----------------------------------

             Summary: apache_beam.io.external.kafka.ReadFromKafka throws IndexError
                 Key: BEAM-12848
                 URL: https://issues.apache.org/jira/browse/BEAM-12848
             Project: Beam
          Issue Type: Bug
          Components: beam-model
            Reporter: Harshvardhan


kafka.ReadFromKafka throws *IndexError: tuple index out of range* while the 
transform is being constructed, due to the unimplemented 
*_get_named_tuple_instance* method of the class 
*SchemaBasedPayloadBuilder(PayloadBuilder)*.

*Stacktrace:*

Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/code/src/beam_example/beamKafkaRedis.py", line 36, in <module>
    notifications = pipeline | "Reading messages from Kafka" >> kafka.ReadFromKafka(
  File "/usr/local/lib/python3.9/dist-packages/apache_beam/io/kafka.py", line 166, in __init__
    super(ReadFromKafka, self).__init__(
  File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 217, in __init__
    payload.payload() if isinstance(payload, PayloadBuilder) else payload)
  File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 93, in payload
    return self.build().SerializeToString()
  File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 106, in build
    schema = named_tuple_to_schema(type(row))
  File "/usr/local/lib/python3.9/dist-packages/apache_beam/typehints/schemas.py", line 276, in named_tuple_to_schema
    return typing_to_runner_api(named_tuple).row_type.schema
  File "/usr/local/lib/python3.9/dist-packages/apache_beam/typehints/schemas.py", line 184, in typing_to_runner_api
    element_type = typing_to_runner_api(_get_args(type_)[0])
IndexError: tuple index out of range
args: ['--runner=PortableRunner', '--streaming', '--sdk_worker_parallelism=2', '--job_name=beam-readKafkaTopic', '--environment_type=PROCESS', '--environment_config={"command": "/opt/apache/beam/boot"}', '--job_name=beam-kafkaConnect', '--job_endpoint=localhost:39295']

    at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.runDriverProgram(FlinkPortableClientEntryPoint.java:192) ~[?:?]
    at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main(FlinkPortableClientEntryPoint.java:100) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
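
For illustration: the failing expression in the last Python frame, _get_args(type_)[0], raises this error whenever the type it receives carries no type arguments. The sketch below reproduces that class of failure using only the standard library. typing.get_args stands in for Beam's private _get_args helper, and first_type_argument is a hypothetical name, so this is an assumption about the mechanism rather than a confirmed diagnosis of what ReadFromKafka passes in.

```python
import typing


def first_type_argument(type_):
    """Return the first type argument of a typing construct.

    Mirrors the shape of the failing expression in the traceback.
    typing.get_args is an assumed stand-in for Beam's private helper.
    """
    return typing.get_args(type_)[0]


# A parameterized sequence type carries its element type.
print(first_type_argument(typing.List[str]))

# A bare sequence type has an empty argument tuple, so indexing [0] fails.
try:
    first_type_argument(typing.List)
except IndexError as exc:
    print("IndexError:", exc)
```

If this is indeed the mechanism, the type that reaches typing_to_runner_api is a sequence type without an element type.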



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
