[ https://issues.apache.org/jira/browse/BAHIR-190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16842621#comment-16842621 ]
ASF subversion and git services commented on BAHIR-190:
-------------------------------------------------------

Commit e5b1cae62f0fb47b15ff69d354b779b6072c27cf in bahir-flink's branch refs/heads/master from Krystex
[ https://gitbox.apache.org/repos/asf?p=bahir-flink.git;h=e5b1cae ]

[BAHIR-190] Fixed premature exit on empty queue

When the source queue has no more messages, the job no longer exits.
This was a problem with ActiveMQ.

Closes #53

> ActiveMQ connector stops on empty queue
> ---------------------------------------
>
>                 Key: BAHIR-190
>                 URL: https://issues.apache.org/jira/browse/BAHIR-190
>             Project: Bahir
>          Issue Type: Bug
>          Components: Flink Streaming Connectors
>    Affects Versions: Flink-1.0
>            Reporter: Stephan Brosinski
>            Priority: Critical
>
> I tried the ActiveMQ Flink Connector. When reading from an ActiveMQ queue,
> the connector seems to exit once there are no more messages in the queue.
> This ends the Flink job processing the stream.
> It seems to me that the while loop inside the run method (AMQSource.java,
> line 222) should not return but continue if the message is not an
> instance of BytesMessage, e.g. null.
> If I'm right, I can create a pull request showing the change.
> To reproduce:
>
> {code:java}
> ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
>     "xxx", "xxx",
>     "tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
> AMQSourceConfig<String> amqConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
>     .setConnectionFactory(connectionFactory)
>     .setDestinationName("test")
>     .setDestinationType(DestinationType.QUEUE)
>     .setDeserializationSchema(new SimpleStringSchema())
>     .build();
> AMQSource<String> amqSource = new AMQSource<>(amqConfig);
> env.addSource(amqSource).print();
> env.setParallelism(1).execute("ActiveMQ Consumer");{code}
> Then point the Flink job at an empty ActiveMQ queue.
>
> Not sure if this is a bug, but it's not what I expected when I used the
> connector.
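
The return-versus-continue behaviour described in the report can be illustrated with a minimal, dependency-free sketch. It is not the actual AMQSource code: the class EmptyQueueLoopSketch, the Receiver interface, and the use of String in place of a JMS BytesMessage are hypothetical stand-ins, and receive() returning null is assumed to model a timed-out read on an empty queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class EmptyQueueLoopSketch {

    /** Hypothetical stand-in for a message consumer; null means "no message arrived". */
    public interface Receiver {
        Object receive();
    }

    // Cleared by cancel(), in the way a streaming source is usually stopped.
    private volatile boolean running = true;

    public void cancel() {
        running = false;
    }

    /** Polls until cancelled and collects every String message it sees. */
    public List<String> run(Receiver receiver) {
        List<String> emitted = new ArrayList<>();
        while (running) {
            Object message = receiver.receive();
            if (!(message instanceof String)) {
                // Empty queue (null) or unexpected type: keep polling.
                // A `return` here would end the loop, and with it the job.
                continue;
            }
            emitted.add((String) message);
        }
        return emitted;
    }

    public static void main(String[] args) {
        EmptyQueueLoopSketch source = new EmptyQueueLoopSketch();
        // Scripted reads: the queue is empty at first and again between messages.
        Iterator<Object> scripted =
                Arrays.<Object>asList(null, "a", null, null, "b").iterator();
        Receiver receiver = () -> {
            if (scripted.hasNext()) {
                return scripted.next();
            }
            source.cancel(); // stop the sketch once the script is exhausted
            return null;
        };
        System.out.println(source.run(receiver)); // prints [a, b]
    }
}
```

With `return` in place of `continue`, the same scripted run would stop at the first null and emit nothing, which matches the symptom reported against an empty queue.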
-- This message was sent by Atlassian JIRA (v7.6.3#76005)