damccorm opened a new issue, #21138:
URL: https://github.com/apache/beam/issues/21138

   Kafka.ReadFromKafka throws *IndexError: tuple index out of range*, which
appears to be caused by the unimplemented *_get_named_tuple_instance* function
of the class *SchemaBasedPayloadBuilder(PayloadBuilder)*.
   
   *Stacktrace:*
   
   Traceback (most recent call last):
     File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
       return _run_code(code, main_globals, None,
     File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
       exec(code, run_globals)
     File "/code/src/beam_example/beamKafkaRedis.py", line 36, in <module>
       notifications = pipeline | "Reading messages from Kafka" >> kafka.ReadFromKafka(
     File "/usr/local/lib/python3.9/dist-packages/apache_beam/io/kafka.py", line 166, in __init__
       super(ReadFromKafka, self).__init__(
     File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 217, in __init__
       payload.payload() if isinstance(payload, PayloadBuilder) else payload)
     File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 93, in payload
       return self.build().SerializeToString()
     File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 106, in build
       schema = named_tuple_to_schema(type(row))
     File "/usr/local/lib/python3.9/dist-packages/apache_beam/typehints/schemas.py", line 276, in named_tuple_to_schema
       return typing_to_runner_api(named_tuple).row_type.schema
     File "/usr/local/lib/python3.9/dist-packages/apache_beam/typehints/schemas.py", line 184, in typing_to_runner_api
       element_type = typing_to_runner_api(_get_args(type_)[0])
   IndexError: tuple index out of range
   args: ['--runner=PortableRunner', '--streaming', '--sdk_worker_parallelism=2', '--job_name=beam-readKafkaTopic', '--environment_type=PROCESS', '--environment_config={"command": "/opt/apache/beam/boot"}', '--job_name=beam-kafkaConnect', '--job_endpoint=localhost:39295']
   
    at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.runDriverProgram(FlinkPortableClientEntryPoint.java:192) ~[?:?]
    at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main(FlinkPortableClientEntryPoint.java:100) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
   
   Imported from Jira 
[BEAM-12848](https://issues.apache.org/jira/browse/BEAM-12848). Original Jira 
may contain additional context.
   Reported by: Harsh_99.
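
   The last Python frame in the stack trace indexes `_get_args(type_)[0]`. Below is a minimal sketch of how an expression of that shape can raise this error. It assumes, without confirmation from the report, that a sequence type hint with no type parameters reaches that line. It uses the standard library's `typing.get_args` as a stand-in for Beam's internal `_get_args`, and both helper names are hypothetical.

```python
import typing


def unguarded_first_type_arg(type_hint):
    # Same shape as the failing expression `_get_args(type_)[0]`:
    # an unparameterized hint has no arguments, so indexing fails.
    return typing.get_args(type_hint)[0]


def first_type_arg(type_hint):
    # Hypothetical guarded variant: fail with a descriptive message
    # instead of a bare IndexError.
    args = typing.get_args(type_hint)
    if not args:
        raise TypeError(
            f"{type_hint!r} has no type arguments; expected e.g. List[str]")
    return args[0]


if __name__ == "__main__":
    print(first_type_arg(typing.List[str]))
    try:
        unguarded_first_type_arg(typing.List)
    except IndexError as exc:
        print("IndexError:", exc)
```

   If this assumption holds for the reporter's pipeline, giving every sequence-typed field an explicit element type (for example `typing.List[str]` rather than a bare `typing.List`) would avoid this particular failure. It does not address the missing `_get_named_tuple_instance` implementation named in the report.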

