Stephan Brosinski created BAHIR-190:
---------------------------------------
Summary: ActiveMQ connector stops on empty queue
Key: BAHIR-190
URL: https://issues.apache.org/jira/browse/BAHIR-190
Project: Bahir
Issue Type: Bug
Components: Flink Streaming Connectors
Affects Versions: Flink-1.0
Reporter: Stephan Brosinski
I tried the ActiveMQ Flink connector. When reading from an ActiveMQ queue, the
connector seems to exit once there are no more messages in the queue. This ends
the Flink job processing the stream.
It seems to me that the while loop inside the run method (AMQSource.java, line
222) should not return but continue when the message is not an instance of
BytesMessage, e.g. when it is null.
If I'm right, I can create a pull request showing the change.
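A minimal sketch of the change I have in mind. It does not reproduce the actual AMQSource code: the class EmptyPollSketch and the method consume are hypothetical, an Iterator stands in for repeated calls to the JMS consumer's receive(timeout) (which returns null when the timeout expires), and byte[] stands in for BytesMessage. The only point is that an empty poll is skipped instead of ending the loop.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the source's polling loop; not the real AMQSource.
final class EmptyPollSketch {

    // Each element of `receives` models one receive(timeout) result:
    // null = timeout on an empty queue, byte[] = a usable message,
    // anything else = a message type the source does not handle.
    static List<String> consume(Iterator<?> receives) {
        List<String> collected = new ArrayList<>();
        while (receives.hasNext()) {            // stands in for `while (running)`
            Object message = receives.next();   // stands in for consumer.receive(timeout)
            if (!(message instanceof byte[])) {
                // A `return` here would end the loop, and with it the job,
                // on the first empty poll. `continue` keeps polling.
                continue;
            }
            collected.add(new String((byte[]) message, StandardCharsets.UTF_8));
        }
        return collected;
    }

    public static void main(String[] args) {
        Iterator<Object> polls = Arrays.<Object>asList(
                null,                                    // queue empty
                "a".getBytes(StandardCharsets.UTF_8),
                null,                                    // queue empty again
                "b".getBytes(StandardCharsets.UTF_8)).iterator();
        System.out.println(consume(polls));              // prints [a, b]
    }
}
```

With a return in place of the continue, the same input would yield an empty list, because the first null ends the loop before any message is read.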
To reproduce:
{code:java}
// Standard Flink streaming entry point.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Broker credentials and URL are placeholders.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    "xxx", "xxx",
    "tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");

// Read String messages from the queue named "test".
AMQSourceConfig<String> amqConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
    .setConnectionFactory(connectionFactory)
    .setDestinationName("test")
    .setDestinationType(DestinationType.QUEUE)
    .setDeserializationSchema(new SimpleStringSchema())
    .build();

AMQSource<String> amqSource = new AMQSource<>(amqConfig);
env.addSource(amqSource).print();
env.setParallelism(1).execute("ActiveMQ Consumer");
{code}
Then point the Flink job at an empty ActiveMQ queue.
Not sure if this is a bug, but it's not what I expected when I used the
connector.