Stephan Brosinski created BAHIR-190:
---------------------------------------

             Summary: ActiveMQ connector stops on empty queue
                 Key: BAHIR-190
                 URL: https://issues.apache.org/jira/browse/BAHIR-190
             Project: Bahir
          Issue Type: Bug
          Components: Flink Streaming Connectors
    Affects Versions: Flink-1.0
            Reporter: Stephan Brosinski


I tried the ActiveMQ Flink Connector. Reading from an ActiveMQ queue, it seems 
to connector exits once there are no more messages in the queue. This ends the 
Flink job processing the stream.

To me it seems, that the while loop inside the run method (AMQSource.java, line 
222) should not do a return, but a continue if the message is no instance of 
ByteMessage, e.g. null.

If I'm right, I can create a pull request showing the change.

To reproduce:

 
{code:java}
ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory("xxx", "xxx", 
"tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");

AMQSourceConfig<String> amqConfig = new 
AMQSourceConfig.AMQSourceConfigBuilder<String>()
        .setConnectionFactory(connectionFactory)
        .setDestinationName("test")
        .setDestinationType(DestinationType.QUEUE)
        .setDeserializationSchema(new SimpleStringSchema())
        .build();
AMQSource<String> amqSource = new AMQSource<>(amqConfig);

env.addSource(amqSource).print()
env.setParallelism(1).execute("ActiveMQ Consumer");{code}
Then point the Flink job at an empty ActiveMQ queue.

 

Not sure if this is a bug, but it's not what I expected when I used the 
connector.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to