[
https://issues.apache.org/jira/browse/BEAM-7414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847407#comment-16847407
]
Nicolas Delsaux commented on BEAM-7414:
---------------------------------------
I have a working solution in
[https://github.com/Riduidel/beam/tree/fix/rabbitmq-message-not-serializable]
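The gist of that change (a rough sketch, not the exact code in the branch — names here are illustrative) is to convert LongString header values into plain Strings before they end up in RabbitMqMessage, so that the header map only contains Serializable values:

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.rabbitmq.client.LongString;

/**
 * Sketch of the approach: replace non-Serializable LongString header values
 * with plain Strings so the resulting header map can be Java-serialized.
 */
class HeaderSanitizer {

  static Map<String, Object> serializableHeaders(Map<String, Object> headers) {
    if (headers == null) {
      return null;
    }
    Map<String, Object> result = new HashMap<>();
    for (Map.Entry<String, Object> entry : headers.entrySet()) {
      result.put(entry.getKey(), sanitize(entry.getValue()));
    }
    return result;
  }

  private static Object sanitize(Object value) {
    if (value instanceof LongString) {
      // LongString is not Serializable; toString() yields its UTF-8 text.
      return value.toString();
    }
    if (value instanceof List) {
      // Header values may be nested lists; sanitize their elements as well.
      // (Nested maps would need the same treatment.)
      return ((List<?>) value).stream()
          .map(HeaderSanitizer::sanitize)
          .collect(Collectors.toList());
    }
    return value;
  }
}
{code}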
> RabbitMqMessage can't be serialized due to LongString in headers
> ----------------------------------------------------------------
>
> Key: BEAM-7414
> URL: https://issues.apache.org/jira/browse/BEAM-7414
> Project: Beam
> Issue Type: Bug
> Components: io-java-rabbitmq
> Affects Versions: 2.12.0
> Environment: dataflow runner
> Reporter: Nicolas Delsaux
> Priority: Major
> Labels: rabbitmq, serializable
>
> When trying to read messages from RabbitMQ, I systematically get the following exception:
> {noformat}
> java.lang.IllegalArgumentException: Unable to encode element 'ValueWithRecordId{id=[], value=org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage@234080e1}' with coder 'ValueWithRecordId$ValueWithRecordIdCoder(org.apache.beam.sdk.coders.SerializableCoder@206641ef)'.
>     org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
>     org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
>     org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:564)
>     org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:480)
>     org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
>     org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1283)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:147)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1020)
>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
>     java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>     java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     java.util.HashMap.internalWriteEntries(HashMap.java:1785)
>     java.util.HashMap.writeObject(HashMap.java:1362)
>     sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     java.lang.reflect.Method.invoke(Method.java:498)
>     java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>     java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>     java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:183)
>     org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:53)
>     org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:105)
>     org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:99)
>     org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:81)
>     org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
>     org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
>     org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:564)
>     org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:480)
>     org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
>     org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1283)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:147)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1020)
>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     java.lang.Thread.run(Thread.java:745)
> {noformat}
> This happens on the Dataflow runner because RabbitMqMessage headers may contain LongString values, which don't implement the java.io.Serializable interface, so SerializableCoder cannot encode the message.
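For reference, the root cause can be reproduced outside Beam with plain Java serialization; the snippet below (illustrative, not from the issue) shows that a header map containing a LongString fails exactly as in the trace above:

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.impl.LongStringHelper;

/** Minimal reproduction of the root cause, independent of Beam. */
public class LongStringNotSerializable {
  public static void main(String[] args) throws Exception {
    Map<String, Object> headers = new HashMap<>();
    // The RabbitMQ Java client delivers string-typed AMQP headers as LongString instances.
    headers.put("some-header", LongStringHelper.make("value"));

    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      // Throws java.io.NotSerializableException:
      // com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
      out.writeObject(headers);
    }
  }
}
{code}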
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)