[ 
https://issues.apache.org/jira/browse/BEAM-11862?focusedWorklogId=570220&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-570220
 ]

ASF GitHub Bot logged work on BEAM-11862:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Mar/21 04:27
            Start Date: 23/Mar/21 04:27
    Worklog Time Spent: 10m 
      Work Description: chamikaramj opened a new pull request #14306:
URL: https://github.com/apache/beam/pull/14306


   This cherry-picks https://github.com/apache/beam/pull/14293 to the 2.29.0 
release branch.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 570220)
    Time Spent: 2h 20m  (was: 2h 10m)

> Write To Kafka does not work
> ----------------------------
>
>                 Key: BEAM-11862
>                 URL: https://issues.apache.org/jira/browse/BEAM-11862
>             Project: Beam
>          Issue Type: Bug
>          Components: cross-language, io-py-kafka
>    Affects Versions: 2.28.0
>            Reporter: Dénes Bartha
>            Assignee: Chamikara Madhusanka Jayalath
>            Priority: P1
>             Fix For: 2.29.0
>
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> I am trying to send data to a Kafka topic from Python using {{WriteToKafka}} 
> in Apache Beam, with Dataflow as the runner.
> I am running the following script:
> {code:java}
> with beam.Pipeline(options=beam_options) as p:
>     (p
>      | beam.Impulse()
>      | beam.Map(lambda input: (1, input))
>      | WriteToKafka(
>          producer_config={
>              'bootstrap.servers': 'ip:9092,',
>          },
>          topic='testclient',
>          key_serializer='org.apache.kafka.common.serialization.LongSerializer',
>          value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
>      )
>     )
> {code}
> I am getting this error:
>  
> {code:java}
> Traceback (most recent call last):
>   File "/home/denes/data-science/try_write_to_kafka.py", line 75, in <module>
>     run_pipeline(beam_options)
>   File "/home/denes/data-science/try_write_to_kafka.py", line 38, in run_pipeline
>     (p
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 582, in __exit__
>     self.result = self.run()
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 529, in run
>     return Pipeline.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1236, in from_runner_api
>     transform = ptransform.PTransform.from_runner_api(proto, context)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 700, in from_runner_api
>     return constructor(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1419, in from_runner_api_parameter
>     DoFnInfo.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
>     raise ValueError('Unexpected DoFn type: %s' % spec.urn)
> ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
> {code}
>  
> If I am not mistaken, the problem lies with the serializers. I have tried 
> every combination of the serializer classes listed on 
> [this|https://kafka.apache.org/26/javadoc/org/apache/kafka/common/serialization/] page.
> When I do not specify the serializers, I get a {{RuntimeError}} instead:
> {code:java}
> Traceback (most recent call last):
>   File "/home/denes/data-science/try_write_to_kafka.py", line 48, in <module>
>     run_pipeline(beam_options)
>   File "/home/denes/data-science/try_write_to_kafka.py", line 14, in run_pipeline
>     WriteToKafka(
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/pvalue.py", line 141, in __or__
>     return self.pipeline.apply(ptransform, self)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 689, in apply
>     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 188, in apply
>     return m(transform, input, options)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform
>     return transform.expand(input)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/transforms/external.py", line 318, in expand
>     raise RuntimeError(response.error)
> RuntimeError: java.lang.ClassCastException: class org.apache.beam.sdk.coders.VarLongCoder cannot be cast to class org.apache.beam.sdk.coders.KvCoder (org.apache.beam.sdk.coders.VarLongCoder and org.apache.beam.sdk.coders.KvCoder are in unnamed module of loader 'app')
>     at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:2295)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:2088)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:498)
>     at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:360)
>     at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:436)
>     at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:491)
>     at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> {code}
> Note that I installed the latest apache-beam release via 
> {{pip install 'apache-beam'}}. The installed version is apache-beam==2.28.0.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
