[ https://issues.apache.org/jira/browse/BAHIR-190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luciano Resende resolved BAHIR-190. ----------------------------------- Resolution: Fixed Fix Version/s: Flink-Next > ActiveMQ connector stops on empty queue > --------------------------------------- > > Key: BAHIR-190 > URL: https://issues.apache.org/jira/browse/BAHIR-190 > Project: Bahir > Issue Type: Bug > Components: Flink Streaming Connectors > Affects Versions: Flink-1.0 > Reporter: Stephan Brosinski > Priority: Critical > Fix For: Flink-Next > > > I tried the ActiveMQ Flink Connector. Reading from an ActiveMQ queue, it > seems to connector exits once there are no more messages in the queue. This > ends the Flink job processing the stream. > To me it seems, that the while loop inside the run method (AMQSource.java, > line 222) should not do a return, but a continue if the message is no > instance of ByteMessage, e.g. null. > If I'm right, I can create a pull request showing the change. > To reproduce: > > {code:java} > ActiveMQConnectionFactory connectionFactory = new > ActiveMQConnectionFactory("xxx", "xxx", > "tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1"); > AMQSourceConfig<String> amqConfig = new > AMQSourceConfig.AMQSourceConfigBuilder<String>() > .setConnectionFactory(connectionFactory) > .setDestinationName("test") > .setDestinationType(DestinationType.QUEUE) > .setDeserializationSchema(new SimpleStringSchema()) > .build(); > AMQSource<String> amqSource = new AMQSource<>(amqConfig); > env.addSource(amqSource).print() > env.setParallelism(1).execute("ActiveMQ Consumer");{code} > Then point the Flink job at an empty ActiveMQ queue. > > Not sure if this is a bug, but it's not what I expected when I used the > connector. -- This message was sent by Atlassian JIRA (v7.6.3#76005)