Mourad created BEAM-7427:
----------------------------
Summary: JmsCheckpointMark AVRO Serialisation issue with unbounded
Source
Key: BEAM-7427
URL: https://issues.apache.org/jira/browse/BEAM-7427
Project: Beam
Issue Type: Bug
Components: io-java-jms
Affects Versions: 2.12.0
Environment: Message Broker : solace
JMS Client (Over AMQP) : "org.apache.qpid:qpid-jms-client:0.42.0
Reporter: Mourad
I get the following exception when reading from unbounded JMS Source:
{code:java}
Caused by: org.apache.avro.SchemaParseException: Illegal character in: this$0
at org.apache.avro.Schema.validateName(Schema.java:1151)
at org.apache.avro.Schema.access$200(Schema.java:81)
at org.apache.avro.Schema$Field.<init>(Schema.java:403)
at org.apache.avro.Schema$Field.<init>(Schema.java:396)
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:622)
at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:740)
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:604)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
at
avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
at
avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
at
avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
at
avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
{code}
The exception is thrown by Avro when introspecting {{JmsCheckpointMark}} to
generate schema.
JmsIO config :
{{PCollection<DFAMessage> messages = pipeline.apply("read messages from the
events broker", JmsIO.<DFAMessage>readMessage()
.withConnectionFactory(jmsConnectionFactory) .withTopic(options.getTopic())
.withMessageMapper(new DFAMessageMapper())
.withCoder(AvroCoder.of(DFAMessage.class)));}}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)