[https://issues.apache.org/jira/browse/BEAM-11862?focusedWorklogId=569546&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-569546]
ASF GitHub Bot logged work on BEAM-11862:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 22/Mar/21 06:40
Start Date: 22/Mar/21 06:40
Worklog Time Spent: 10m
Work Description: chamikaramj opened a new pull request #14293:
URL: https://github.com/apache/beam/pull/14293
This fixes a bug where some pipelines with external transforms were not
correctly identified as multi-language pipelines.
Dataflow needs to correctly identify multi-language pipelines to enable
portable job submission (which is required to execute such pipelines).
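The description above does not spell out how Dataflow decides that a pipeline is multi-language. As a hedged illustration only: assuming the pipeline is modeled as a tree of composite and leaf transforms, a detection pass has to look inside composites, because a check limited to top-level transforms can miss an external transform nested in one. The `Transform` class and all helper functions below are hypothetical and are not Beam's API.

```python
from dataclasses import dataclass, field
from typing import List


# Hypothetical, simplified pipeline model (not Beam's API): a transform is a
# leaf or a composite whose `parts` hold nested transforms.
@dataclass
class Transform:
    name: str
    is_external: bool = False  # True if expanded by another SDK, e.g. Java
    parts: List["Transform"] = field(default_factory=list)


def contains_external(transform: Transform) -> bool:
    """True if this transform or any transform nested inside it is external."""
    if transform.is_external:
        return True
    # Recurse into composites, since an external transform can sit at any depth.
    return any(contains_external(part) for part in transform.parts)


def is_multi_language(roots: List[Transform]) -> bool:
    return any(contains_external(t) for t in roots)


def is_multi_language_top_level_only(roots: List[Transform]) -> bool:
    # Naive variant for comparison: ignores nested parts entirely.
    return any(t.is_external for t in roots)


pipeline = [
    Transform("Impulse"),
    Transform("Map"),
    Transform("HypotheticalComposite",
              parts=[Transform("HypotheticalExternalWrite", is_external=True)]),
]
print(is_multi_language(pipeline))                 # True
print(is_multi_language_top_level_only(pipeline))  # False
```

Here the naive top-level check reports `False` for a pipeline that does contain an external transform, which is the kind of misclassification the PR describes; the thread does not say whether nesting was the actual cause of this bug.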
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Apache Infrastructure.
Issue Time Tracking
-------------------
Worklog Id: (was: 569546)
Remaining Estimate: 0h
Time Spent: 10m
> Write To Kafka does not work
> ----------------------------
>
> Key: BEAM-11862
> URL: https://issues.apache.org/jira/browse/BEAM-11862
> Project: Beam
> Issue Type: Bug
> Components: cross-language, io-py-kafka
> Affects Versions: 2.28.0
> Reporter: Dénes Bartha
> Assignee: Chamikara Madhusanka Jayalath
> Priority: P1
> Fix For: 2.29.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> I am trying to send data to a Kafka topic in Python using {{WriteToKafka}}
> via Apache Beam, with Dataflow as the runner.
> When I run the following script:
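> {code:python}
> import apache_beam as beam
> from apache_beam.io.kafka import WriteToKafka
> from apache_beam.options.pipeline_options import PipelineOptions
>
> # Dataflow options (runner, project, region, temp_location, ...) go here.
> beam_options = PipelineOptions()
>
> with beam.Pipeline(options=beam_options) as p:
>     (p
>      | beam.Impulse()
>      | beam.Map(lambda input: (1, input))
>      | WriteToKafka(
>          producer_config={
>              'bootstrap.servers': 'ip:9092',
>          },
>          topic='testclient',
>          key_serializer='org.apache.kafka.common.serialization.LongSerializer',
>          value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
>      ))
> {code}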
> I am getting this error:
>
> {code:java}
> Traceback (most recent call last):
>   File "/home/denes/data-science/try_write_to_kafka.py", line 75, in <module>
>     run_pipeline(beam_options)
>   File "/home/denes/data-science/try_write_to_kafka.py", line 38, in run_pipeline
>     (p
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 582, in __exit__
>     self.result = self.run()
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 529, in run
>     return Pipeline.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1236, in from_runner_api
>     transform = ptransform.PTransform.from_runner_api(proto, context)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 700, in from_runner_api
>     return constructor(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1419, in from_runner_api_parameter
>     DoFnInfo.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
>     raise ValueError('Unexpected DoFn type: %s' % spec.urn)
> ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
> {code}
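The traceback shows `Pipeline.run()` calling `Pipeline.from_runner_api()`, i.e. the Python SDK rebuilding the pipeline from its portable proto, and failing on a DoFn whose spec URN is `beam:dofn:javasdk:0.1`, a DoFn contributed by the Java expansion. The sketch below is a hypothetical model of URN-based dispatch (the function names and the Python URN constant are assumptions, not Beam's internals); only the error message is taken from the traceback. It shows why a decoder that only understands Python DoFns rejects a Java one.

```python
# Hypothetical model of URN-based DoFn decoding; names are illustrative only.
# The URN below is an assumption standing in for "a DoFn this SDK understands".
PYTHON_DOFN_URN = "beam:dofn:pickled_python_info:v1"


def load_python_dofn(payload: bytes) -> str:
    # Stand-in for unpickling a Python DoFn from the payload bytes.
    return "PythonDoFn(%d bytes)" % len(payload)


# A Python-only decoder knows how to rebuild Python DoFns and nothing else.
DOFN_LOADERS = {PYTHON_DOFN_URN: load_python_dofn}


def dofn_from_spec(urn: str, payload: bytes) -> str:
    loader = DOFN_LOADERS.get(urn)
    if loader is None:
        # Same message as the traceback: the URN belongs to another SDK.
        raise ValueError("Unexpected DoFn type: %s" % urn)
    return loader(payload)


print(dofn_from_spec(PYTHON_DOFN_URN, b"abc"))  # PythonDoFn(3 bytes)
try:
    dofn_from_spec("beam:dofn:javasdk:0.1", b"")
except ValueError as err:
    print(err)  # Unexpected DoFn type: beam:dofn:javasdk:0.1
```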
>
> If I am not mistaken, the problem is with the serializers. I have tried
> every combination I could find on
> [this|https://kafka.apache.org/26/javadoc/org/apache/kafka/common/serialization/]
> page.
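For reference, the serializers in the script above already line up with the element types, assuming `beam.Impulse()` emits a single empty `bytes` element: the `Map` then yields `(1, b'')`, an `int` key for `LongSerializer` and a `bytes` value for `ByteArraySerializer`. That is consistent with the linked PR treating the first error as a multi-language detection problem rather than a serializer problem. The hypothetical helper below (not part of Beam's API; the type-to-serializer table is an assumption covering only these two types) makes that pairing check explicit:

```python
# Hypothetical pre-flight check, not part of Beam's WriteToKafka API.
# Assumed pairing of Python value types to Kafka serializer class names.
LONG_SER = "org.apache.kafka.common.serialization.LongSerializer"
BYTES_SER = "org.apache.kafka.common.serialization.ByteArraySerializer"
SERIALIZER_FOR_TYPE = {int: LONG_SER, bytes: BYTES_SER}


def check_serializers(element, key_serializer, value_serializer):
    """Return human-readable mismatches between (key, value) types and serializers."""
    key, value = element
    problems = []
    for role, obj, chosen in (("key", key, key_serializer),
                              ("value", value, value_serializer)):
        expected = SERIALIZER_FOR_TYPE.get(type(obj))  # None if type is unmapped
        if expected != chosen:
            problems.append("%s is %s: expected %s, got %s"
                            % (role, type(obj).__name__, expected, chosen))
    return problems


# The element produced by the script's Map step.
print(check_serializers((1, b""), LONG_SER, BYTES_SER))        # []
print(len(check_serializers((1, b""), BYTES_SER, BYTES_SER)))  # 1
```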
> When I do not specify the serializers, I get this {{RuntimeError}} instead:
> {code:java}
> Traceback (most recent call last):
>   File "/home/denes/data-science/try_write_to_kafka.py", line 48, in <module>
>     run_pipeline(beam_options)
>   File "/home/denes/data-science/try_write_to_kafka.py", line 14, in run_pipeline
>     WriteToKafka(
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/pvalue.py", line 141, in __or__
>     return self.pipeline.apply(ptransform, self)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 689, in apply
>     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 188, in apply
>     return m(transform, input, options)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform
>     return transform.expand(input)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/transforms/external.py", line 318, in expand
>     raise RuntimeError(response.error)
> RuntimeError: java.lang.ClassCastException: class org.apache.beam.sdk.coders.VarLongCoder cannot be cast to class org.apache.beam.sdk.coders.KvCoder (org.apache.beam.sdk.coders.VarLongCoder and org.apache.beam.sdk.coders.KvCoder are in unnamed module of loader 'app')
>     at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:2295)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:2088)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:498)
>     at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:360)
>     at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:436)
>     at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:491)
>     at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> {code}
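The Java exception says that `KafkaIO$Write.expand` tried to cast its input coder to `KvCoder` but received a `VarLongCoder`, which suggests the expansion service saw a collection of plain longs rather than key/value pairs. The toy model below illustrates that shape check; `infer_coder` and `expect_kv` are hypothetical and do not reproduce Beam's coder registry. In real pipelines, a common way to give a cross-language transform an explicit KV input is to declare the output type on the preceding step, e.g. `beam.Map(...).with_output_types(typing.Tuple[int, bytes])`; that is general Beam practice, not a fix confirmed in this thread.

```python
from typing import Tuple, get_args, get_origin


# Toy model of coder inference; names and mapping are hypothetical and do
# not reproduce Beam's coder registry.
def infer_coder(hint) -> str:
    if get_origin(hint) is tuple:
        key_hint, value_hint = get_args(hint)
        return "KvCoder(%s, %s)" % (infer_coder(key_hint), infer_coder(value_hint))
    if hint is int:
        return "VarLongCoder"
    if hint is bytes:
        return "ByteArrayCoder"
    raise TypeError("no coder for %r" % (hint,))


def expect_kv(coder: str) -> str:
    # Models the cast in KafkaIO$Write.expand: a non-KV input coder is rejected.
    if not coder.startswith("KvCoder("):
        raise TypeError("%s cannot be cast to KvCoder" % coder)
    return coder


print(expect_kv(infer_coder(Tuple[int, bytes])))  # KvCoder(VarLongCoder, ByteArrayCoder)
try:
    expect_kv(infer_coder(int))
except TypeError as err:
    print(err)  # VarLongCoder cannot be cast to KvCoder
```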
> Note that I installed the latest apache-beam version via
> {{pip install 'apache-beam'}}: {{apache-beam==2.28.0}}.
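The issue header lists 2.29.0 under Fix For, while the report was made with 2.28.0. A minimal sketch for checking whether an installed version is at or past that fix version, assuming plain `X.Y.Z` version strings (the helper names are hypothetical):

```python
# Hypothetical helper: compares a plain "X.Y.Z" version string against the
# "Fix For" version listed on the issue. Pre-release suffixes such as "rc1"
# or ".dev0" are not handled in this sketch.
FIX_VERSION = (2, 29, 0)


def parse_version(version: str) -> tuple:
    # Keep only the first three numeric components, e.g. "2.28.0" -> (2, 28, 0).
    return tuple(int(part) for part in version.split(".")[:3])


def has_fix(installed: str) -> bool:
    # Tuples compare element by element, so (2, 28, 0) < (2, 29, 0).
    return parse_version(installed) >= FIX_VERSION


print(has_fix("2.28.0"))  # False
print(has_fix("2.29.0"))  # True
```

In a live environment, the installed version string can be read with `importlib.metadata.version('apache-beam')` (Python 3.8+) and passed to `has_fix`.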
--
This message was sent by Atlassian Jira
(v8.3.4#803005)