[ 
https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15326810#comment-15326810
 ] 

Subhankar Biswas edited comment on FLINK-4051 at 6/13/16 4:43 AM:
------------------------------------------------------------------

nextDelivery() use take() method of BlockingQueue, while nextDelivery(timeout) 
use poll() method. 
{code:java}
//Code of take
   lock.lock();
   try {
         E x;
         while ( (x = unlinkFirst()) == null)
             notEmpty.await();
         return x;
    } finally {
        lock.unlock();
    }
{code}
{code:java}
//Code of Poll
 lock.lockInterruptibly();
 try {
       E x;
       while ( (x = unlinkFirst()) == null) {
            if (nanos <= 0)
                    return null;
        nanos = notEmpty.awaitNanos(nanos);
       }
     return x;
   } finally {
    lock.unlock();
   }
{code}
So if the queue is empty the thread will be await mode. IMO a good 
implementation is to use the poll(timeout, unit).


was (Author: neo20iitkgp):
nextDelivery() use take() method of BlockingQueue, while nextDelivery(timeout) 
use poll() method. 
{code:java}
//Code of take
   lock.lock();
   try {
         E x;
         while ( (x = unlinkFirst()) == null)
             notEmpty.await();
         return x;
    } finally {
        lock.unlock();
    }
{code}
{code:java}
//Code of Poll
 lock.lockInterruptibly();
 try {
       E x;
       while ( (x = unlinkFirst()) == null) {
            if (nanos <= 0)
                    return null;
        nanos = notEmpty.awaitNanos(nanos);
       }
     return x;
   } finally {
    lock.unlock();
   }
{code}
So if the queue is empty the thread will be await mode.

> RabbitMQ Source might not react to cancel signal
> ------------------------------------------------
>
>                 Key: FLINK-4051
>                 URL: https://issues.apache.org/jira/browse/FLINK-4051
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: Robert Metzger
>            Assignee: Subhankar Biswas
>
> As reported here 
> https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517,
>  the RabbitMQ source might block forever / ignore the cancelling signal, if 
> its listening to an empty queue.
> Fix: call nextDelivery() with a timeout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to