[ https://issues.apache.org/jira/browse/BEAM-11862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chamikara Madhusanka Jayalath updated BEAM-11862:
-------------------------------------------------
Status: Open (was: Triage Needed)
> Write To Kafka does not work
> ----------------------------
>
> Key: BEAM-11862
> URL: https://issues.apache.org/jira/browse/BEAM-11862
> Project: Beam
> Issue Type: Bug
> Components: io-py-kafka
> Affects Versions: 2.28.0
> Reporter: Dénes Bartha
> Assignee: Chamikara Madhusanka Jayalath
> Priority: P1
> Fix For: 2.29.0
>
>
> I am trying to send data to a Kafka topic from Python with {{WriteToKafka}}
> in Apache Beam, using Dataflow as the runner.
> When I run the following script:
> {code:python}
> import apache_beam as beam
> from apache_beam.io.kafka import WriteToKafka
>
> # beam_options holds the Dataflow pipeline options (defined elsewhere in the script).
> with beam.Pipeline(options=beam_options) as p:
>     (p
>      | beam.Impulse()
>      | beam.Map(lambda element: (1, element))
>      | WriteToKafka(
>          producer_config={
>              'bootstrap.servers': 'ip:9092,',
>          },
>          topic='testclient',
>          key_serializer='org.apache.kafka.common.serialization.LongSerializer',
>          value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
>      )
>     )
> {code}
> I am getting this error:
>
> {noformat}
> Traceback (most recent call last):
>   File "/home/denes/data-science/try_write_to_kafka.py", line 75, in <module>
>     run_pipeline(beam_options)
>   File "/home/denes/data-science/try_write_to_kafka.py", line 38, in run_pipeline
>     (p
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 582, in __exit__
>     self.result = self.run()
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 529, in run
>     return Pipeline.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1236, in from_runner_api
>     transform = ptransform.PTransform.from_runner_api(proto, context)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 700, in from_runner_api
>     return constructor(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1419, in from_runner_api_parameter
>     DoFnInfo.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
>     raise ValueError('Unexpected DoFn type: %s' % spec.urn)
> ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
> {noformat}
>
> If I am not mistaken, the problem is with the serializers. I have tried all
> sorts of combinations of the serializers listed on
> [this|https://kafka.apache.org/26/javadoc/org/apache/kafka/common/serialization/]
> page.
> When I do not specify the serializers, I get a {{RuntimeError}} instead:
> {noformat}
> Traceback (most recent call last):
>   File "/home/denes/data-science/try_write_to_kafka.py", line 48, in <module>
>     run_pipeline(beam_options)
>   File "/home/denes/data-science/try_write_to_kafka.py", line 14, in run_pipeline
>     WriteToKafka(
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/pvalue.py", line 141, in __or__
>     return self.pipeline.apply(ptransform, self)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 689, in apply
>     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 188, in apply
>     return m(transform, input, options)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform
>     return transform.expand(input)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/transforms/external.py", line 318, in expand
>     raise RuntimeError(response.error)
> RuntimeError: java.lang.ClassCastException: class org.apache.beam.sdk.coders.VarLongCoder cannot be cast to class org.apache.beam.sdk.coders.KvCoder (org.apache.beam.sdk.coders.VarLongCoder and org.apache.beam.sdk.coders.KvCoder are in unnamed module of loader 'app')
>     at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:2295)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:2088)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:498)
>     at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:360)
>     at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:436)
>     at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:491)
>     at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> {noformat}
> Note that I have installed the latest apache-beam version ({{apache-beam==2.28.0}})
> via {{pip install 'apache-beam'}}.
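For reference, the two serializers configured in the reporter's script turn the {{(1, element)}} record into bytes roughly as follows. This is a minimal plain-Python sketch of Kafka's documented serializer behavior, not Beam or Kafka client code. It assumes the integer key reaches the Java side as a {{Long}}. The helpers {{long_serializer}} and {{byte_array_serializer}} are hypothetical stand-ins. Kafka's {{LongSerializer}} writes a 64-bit signed value as 8 big-endian bytes, {{ByteArraySerializer}} passes the bytes through unchanged, and {{beam.Impulse()}} emits a single empty byte string.

```python
import struct


def long_serializer(value: int) -> bytes:
    """Hypothetical stand-in for Kafka's LongSerializer: signed 64-bit, big-endian."""
    return struct.pack(">q", value)


def byte_array_serializer(value: bytes) -> bytes:
    """Hypothetical stand-in for Kafka's ByteArraySerializer: bytes pass through unchanged."""
    return value


# beam.Impulse() emits one empty byte string; the Map turns it into (1, b"").
element = (1, b"")
key_bytes = long_serializer(element[0])
value_bytes = byte_array_serializer(element[1])
print(key_bytes.hex(), value_bytes)  # 0000000000000001 b''
```

Under these assumptions, the record the script tries to write has an 8-byte key and an empty value. The serializer choice itself looks consistent with the element's types.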
--
This message was sent by Atlassian Jira
(v8.3.4#803005)