[
https://issues.apache.org/jira/browse/BEAM-7414?focusedWorklogId=248006&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-248006
]
ASF GitHub Bot logged work on BEAM-7414:
----------------------------------------
Author: ASF GitHub Bot
Created on: 24/May/19 12:25
Start Date: 24/May/19 12:25
Worklog Time Spent: 10m
Work Description: Riduidel commented on pull request #8677: [BEAM-7414]
fix for message being not serializable due to LongString in headers
URL: https://github.com/apache/beam/pull/8677
This is a fix for https://issues.apache.org/jira/browse/BEAM-7414
Basically, when receiving a RabbitMqMessage, I copy all of its delivery fields
into a new delivery object, which lets me replace LongString values with String.
The try/catch block is there for cases where a LongString is too long
to be converted into a String.
The code can be reviewed by @jbonofre or @aromanenko-dev, who are both aware of
the issue (though I guess @jbonofre would be the better reviewer).
Note that I'm also trying to reach the RabbitMQ team in parallel, to get the
issue fixed more elegantly on their side.
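A minimal sketch of the header conversion described above, assuming a hypothetical `LongStringLike` interface that stands in for `com.rabbitmq.client.LongString` so the code compiles without the RabbitMQ client on the classpath. `LongStringLike` and `HeaderSanitizer` are illustrative names, not part of Beam or the RabbitMQ client, and this is not the code from the pull request: it decodes values as UTF-8, also walks nested tables and arrays, and does not reproduce the try/catch fallback for oversized values.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical stand-in for com.rabbitmq.client.LongString. Like the real
 * type, it does not implement java.io.Serializable.
 */
interface LongStringLike {
  byte[] getBytes();
}

final class HeaderSanitizer {
  private HeaderSanitizer() {}

  /** Returns a copy of the headers with every LongStringLike replaced by a UTF-8 String. */
  static Map<String, Object> sanitize(Map<String, Object> headers) {
    if (headers == null) {
      return null; // messages without headers have nothing to convert
    }
    Map<String, Object> copy = new LinkedHashMap<>();
    for (Map.Entry<String, Object> entry : headers.entrySet()) {
      copy.put(entry.getKey(), sanitizeValue(entry.getValue()));
    }
    return copy;
  }

  /** Converts one header value, recursing into nested tables (Map) and arrays (List). */
  @SuppressWarnings("unchecked")
  private static Object sanitizeValue(Object value) {
    if (value instanceof LongStringLike) {
      return new String(((LongStringLike) value).getBytes(), StandardCharsets.UTF_8);
    }
    if (value instanceof Map) {
      return sanitize((Map<String, Object>) value);
    }
    if (value instanceof List) {
      List<Object> converted = new ArrayList<>();
      for (Object item : (List<Object>) value) {
        converted.add(sanitizeValue(item));
      }
      return converted;
    }
    // Other values (numbers, booleans, dates, byte arrays, Strings) are left as-is.
    return value;
  }
}
```

With the real client, the `instanceof` check would target `LongString` instead of the stand-in. Since the copy then holds only `String`s and other serializable types, a message rebuilt from it should no longer fail in `SerializableCoder`.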
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
Issue Time Tracking
-------------------
Worklog Id: (was: 248006)
Time Spent: 10m
Remaining Estimate: 0h
> RabbitMqMessage can't be serialized due to LongString in headers
> ----------------------------------------------------------------
>
> Key: BEAM-7414
> URL: https://issues.apache.org/jira/browse/BEAM-7414
> Project: Beam
> Issue Type: Bug
> Components: io-java-rabbitmq
> Affects Versions: 2.12.0
> Environment: dataflow runner
> Reporter: Nicolas Delsaux
> Priority: Major
> Labels: rabbitmq, serializable
> Time Spent: 10m
> Remaining Estimate: 0h
>
> When trying to read messages from RabbitMQ, I systematically get:
> {noformat}
> java.lang.IllegalArgumentException: Unable to encode element 'ValueWithRecordId{id=[], value=org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage@234080e1}' with coder 'ValueWithRecordId$ValueWithRecordIdCoder(org.apache.beam.sdk.coders.SerializableCoder@206641ef)'.
>     at org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
>     at org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:564)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:480)
>     at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
>     at org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1283)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:147)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1020)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
>     at java.util.HashMap.writeObject(HashMap.java:1362)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:183)
>     at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:53)
>     at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:105)
>     at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:99)
>     at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:81)
>     at org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
>     at org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:564)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:480)
>     at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
>     at org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1283)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:147)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1020)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> This happens with the Dataflow runner because RabbitMqMessage headers may
> contain LongString values, which don't implement the java.io.Serializable
> interface.
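The root cause can be reproduced with plain JDK serialization, without Beam or RabbitMQ. Per the trace, `SerializableCoder` encodes elements through `ObjectOutputStream`, and `HashMap.writeObject` throws `NotSerializableException` as soon as one header value is not `Serializable`, which matches the `Caused by` section. `OpaqueHeaderValue` and `SerializationCheck` are hypothetical names; `OpaqueHeaderValue` only stands in for `LongStringHelper$ByteArrayLongString`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for LongStringHelper$ByteArrayLongString: holds bytes, not Serializable. */
final class OpaqueHeaderValue {
  final byte[] bytes;

  OpaqueHeaderValue(byte[] bytes) {
    this.bytes = bytes;
  }
}

final class SerializationCheck {
  private SerializationCheck() {}

  /** Returns true if Java serialization succeeds, false on NotSerializableException. */
  static boolean isJavaSerializable(Object o) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(o); // same path SerializableCoder.encode takes in the trace
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> headers = new HashMap<>();
    headers.put("x-plain", "ok");
    System.out.println(isJavaSerializable(headers)); // true
    headers.put("x-long", new OpaqueHeaderValue(new byte[] {1, 2, 3}));
    System.out.println(isJavaSerializable(headers)); // false
  }
}
```

A single non-serializable value anywhere in the headers map is enough to make the whole message fail to encode, which is why converting those values before the message reaches the coder addresses the error.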
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)