[
https://issues.apache.org/jira/browse/BEAM-11862?focusedWorklogId=570233&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-570233
]
ASF GitHub Bot logged work on BEAM-11862:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Mar/21 05:00
Start Date: 23/Mar/21 05:00
Worklog Time Spent: 10m
Work Description: codecov[bot] edited a comment on pull request #14306:
URL: https://github.com/apache/beam/pull/14306#issuecomment-804613009
# [Codecov](https://codecov.io/gh/apache/beam/pull/14306?src=pr&el=h1) Report
> Merging
[#14306](https://codecov.io/gh/apache/beam/pull/14306?src=pr&el=desc) (e334045)
into
[release-2.29.0](https://codecov.io/gh/apache/beam/commit/9a2fbcb7eba6348795b8801162d8afcff5013123?el=desc)
(9a2fbcb) will **decrease** coverage by `0.00%`.
> The diff coverage is `n/a`.
```diff
@@ Coverage Diff @@
## release-2.29.0 #14306 +/- ##
==================================================
- Coverage 83.16% 83.15% -0.01%
==================================================
Files 469 469
Lines 58977 58983 +6
==================================================
+ Hits 49046 49049 +3
- Misses 9931 9934 +3
```
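The percentages in the diff above can be re-derived from the Hits and Lines rows. The sketch below assumes the report truncates to two decimals rather than rounding; that is an inference from these particular numbers (the head ratio is about 83.158%, shown as 83.15%), not documented Codecov behavior. The helper name `truncated_percent` is hypothetical.

```python
def truncated_percent(hits: int, lines: int) -> str:
    """Return hits/lines as a percentage string, truncated (not rounded) to 2 decimals."""
    # Integer arithmetic avoids floating-point surprises near the truncation boundary.
    hundredths = hits * 10000 // lines
    return f"{hundredths // 100}.{hundredths % 100:02d}"


# Figures copied from the coverage diff above.
base_hits, base_lines = 49046, 58977
head_hits, head_lines = 49049, 58983

base_pct = truncated_percent(base_hits, base_lines)
head_pct = truncated_percent(head_hits, head_lines)

# Misses are simply the lines that were not hit.
base_misses = base_lines - base_hits
head_misses = head_lines - head_hits

# The unrounded change in percentage points; it is well under 0.01,
# consistent with the headline "decrease coverage by 0.00%".
exact_delta = 100 * (head_hits / head_lines - base_hits / base_lines)

print(base_pct, head_pct, base_misses, head_misses, f"{exact_delta:.4f}")
```

Under that assumption, the `-0.01%` in the table is the difference of the two truncated figures, while the `0.00%` in the summary line reflects the much smaller underlying change.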
| [Impacted Files](https://codecov.io/gh/apache/beam/pull/14306?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/14306/diff?src=pr&el=tree#diff-YmVhbV9QcmVDb21taXRfUHl0aG9uX0NvbW1pdC9zcmMvc2Rrcy9weXRob24vdGVzdC1zdWl0ZXMvdG94L3B5MzgvYnVpbGQvc3Jjcy9zZGtzL3B5dGhvbi9hcGFjaGVfYmVhbS9ydW5uZXJzL3dvcmtlci9kYXRhX3BsYW5lLnB5) | `89.52% <0.00%> (-1.80%)` | :arrow_down: |
| [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/14306/diff?src=pr&el=tree#diff-YmVhbV9QcmVDb21taXRfUHl0aG9uX0NvbW1pdC9zcmMvc2Rrcy9weXRob24vdGVzdC1zdWl0ZXMvdG94L3B5MzgvYnVpbGQvc3Jjcy9zZGtzL3B5dGhvbi9hcGFjaGVfYmVhbS9ydW5uZXJzL3dvcmtlci9idW5kbGVfcHJvY2Vzc29yLnB5) | `93.74% <0.00%> (-0.13%)` | :arrow_down: |
| [...y38/build/srcs/sdks/python/apache\_beam/pipeline.py](https://codecov.io/gh/apache/beam/pull/14306/diff?src=pr&el=tree#diff-YmVhbV9QcmVDb21taXRfUHl0aG9uX0NvbW1pdC9zcmMvc2Rrcy9weXRob24vdGVzdC1zdWl0ZXMvdG94L3B5MzgvYnVpbGQvc3Jjcy9zZGtzL3B5dGhvbi9hcGFjaGVfYmVhbS9waXBlbGluZS5weQ==) | `91.31% <0.00%> (+0.08%)` | :arrow_up: |
| [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/14306/diff?src=pr&el=tree#diff-YmVhbV9QcmVDb21taXRfUHl0aG9uX0NvbW1pdC9zcmMvc2Rrcy9weXRob24vdGVzdC1zdWl0ZXMvdG94L3B5MzgvYnVpbGQvc3Jjcy9zZGtzL3B5dGhvbi9hcGFjaGVfYmVhbS9ydW5uZXJzL3dvcmtlci9zZGtfd29ya2VyLnB5) | `89.69% <0.00%> (+0.15%)` | :arrow_up: |
| [...38/build/srcs/sdks/python/apache\_beam/io/iobase.py](https://codecov.io/gh/apache/beam/pull/14306/diff?src=pr&el=tree#diff-YmVhbV9QcmVDb21taXRfUHl0aG9uX0NvbW1pdC9zcmMvc2Rrcy9weXRob24vdGVzdC1zdWl0ZXMvdG94L3B5MzgvYnVpbGQvc3Jjcy9zZGtzL3B5dGhvbi9hcGFjaGVfYmVhbS9pby9pb2Jhc2UucHk=) | `84.81% <0.00%> (+0.26%)` | :arrow_up: |
| [...eam/runners/interactive/interactive\_environment.py](https://codecov.io/gh/apache/beam/pull/14306/diff?src=pr&el=tree#diff-YmVhbV9QcmVDb21taXRfUHl0aG9uX0NvbW1pdC9zcmMvc2Rrcy9weXRob24vdGVzdC1zdWl0ZXMvdG94L3B5MzgvYnVpbGQvc3Jjcy9zZGtzL3B5dGhvbi9hcGFjaGVfYmVhbS9ydW5uZXJzL2ludGVyYWN0aXZlL2ludGVyYWN0aXZlX2Vudmlyb25tZW50LnB5) | `90.74% <0.00%> (+0.37%)` | :arrow_up: |
| [...rcs/sdks/python/apache\_beam/transforms/external.py](https://codecov.io/gh/apache/beam/pull/14306/diff?src=pr&el=tree#diff-YmVhbV9QcmVDb21taXRfUHl0aG9uX0NvbW1pdC9zcmMvc2Rrcy9weXRob24vdGVzdC1zdWl0ZXMvdG94L3B5MzgvYnVpbGQvc3Jjcy9zZGtzL3B5dGhvbi9hcGFjaGVfYmVhbS90cmFuc2Zvcm1zL2V4dGVybmFsLnB5) | `72.48% <0.00%> (+0.38%)` | :arrow_up: |
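
The file names in the table are truncated with `...`, but each link's `#diff-` fragment appears to carry the full path. The sketch below assumes the fragment is plain base64 of that path; this is inferred from the shape of the URLs in this report, not from Codecov documentation, and the helper name `path_from_diff_link` is hypothetical.

```python
import base64
from urllib.parse import urlparse


def path_from_diff_link(url: str) -> str:
    """Decode the file path from a link whose fragment looks like 'diff-<base64>'."""
    fragment = urlparse(url).fragment
    prefix = "diff-"
    if not fragment.startswith(prefix):
        raise ValueError("URL has no 'diff-' fragment")
    encoded = fragment[len(prefix):]
    # Re-add any '=' padding that may have been dropped from the fragment.
    encoded += "=" * (-len(encoded) % 4)
    return base64.b64decode(encoded).decode("utf-8")


# The pipeline.py link, copied from the table above.
pipeline_link = (
    "https://codecov.io/gh/apache/beam/pull/14306/diff?src=pr&el=tree#diff-"
    "YmVhbV9QcmVDb21taXRfUHl0aG9uX0NvbW1pdC9zcmMvc2Rrcy9weXRob24vdGVzdC1zdWl0"
    "ZXMvdG94L3B5MzgvYnVpbGQvc3Jjcy9zZGtzL3B5dGhvbi9hcGFjaGVfYmVhbS9waXBlbGlu"
    "ZS5weQ=="
)

full_path = path_from_diff_link(pipeline_link)
print(full_path)
```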
------
[Continue to review full report at
Codecov](https://codecov.io/gh/apache/beam/pull/14306?src=pr&el=continue).
> **Legend** - [Click here to learn
more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by
[Codecov](https://codecov.io/gh/apache/beam/pull/14306?src=pr&el=footer). Last
update
[adf43d9...e334045](https://codecov.io/gh/apache/beam/pull/14306?src=pr&el=lastupdated).
Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 570233)
Time Spent: 2h 50m (was: 2h 40m)
> Write To Kafka does not work
> ----------------------------
>
> Key: BEAM-11862
> URL: https://issues.apache.org/jira/browse/BEAM-11862
> Project: Beam
> Issue Type: Bug
> Components: cross-language, io-py-kafka
> Affects Versions: 2.28.0
> Reporter: Dénes Bartha
> Assignee: Chamikara Madhusanka Jayalath
> Priority: P1
> Fix For: 2.29.0
>
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> I am trying to send data to a Kafka topic from Python using Apache Beam's
> {{WriteToKafka}}, with Dataflow as the runner.
> I run the following script:
> {code:python}
> import apache_beam as beam
> from apache_beam.io.kafka import WriteToKafka
>
> # beam_options is defined elsewhere in the script (not shown in this report).
> with beam.Pipeline(options=beam_options) as p:
>     (p
>      | beam.Impulse()
>      | beam.Map(lambda input: (1, input))
>      | WriteToKafka(
>          producer_config={
>              'bootstrap.servers': 'ip:9092,',
>          },
>          topic='testclient',
>          key_serializer='org.apache.kafka.common.serialization.LongSerializer',
>          value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
>      )
>     )
> {code}
> I am getting this error:
>
> {code:java}
> Traceback (most recent call last):
>   File "/home/denes/data-science/try_write_to_kafka.py", line 75, in <module>
>     run_pipeline(beam_options)
>   File "/home/denes/data-science/try_write_to_kafka.py", line 38, in run_pipeline
>     (p
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 582, in __exit__
>     self.result = self.run()
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 529, in run
>     return Pipeline.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1236, in from_runner_api
>     transform = ptransform.PTransform.from_runner_api(proto, context)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 700, in from_runner_api
>     return constructor(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1419, in from_runner_api_parameter
>     DoFnInfo.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
>     raise ValueError('Unexpected DoFn type: %s' % spec.urn)
> ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
> {code}
>
> If I am not wrong, the problem is with the serialization methods. I have
> tried all sorts of combinations that I have found on
> [this|https://kafka.apache.org/26/javadoc/org/apache/kafka/common/serialization/]
> page.
> When I do not specify the serializers, I get a {{RuntimeError}}:
> {code:java}
> Traceback (most recent call last):
>   File "/home/denes/data-science/try_write_to_kafka.py", line 48, in <module>
>     run_pipeline(beam_options)
>   File "/home/denes/data-science/try_write_to_kafka.py", line 14, in run_pipeline
>     WriteToKafka(
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/pvalue.py", line 141, in __or__
>     return self.pipeline.apply(ptransform, self)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 689, in apply
>     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 188, in apply
>     return m(transform, input, options)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform
>     return transform.expand(input)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/transforms/external.py", line 318, in expand
>     raise RuntimeError(response.error)
> RuntimeError: java.lang.ClassCastException: class org.apache.beam.sdk.coders.VarLongCoder cannot be cast to class org.apache.beam.sdk.coders.KvCoder (org.apache.beam.sdk.coders.VarLongCoder and org.apache.beam.sdk.coders.KvCoder are in unnamed module of loader 'app')
>     at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:2295)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:2088)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:498)
>     at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:360)
>     at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:436)
>     at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:491)
>     at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> {code}
> Note that I installed the latest apache-beam version via
> {{pip install 'apache-beam'}}:
> apache-beam==2.28.0
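
The `ClassCastException` in the second traceback says the Java side received a `VarLongCoder` where `KafkaIO$Write.expand` expected a `KvCoder`. One plausible reading, which the report does not confirm, is that the Python side did not declare the element type as a key-value pair. The sketch below shows one way to declare it with type hints. The names `to_kafka_record` and `build_kafka_write` and the empty-bytes key are hypothetical, and it assumes `WriteToKafka` falls back to `ByteArraySerializer` for both key and value when no serializers are passed.

```python
import typing


def to_kafka_record(value: bytes) -> typing.Tuple[bytes, bytes]:
    """Pair a payload with a (hypothetical) empty key so each element is a KV."""
    return (b"", value)


def build_kafka_write(pipeline, bootstrap_servers: str, topic: str):
    """Attach an Impulse -> Map -> WriteToKafka chain to an existing pipeline."""
    # Imported lazily so this module loads even where apache-beam is not installed.
    import apache_beam as beam
    from apache_beam.io.kafka import WriteToKafka

    return (
        pipeline
        | beam.Impulse()
        # Explicit hint so Beam can choose a key-value coder for the elements.
        # Assumption: a missing KV type is what triggered the cast failure.
        | beam.Map(to_kafka_record).with_output_types(typing.Tuple[bytes, bytes])
        | WriteToKafka(
            producer_config={"bootstrap.servers": bootstrap_servers},
            topic=topic,
        )
    )
```

This does not address the first error (`Unexpected DoFn type: beam:dofn:javasdk:0.1`), which is raised on a different code path.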
--
This message was sent by Atlassian Jira
(v8.3.4#803005)