[ https://issues.apache.org/jira/browse/BEAM-7427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mourad updated BEAM-7427:
-------------------------
Description:
I get the following exception when reading from an unbounded JMS source:
{code:java}
Caused by: org.apache.avro.SchemaParseException: Illegal character in: this$0
at org.apache.avro.Schema.validateName(Schema.java:1151)
at org.apache.avro.Schema.access$200(Schema.java:81)
at org.apache.avro.Schema$Field.<init>(Schema.java:403)
at org.apache.avro.Schema$Field.<init>(Schema.java:396)
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:622)
at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:740)
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:604)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
{code}
The exception is thrown by Avro when it introspects {{JmsCheckpointMark}} to
generate a schema.
JmsIO config:
{code:java}
PCollection<DFAMessage> messages = pipeline.apply(
    "read messages from the events broker",
    JmsIO.<DFAMessage>readMessage()
        .withConnectionFactory(jmsConnectionFactory)
        .withTopic(options.getTopic())
        .withMessageMapper(new DFAMessageMapper())
        .withCoder(AvroCoder.of(DFAMessage.class)));
{code}
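The {{this$0}} in the exception is the name javac conventionally gives to the hidden field through which a non-static inner class refers to its enclosing instance, and Avro names may contain only letters, digits, and underscores. The report does not say which class reachable from {{JmsCheckpointMark}} carries that field, so the following is only a minimal sketch of the mechanism using plain reflection. {{SyntheticFieldDemo}}, {{Outer}}, and {{Inner}} are hypothetical names, and the regular expression restates Avro's documented naming rule rather than calling Avro itself.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical demo class; not part of Beam, Avro, or the reporter's pipeline.
class SyntheticFieldDemo {

    // Avro naming rule: first character is a letter or underscore,
    // remaining characters are letters, digits, or underscores.
    private static final Pattern AVRO_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    // Hypothetical enclosing class.
    static class Outer {
        int counter = 0;

        // Non-static inner class: it holds a hidden reference to its Outer instance.
        class Inner {
            int payload = 1;

            // Touching the enclosing instance's field keeps the hidden reference
            // in use, so the compiler has to retain it.
            int bump() {
                return ++counter + payload;
            }
        }
    }

    // Returns true if the name satisfies the Avro naming rule above.
    static boolean isValidAvroName(String name) {
        return AVRO_NAME.matcher(name).matches();
    }

    // Collects the names of all fields declared on the class, synthetic ones included.
    static List<String> fieldNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            names.add(field.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        // Reflection-based schema generation walks fields in much the same way.
        for (String name : fieldNames(Outer.Inner.class)) {
            System.out.println(name + " -> valid Avro name: " + isValidAvroName(name));
        }
    }
}
```

Running {{main}} lists each declared field of {{Inner}} together with whether it would pass the naming rule. Declaring the nested class {{static}} removes the enclosing-instance field, which is one reason reflection-based serializers tend to work better with static nested or top-level classes.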
was:
I get the following exception when reading from an unbounded JMS source:
{code:java}
Caused by: org.apache.avro.SchemaParseException: Illegal character in: this$0
at org.apache.avro.Schema.validateName(Schema.java:1151)
at org.apache.avro.Schema.access$200(Schema.java:81)
at org.apache.avro.Schema$Field.<init>(Schema.java:403)
at org.apache.avro.Schema$Field.<init>(Schema.java:396)
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:622)
at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:740)
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:604)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
{code}
The exception is thrown by Avro when it introspects {{JmsCheckpointMark}} to
generate a schema.
JmsIO config:
{{PCollection<DFAMessage> messages = pipeline.apply(
"read messages from the events broker",
JmsIO.<DFAMessage>readMessage()
.withConnectionFactory(jmsConnectionFactory)
.withTopic(options.getTopic())
.withMessageMapper(new DFAMessageMapper())
.withCoder(AvroCoder.of(DFAMessage.class)));}}
> JmsCheckpointMark AVRO Serialisation issue with unbounded Source
> ----------------------------------------------------------------
>
> Key: BEAM-7427
> URL: https://issues.apache.org/jira/browse/BEAM-7427
> Project: Beam
> Issue Type: Bug
> Components: io-java-jms
> Affects Versions: 2.12.0
> Environment: Message Broker: Solace
> JMS Client (over AMQP): org.apache.qpid:qpid-jms-client:0.42.0
> Reporter: Mourad
> Priority: Major
>
> I get the following exception when reading from an unbounded JMS source:
>
> {code:java}
> Caused by: org.apache.avro.SchemaParseException: Illegal character in: this$0
> at org.apache.avro.Schema.validateName(Schema.java:1151)
> at org.apache.avro.Schema.access$200(Schema.java:81)
> at org.apache.avro.Schema$Field.<init>(Schema.java:403)
> at org.apache.avro.Schema$Field.<init>(Schema.java:396)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:622)
> at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:740)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:604)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
> at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
> {code}
>
> The exception is thrown by Avro when it introspects {{JmsCheckpointMark}} to
> generate a schema.
> JmsIO config:
>
> {code:java}
> PCollection<DFAMessage> messages = pipeline.apply(
>     "read messages from the events broker",
>     JmsIO.<DFAMessage>readMessage()
>         .withConnectionFactory(jmsConnectionFactory)
>         .withTopic(options.getTopic())
>         .withMessageMapper(new DFAMessageMapper())
>         .withCoder(AvroCoder.of(DFAMessage.class)));
> {code}
>
--
This message was sent by Atlassian JIRA (v7.6.3#76005)