[ https://issues.apache.org/jira/browse/BEAM-7414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismaël Mejía updated BEAM-7414: ------------------------------- Status: Open (was: Triage Needed) > RabbitMqMessage can't be serialized due to LongString in headers > ---------------------------------------------------------------- > > Key: BEAM-7414 > URL: https://issues.apache.org/jira/browse/BEAM-7414 > Project: Beam > Issue Type: Bug > Components: io-java-rabbitmq > Affects Versions: 2.12.0 > Environment: dataflow runner > Reporter: Nicolas Delsaux > Assignee: Nicolas Delsaux > Priority: Major > Labels: rabbitmq, serializable > Time Spent: 20m > Remaining Estimate: 0h > > When trying to read messages from RabbitMq, I get systematic > {{{color:#000000}java.lang.IllegalArgumentException: Unable to encode element > 'ValueWithRecordId\{id=[], > value=org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage@234080e1}' with coder > 'ValueWithRecordId$ValueWithRecordIdCoder(org.apache.beam.sdk.coders.SerializableCoder@206641ef)'. > org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300) > org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291) > org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:564) > > org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:480) > > org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400) > > org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125) > > org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64) > > org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43) > > org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201) > > 
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159) > > org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77) > > org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1283) > > org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:147) > > org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1020) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > java.lang.Thread.run(Thread.java:745) Caused by: > java.io.NotSerializableException: > com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > java.util.HashMap.internalWriteEntries(HashMap.java:1785) > java.util.HashMap.writeObject(HashMap.java:1362) > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > java.lang.reflect.Method.invoke(Method.java:498) > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:183) > > org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:53) > > org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:105) > > org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:99) > > org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:81) > org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297) > org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291) > org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:564) > > org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:480) > > org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400) > > org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125) > > org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64) > > org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43) > > org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201) > > org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159) > > org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77) > > org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1283) > > 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:147) > > org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1020) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > java.lang.Thread.run(Thread.java:745){color}}} > This happens when using the Dataflow runner, because RabbitMqMessage headers may > contain LongString values, which don't implement the Java Serializable > interface. -- This message was sent by Atlassian JIRA (v7.6.3#76005)