[ https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15326810#comment-15326810 ]
Subhankar Biswas edited comment on FLINK-4051 at 6/13/16 4:31 AM: ------------------------------------------------------------------ nextDelivery() use take() method of BlockingQueue, while nextDelivery(timeout) use poll() method. {code:java} //Code of take public E takeFirst() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; while ( (x = unlinkFirst()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } } {code} {code:java} //Code of Poll public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { E x; while ( (x = unlinkFirst()) == null) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return x; } finally { lock.unlock(); } } {code} So if the queue is empty the thread will be await mode. was (Author: neo20iitkgp): {code:java}nextDelivery(){code} use take() method of BlockingQueue, while nextDelivery(timeout) use poll() method. {code:java} //Code of take public E takeFirst() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; while ( (x = unlinkFirst()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } } {code} {code:java} //Code of Poll public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { E x; while ( (x = unlinkFirst()) == null) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return x; } finally { lock.unlock(); } } {code} So if the queue is empty the thread will be await mode. > RabbitMQ Source might not react to cancel signal > ------------------------------------------------ > > Key: FLINK-4051 > URL: https://issues.apache.org/jira/browse/FLINK-4051 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors > Reporter: Robert Metzger > Assignee: Subhankar Biswas > > As reported here > https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517, > the RabbitMQ source might block forever / ignore the cancelling signal, if > its listening to an empty queue. > Fix: call nextDelivery() with a timeout. -- This message was sent by Atlassian JIRA (v6.3.4#6332)