[
https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15326810#comment-15326810
]
Subhankar Biswas commented on FLINK-4051:
-----------------------------------------
nextDelivery() uses the take() method of BlockingQueue, while nextDelivery(timeout)
uses the poll() method.
{code:java}
// Code of take
public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        while ((x = unlinkFirst()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}
{code}
{code:java}
// Code of poll
public E pollFirst(long timeout, TimeUnit unit)
        throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        E x;
        while ((x = unlinkFirst()) == null) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return x;
    } finally {
        lock.unlock();
    }
}
{code}
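So if the queue is empty, take() leaves the thread waiting in notEmpty.await() until an
element arrives, whereas poll() returns null once the timeout has elapsed.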
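A minimal sketch of how a timed poll lets a consumer loop notice a cancel request even when
the queue stays empty. This is not the Flink or RabbitMQ code: a plain LinkedBlockingQueue
stands in for the consumer's internal queue, and the class name CancellablePollLoop, the
running flag, and the 100 ms timeout are all hypothetical choices made for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a source that reads from a blocking queue.
class CancellablePollLoop {
    // Set to false by cancel(); volatile so the worker thread sees the change.
    private volatile boolean running = true;
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    public void run() throws InterruptedException {
        while (running) {
            // poll() returns null after the timeout instead of waiting forever,
            // so the loop condition is re-checked at least every 100 ms (assumed value).
            String msg = queue.poll(100, TimeUnit.MILLISECONDS);
            if (msg == null) {
                continue; // queue was empty; go back and re-check the running flag
            }
            System.out.println("received: " + msg);
        }
    }

    public void cancel() {
        running = false;
    }

    public static void main(String[] args) throws Exception {
        CancellablePollLoop source = new CancellablePollLoop();
        Thread worker = new Thread(() -> {
            try {
                source.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        source.cancel();   // the queue is empty the whole time
        worker.join(2000); // give the worker time to observe the flag and exit
        System.out.println("worker alive after cancel: " + worker.isAlive());
    }
}
```

With take() in place of the timed poll(), the same loop would only re-check the flag after
an element arrived or the thread was interrupted.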
> RabbitMQ Source might not react to cancel signal
> ------------------------------------------------
>
> Key: FLINK-4051
> URL: https://issues.apache.org/jira/browse/FLINK-4051
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Reporter: Robert Metzger
> Assignee: Subhankar Biswas
>
> As reported here
> https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517,
> the RabbitMQ source might block forever / ignore the cancelling signal, if
> it's listening to an empty queue.
> Fix: call nextDelivery() with a timeout.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)