[
https://issues.apache.org/jira/browse/BEAM-4409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010819#comment-17010819
]
Ismaël Mejía edited comment on BEAM-4409 at 1/8/20 4:44 PM:
------------------------------------------------------------
[~kolban] This is related to BEAM-7427 so let's follow the discussion there
since there seems to be more audience on the other ticket.
was (Author: iemejia):
This is related to BEAM-7427 so let's follow the discussion there since there
seems to be more audience on the other ticket.
> NoSuchMethodException reading from JmsIO
> ----------------------------------------
>
> Key: BEAM-4409
> URL: https://issues.apache.org/jira/browse/BEAM-4409
> Project: Beam
> Issue Type: Bug
> Components: io-java-jms
> Affects Versions: 2.4.0
> Environment: Linux, Java 1.8, Beam 2.4, Direct Runner, ActiveMQ
> Reporter: Edward Pricer
> Priority: Major
>
> Running with the DirectRunner, and reading from a queue with JmsIO as an
> unbounded source will produce a NoSuchMethodException. This occurs as the
> UnboundedReadEvaluatorFactory.UnboundedReadEvaluator attempts to clone the
> JmsCheckpointMark with the default (Avro) coder.
> The following trivial code on the reader side reproduces the error
> (DirectRunner must be in path). The messages on the queue for this test case
> were simple TextMessages. I found this exception is triggered more readily
> when messages are published rapidly (~200/second)
> {code:java}
> Pipeline p =
> Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
> // read from the queue
> ConnectionFactory factory = new
> ActiveMQConnectionFactory("tcp://localhost:61616");
> PCollection<String> inputStrings = p.apply("Read from queue",
> JmsIO.<String>readMessage() .withConnectionFactory(factory)
> .withQueue("somequeue") .withCoder(StringUtf8Coder.of())
> .withMessageMapper((JmsIO.MessageMapper<String>) message ->
> ((TextMessage) message).getText()));
> // decode
> PCollection<String> asStrings = inputStrings.apply("Decode Message",
> ParDo.of(new DoFn<String, String>() { @ProcessElement public
> void processElement(ProcessContext context) {
> System.out.println(context.element());
> context.output(context.element()); } }));
> p.run();
> {code}
> Stack trace:
> {code:java}
> Exception in thread "main" java.lang.RuntimeException:
> java.lang.NoSuchMethodException: javax.jms.Message.<init>() at
> org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) at
> org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) at
> org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at
> org.apache.avro.reflect.ReflectDatumReader.readCollection(ReflectDatumReader.java:219)
> at
> org.apache.avro.reflect.ReflectDatumReader.readArray(ReflectDatumReader.java:137)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
> at
> org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:302)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> at org.apache.beam.sdk.coders.AvroCoder.decode(AvroCoder.java:318) at
> org.apache.beam.sdk.coders.Coder.decode(Coder.java:170) at
> org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:122)
> at
> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:105)
> at
> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:99)
> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148) at
> org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:194)
> at
> org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:124)
> at
> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
> at
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748) Caused by:
> java.lang.NoSuchMethodException: javax.jms.Message.<init>() at
> java.lang.Class.getConstructor0(Class.java:3082) at
> java.lang.Class.getDeclaredConstructor(Class.java:2178) at
> org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
> {code}
>
> And a more contrived example of how to produce the exception:
> {code:java}
> package org.apache.beam.sdk.io.jms;
> import org.apache.activemq.command.ActiveMQTextMessage;
> import org.apache.beam.sdk.coders.Coder; import
> org.apache.beam.sdk.util.CoderUtils;
> final class CoderErrorExample { public static void main(String[] args) throws
> Exception {
> Coder coder = new JmsIO.UnboundedJmsSource(null).getCheckpointMarkCoder();
> JmsCheckpointMark checkpointMark = new JmsCheckpointMark();
> checkpointMark.addMessage(new ActiveMQTextMessage());
> CoderUtils.clone(coder, checkpointMark); // from
> org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedReadEvaluator#getReader
> }
> }
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)