[ https://issues.apache.org/jira/browse/BEAM-11862?focusedWorklogId=570233&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-570233 ]

ASF GitHub Bot logged work on BEAM-11862:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Mar/21 05:00
            Start Date: 23/Mar/21 05:00
    Worklog Time Spent: 10m 
      Work Description: codecov[bot] edited a comment on pull request #14306:
URL: https://github.com/apache/beam/pull/14306#issuecomment-804613009


   # [Codecov](https://codecov.io/gh/apache/beam/pull/14306?src=pr&el=h1) Report
   > Merging [#14306](https://codecov.io/gh/apache/beam/pull/14306?src=pr&el=desc) (e334045) into [release-2.29.0](https://codecov.io/gh/apache/beam/commit/9a2fbcb7eba6348795b8801162d8afcff5013123?el=desc) (9a2fbcb) will **decrease** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/14306/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/14306?src=pr&el=tree)
   
   ```diff
   @@                Coverage Diff                 @@
   ##           release-2.29.0   #14306      +/-   ##
   ==================================================
   - Coverage           83.16%   83.15%   -0.01%     
   ==================================================
     Files                 469      469              
     Lines               58977    58983       +6     
   ==================================================
   + Hits                49046    49049       +3     
   - Misses               9931     9934       +3     
   ```
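
The percentages in the diff above follow from the Hits and Lines rows. A minimal sketch of that arithmetic, assuming the report truncates (rather than rounds) to two decimal places, which is consistent with the figures shown. The function name `coverage_pct` is hypothetical and not part of Codecov:

```python
from decimal import Decimal, ROUND_DOWN


def coverage_pct(hits: int, lines: int) -> Decimal:
    """Return hits/lines as a percentage, truncated to two decimals.

    Truncation (ROUND_DOWN) is an assumption inferred from the report's
    numbers, not a documented Codecov guarantee.
    """
    ratio = Decimal(hits) * 100 / Decimal(lines)
    return ratio.quantize(Decimal("0.01"), rounding=ROUND_DOWN)


# Figures copied from the Coverage Diff table.
base = coverage_pct(49046, 58977)  # release-2.29.0
head = coverage_pct(49049, 58983)  # pull request #14306
delta = head - base
print(base, head, delta)
```

With these inputs the sketch gives 83.16 and 83.15, a difference of -0.01, matching the Coverage row.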
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/14306?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/14306/diff?src=pr&el=tree#diff-YmVhbV9QcmVDb21taXRfUHl0aG9uX0NvbW1pdC9zcmMvc2Rrcy9weXRob24vdGVzdC1zdWl0ZXMvdG94L3B5MzgvYnVpbGQvc3Jjcy9zZGtzL3B5dGhvbi9hcGFjaGVfYmVhbS9ydW5uZXJzL3dvcmtlci9kYXRhX3BsYW5lLnB5) | `89.52% <0.00%> (-1.80%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/14306/diff?src=pr&el=tree#diff-YmVhbV9QcmVDb21taXRfUHl0aG9uX0NvbW1pdC9zcmMvc2Rrcy9weXRob24vdGVzdC1zdWl0ZXMvdG94L3B5MzgvYnVpbGQvc3Jjcy9zZGtzL3B5dGhvbi9hcGFjaGVfYmVhbS9ydW5uZXJzL3dvcmtlci9idW5kbGVfcHJvY2Vzc29yLnB5) | `93.74% <0.00%> (-0.13%)` | :arrow_down: |
   | [...y38/build/srcs/sdks/python/apache\_beam/pipeline.py](https://codecov.io/gh/apache/beam/pull/14306/diff?src=pr&el=tree#diff-YmVhbV9QcmVDb21taXRfUHl0aG9uX0NvbW1pdC9zcmMvc2Rrcy9weXRob24vdGVzdC1zdWl0ZXMvdG94L3B5MzgvYnVpbGQvc3Jjcy9zZGtzL3B5dGhvbi9hcGFjaGVfYmVhbS9waXBlbGluZS5weQ==) | `91.31% <0.00%> (+0.08%)` | :arrow_up: |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/14306/diff?src=pr&el=tree#diff-YmVhbV9QcmVDb21taXRfUHl0aG9uX0NvbW1pdC9zcmMvc2Rrcy9weXRob24vdGVzdC1zdWl0ZXMvdG94L3B5MzgvYnVpbGQvc3Jjcy9zZGtzL3B5dGhvbi9hcGFjaGVfYmVhbS9ydW5uZXJzL3dvcmtlci9zZGtfd29ya2VyLnB5) | `89.69% <0.00%> (+0.15%)` | :arrow_up: |
   | [...38/build/srcs/sdks/python/apache\_beam/io/iobase.py](https://codecov.io/gh/apache/beam/pull/14306/diff?src=pr&el=tree#diff-YmVhbV9QcmVDb21taXRfUHl0aG9uX0NvbW1pdC9zcmMvc2Rrcy9weXRob24vdGVzdC1zdWl0ZXMvdG94L3B5MzgvYnVpbGQvc3Jjcy9zZGtzL3B5dGhvbi9hcGFjaGVfYmVhbS9pby9pb2Jhc2UucHk=) | `84.81% <0.00%> (+0.26%)` | :arrow_up: |
   | [...eam/runners/interactive/interactive\_environment.py](https://codecov.io/gh/apache/beam/pull/14306/diff?src=pr&el=tree#diff-YmVhbV9QcmVDb21taXRfUHl0aG9uX0NvbW1pdC9zcmMvc2Rrcy9weXRob24vdGVzdC1zdWl0ZXMvdG94L3B5MzgvYnVpbGQvc3Jjcy9zZGtzL3B5dGhvbi9hcGFjaGVfYmVhbS9ydW5uZXJzL2ludGVyYWN0aXZlL2ludGVyYWN0aXZlX2Vudmlyb25tZW50LnB5) | `90.74% <0.00%> (+0.37%)` | :arrow_up: |
   | [...rcs/sdks/python/apache\_beam/transforms/external.py](https://codecov.io/gh/apache/beam/pull/14306/diff?src=pr&el=tree#diff-YmVhbV9QcmVDb21taXRfUHl0aG9uX0NvbW1pdC9zcmMvc2Rrcy9weXRob24vdGVzdC1zdWl0ZXMvdG94L3B5MzgvYnVpbGQvc3Jjcy9zZGtzL3B5dGhvbi9hcGFjaGVfYmVhbS90cmFuc2Zvcm1zL2V4dGVybmFsLnB5) | `72.48% <0.00%> (+0.38%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/14306?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/14306?src=pr&el=footer). Last update [adf43d9...e334045](https://codecov.io/gh/apache/beam/pull/14306?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 570233)
    Time Spent: 2h 50m  (was: 2h 40m)

> Write To Kafka does not work
> ----------------------------
>
>                 Key: BEAM-11862
>                 URL: https://issues.apache.org/jira/browse/BEAM-11862
>             Project: Beam
>          Issue Type: Bug
>          Components: cross-language, io-py-kafka
>    Affects Versions: 2.28.0
>            Reporter: Dénes Bartha
>            Assignee: Chamikara Madhusanka Jayalath
>            Priority: P1
>             Fix For: 2.29.0
>
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> I am trying to send data to a Kafka topic from Python with {{WriteToKafka}} 
> in Apache Beam, using Dataflow as the runner.
> I run the following script:
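> {code:python}
> import apache_beam as beam
> from apache_beam.io.kafka import WriteToKafka
>
> # beam_options is built elsewhere in the script.
> with beam.Pipeline(options=beam_options) as p:
>     (p
>      | beam.Impulse()
>      # Pair each element with a constant key so the output is a (key, value) tuple.
>      | beam.Map(lambda input: (1, input))
>      | WriteToKafka(
>          producer_config={
>              'bootstrap.servers': 'ip:9092,',
>          },
>          topic='testclient',
>          key_serializer='org.apache.kafka.common.serialization.LongSerializer',
>          value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
>      )
>     )
> {code}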
> I am getting this error:
>  
> {code:java}
> Traceback (most recent call last):
>   File "/home/denes/data-science/try_write_to_kafka.py", line 75, in <module>
>     run_pipeline(beam_options)
>   File "/home/denes/data-science/try_write_to_kafka.py", line 38, in run_pipeline
>     (p
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 582, in __exit__
>     self.result = self.run()
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 529, in run
>     return Pipeline.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1236, in from_runner_api
>     transform = ptransform.PTransform.from_runner_api(proto, context)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 700, in from_runner_api
>     return constructor(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1419, in from_runner_api_parameter
>     DoFnInfo.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
>     raise ValueError('Unexpected DoFn type: %s' % spec.urn)
> ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
> {code}
>  
> I suspect the problem lies with the serializers. I have tried every 
> combination of the serializers listed on 
> [this|https://kafka.apache.org/26/javadoc/org/apache/kafka/common/serialization/]
>  page.
> When I do not specify the serializers, I get a {{RuntimeError}} instead:
> {code:java}
> Traceback (most recent call last):
>   File "/home/denes/data-science/try_write_to_kafka.py", line 48, in <module>
>     run_pipeline(beam_options)
>   File "/home/denes/data-science/try_write_to_kafka.py", line 14, in run_pipeline
>     WriteToKafka(
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/pvalue.py", line 141, in __or__
>     return self.pipeline.apply(ptransform, self)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 689, in apply
>     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 188, in apply
>     return m(transform, input, options)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform
>     return transform.expand(input)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/transforms/external.py", line 318, in expand
>     raise RuntimeError(response.error)
> RuntimeError: java.lang.ClassCastException: class org.apache.beam.sdk.coders.VarLongCoder cannot be cast to class org.apache.beam.sdk.coders.KvCoder (org.apache.beam.sdk.coders.VarLongCoder and org.apache.beam.sdk.coders.KvCoder are in unnamed module of loader 'app')
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:2295)
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:2088)
>   at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
>   at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:498)
>   at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:360)
>   at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:436)
>   at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:491)
>   at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
>   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> {code}
> Note that I installed the latest apache-beam version via 
> {{pip install 'apache-beam'}}: apache-beam==2.28.0
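
For reference, the two Kafka serializers named in the script operate on different Java types: `LongSerializer` turns a `Long` into 8 big-endian bytes, and `ByteArraySerializer` passes a `byte[]` through unchanged. The sketch below mimics that byte layout using only the Python standard library. The helper names are hypothetical, and it illustrates the wire format only; it does not diagnose either error above.

```python
import struct


def long_serializer(value: int) -> bytes:
    """Mimic Kafka's LongSerializer: 8-byte big-endian signed integer."""
    return struct.pack(">q", value)


def byte_array_serializer(value: bytes) -> bytes:
    """Mimic Kafka's ByteArraySerializer: bytes pass through unchanged."""
    return bytes(value)


# The script emits elements shaped like (1, <payload bytes>); an empty
# payload is used here purely as an illustrative value.
key_bytes = long_serializer(1)
value_bytes = byte_array_serializer(b"")
print(key_bytes.hex(), value_bytes)
```

The key for `1` comes out as seven zero bytes followed by `0x01`, which is what a consumer using the matching `LongDeserializer` would expect to read.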



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
