[ https://issues.apache.org/jira/browse/BAHIR-190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16851345#comment-16851345 ]
ASF GitHub Bot commented on BAHIR-190:
--------------------------------------

lresende commented on issue #53: [BAHIR-190] [activemq] Fixed premature exit on empty queue
URL: https://github.com/apache/bahir-flink/pull/53#issuecomment-497126213

   @Krystex Nothing is wrong: your PR has been merged (manually), as you can see from the info above: `asfgit closed this in e5b1cae 12 days ago`. GitHub sometimes displays `closed` rather than `merged`, particularly when the merge was done locally.

   Also, if you create a JIRA account, I can assign the issue to you, which might be useful towards committership in the project.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> ActiveMQ connector stops on empty queue
> ---------------------------------------
>
>                 Key: BAHIR-190
>                 URL: https://issues.apache.org/jira/browse/BAHIR-190
>             Project: Bahir
>          Issue Type: Bug
>          Components: Flink Streaming Connectors
>    Affects Versions: Flink-1.0
>            Reporter: Stephan Brosinski
>            Priority: Critical
>             Fix For: Flink-Next
>
>
> I tried the ActiveMQ Flink Connector. When reading from an ActiveMQ queue, it
> seems the connector exits once there are no more messages in the queue. This
> ends the Flink job processing the stream.
> It seems to me that the while loop inside the run method (AMQSource.java,
> line 222) should not return, but continue, if the message is not an
> instance of BytesMessage, e.g. null.
> If I'm right, I can create a pull request showing the change.
> To reproduce:
>
> {code:java}
> // Broker connection: user name, password, broker URL
> ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("xxx", "xxx",
>     "tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
> // Read strings from the queue named "test"
> AMQSourceConfig<String> amqConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
>     .setConnectionFactory(connectionFactory)
>     .setDestinationName("test")
>     .setDestinationType(DestinationType.QUEUE)
>     .setDeserializationSchema(new SimpleStringSchema())
>     .build();
> AMQSource<String> amqSource = new AMQSource<>(amqConfig);
> // env is the job's StreamExecutionEnvironment
> env.addSource(amqSource).print();
> env.setParallelism(1).execute("ActiveMQ Consumer");{code}
> Then point the Flink job at an empty ActiveMQ queue.
>
> Not sure if this is a bug, but it's not what I expected when I used the
> connector.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
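
The change suggested in the issue description (continue instead of return when the received message is null or not a bytes message) can be illustrated with a small, dependency-free sketch. This is not the AMQSource code and not necessarily what PR #53 does: PollLoopSketch, run, and cancel are hypothetical names, a BlockingQueue with a timed poll stands in for a JMS consumer whose timed receive returns null on an empty queue, and byte[] stands in for BytesMessage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a source's run loop; not the actual AMQSource code.
public class PollLoopSketch {
    private volatile boolean running = true;

    // Drains `queue` into `out` until cancel() is called.
    // poll(timeout) returns null on an empty queue, much like a timed receive.
    public void run(BlockingQueue<Object> queue, List<byte[]> out) throws InterruptedException {
        while (running) {
            Object message = queue.poll(50, TimeUnit.MILLISECONDS);
            if (!(message instanceof byte[])) {
                // null (empty queue) or an unexpected type: skip it and keep polling.
                // A `return` here would end the loop, and with it the whole job.
                continue;
            }
            out.add((byte[]) message);
        }
    }

    // Asks the loop to stop after its current poll.
    public void cancel() {
        running = false;
    }

    public static void main(String[] args) throws Exception {
        PollLoopSketch loop = new PollLoopSketch();
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        List<byte[]> received = new ArrayList<>();
        Thread worker = new Thread(() -> {
            try {
                loop.run(queue, received);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        Thread.sleep(200);                      // queue stays empty for a while
        queue.put("late message".getBytes());   // arrives after the idle period
        Thread.sleep(200);
        loop.cancel();
        worker.join();                          // join makes `received` safe to read
        System.out.println("received " + received.size() + " message(s)");
    }
}
```

With the continue branch, the worker survives the idle period and still picks up the message that arrives later; only cancel() ends the loop.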