This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e5b1cae  [BAHIR-190] Fixed premature exit on empty queue
e5b1cae is described below

commit e5b1cae62f0fb47b15ff69d354b779b6072c27cf
Author: Krystex <[email protected]>
AuthorDate: Thu May 9 12:11:28 2019 +0200

    [BAHIR-190] Fixed premature exit on empty queue
    
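    When the source queue has no more messages, the job no longer
    exits. consumer.receive(1000) returns null once the timeout
    expires, and the non-BytesMessage branch used to return, which
    ended the source. This was a problem with ActiveMQ.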
    
    Closes #53
---
 .../java/org/apache/flink/streaming/connectors/activemq/AMQSource.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
index 0c43956..8b8c948 100644
--- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
@@ -219,7 +219,7 @@ public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String>
             Message message = consumer.receive(1000);
             if (! (message instanceof BytesMessage)) {
                 LOG.warn("Active MQ source received non bytes message: {}", message);
-                return;
+                continue;
             }
             BytesMessage bytesMessage = (BytesMessage) message;
             byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
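
The effect of the one-line change can be illustrated without a broker.
The sketch below is not the connector's code: the class PollLoopSketch,
the method consume, and the list of simulated receive results are all
hypothetical, and byte[] stands in for BytesMessage. It assumes only
that a timed receive yields null when nothing arrives, as
consumer.receive(1000) does when the queue is empty.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the polling loop touched by the diff above.
class PollLoopSketch {

    /**
     * Walks over simulated receive() results and counts the payloads handled.
     * A null entry models a receive timeout on an empty queue.
     * returnOnNonBytes = true mimics the old `return`, false the new `continue`.
     */
    static int consume(List<Object> receiveResults, boolean returnOnNonBytes) {
        int handled = 0;
        for (Object message : receiveResults) {
            if (!(message instanceof byte[])) {
                if (returnOnNonBytes) {
                    return handled; // old behavior: the whole loop ends here
                }
                continue;           // new behavior: skip and poll again
            }
            handled++;              // a byte payload was received and processed
        }
        return handled;
    }

    public static void main(String[] args) {
        // Timeout first, then a payload, then a non-bytes message, then a payload.
        List<Object> results =
                Arrays.<Object>asList(null, new byte[] {1}, "text", new byte[] {2});

        System.out.println("return:   " + consume(results, true));   // prints "return:   0"
        System.out.println("continue: " + consume(results, false));  // prints "continue: 2"
    }
}
```

With `return`, the first timeout ends the loop before any payload is
handled; with `continue`, both payloads are processed and the timeout
and the non-bytes message are skipped.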
