[ 
https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15326810#comment-15326810
 ] 

Subhankar Biswas edited comment on FLINK-4051 at 6/13/16 4:31 AM:
------------------------------------------------------------------

nextDelivery() use take() method of BlockingQueue, while nextDelivery(timeout) 
use poll() method. 
{code:java}
//Code of take
public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkFirst()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }
{code}
{code:java}
//Code of Poll 
public E pollFirst(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            E x;
            while ( (x = unlinkFirst()) == null) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return x;
        } finally {
            lock.unlock();
        }
    }
{code}
So if the queue is empty the thread will be await mode.


was (Author: neo20iitkgp):
 {code:java}nextDelivery(){code} use take() method of BlockingQueue, while 
nextDelivery(timeout) use poll() method. 
{code:java}
//Code of take
public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkFirst()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }
{code}
{code:java}
//Code of Poll 
public E pollFirst(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            E x;
            while ( (x = unlinkFirst()) == null) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return x;
        } finally {
            lock.unlock();
        }
    }
{code}
So if the queue is empty the thread will be await mode.

> RabbitMQ Source might not react to cancel signal
> ------------------------------------------------
>
>                 Key: FLINK-4051
>                 URL: https://issues.apache.org/jira/browse/FLINK-4051
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: Robert Metzger
>            Assignee: Subhankar Biswas
>
> As reported here 
> https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517,
>  the RabbitMQ source might block forever / ignore the cancelling signal, if 
> its listening to an empty queue.
> Fix: call nextDelivery() with a timeout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to