[ 
https://issues.apache.org/jira/browse/BAHIR-190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luciano Resende resolved BAHIR-190.
-----------------------------------
       Resolution: Fixed
    Fix Version/s: Flink-Next

> ActiveMQ connector stops on empty queue
> ---------------------------------------
>
>                 Key: BAHIR-190
>                 URL: https://issues.apache.org/jira/browse/BAHIR-190
>             Project: Bahir
>          Issue Type: Bug
>          Components: Flink Streaming Connectors
>    Affects Versions: Flink-1.0
>            Reporter: Stephan Brosinski
>            Priority: Critical
>             Fix For: Flink-Next
>
>
> I tried the ActiveMQ Flink connector. When reading from an ActiveMQ queue, the
> connector seems to exit once there are no more messages in the queue. This
> ends the Flink job processing the stream.
> It seems to me that the while loop inside the run method (AMQSource.java,
> line 222) should not return but continue when the message is not an
> instance of BytesMessage, e.g. when it is null.
> If I'm right, I can create a pull request showing the change.
> To reproduce:
>  
> {code:java}
> // env was not shown in the report; assumed to be the usual execution environment
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
>         "xxx", "xxx",
>         "tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
> AMQSourceConfig<String> amqConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
>         .setConnectionFactory(connectionFactory)
>         .setDestinationName("test")
>         .setDestinationType(DestinationType.QUEUE)
>         .setDeserializationSchema(new SimpleStringSchema())
>         .build();
> AMQSource<String> amqSource = new AMQSource<>(amqConfig);
> env.addSource(amqSource).print();
> env.setParallelism(1).execute("ActiveMQ Consumer");
> {code}
> Then point the Flink job at an empty ActiveMQ queue.
>  
> Not sure if this is a bug, but it's not what I expected when I used the 
> connector.
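
The difference between returning and continuing on an empty receive can be illustrated with a small, self-contained sketch. This is not the AMQSource code: the Receiver interface, the consume method, and the maxPolls bound are hypothetical stand-ins, chosen so the example runs without a broker. A null from receive() plays the role of a receive call that found no message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class EmptyPollSketch {

    /** Hypothetical stand-in for a consumer whose receive call yields null when no message is available. */
    interface Receiver {
        String receive();
    }

    /**
     * Polls the receiver up to maxPolls times (a real source would loop on a running flag instead).
     * stopOnEmpty = true mimics the reported behaviour: the loop returns on the first null.
     * stopOnEmpty = false mimics the proposed behaviour: a null is skipped and polling goes on.
     */
    static List<String> consume(Receiver receiver, int maxPolls, boolean stopOnEmpty) {
        List<String> collected = new ArrayList<>();
        for (int i = 0; i < maxPolls; i++) {
            String message = receiver.receive();
            if (message == null) {
                if (stopOnEmpty) {
                    return collected; // source ends, and with it the job
                }
                continue; // queue was momentarily empty; keep polling
            }
            collected.add(message);
        }
        return collected;
    }

    public static void main(String[] args) {
        // Scripted receive results: one message, an empty poll, then another message.
        List<String> script = Arrays.asList("a", null, "b");

        Iterator<String> first = script.iterator();
        System.out.println(consume(first::next, 3, true));   // prints [a]

        Iterator<String> second = script.iterator();
        System.out.println(consume(second::next, 3, false)); // prints [a, b]
    }
}
```

With the early return, the message that arrives after the empty poll is never read. With continue, the loop survives the empty poll and picks it up.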



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
