[ 
https://issues.apache.org/jira/browse/HADOOP-10278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13901017#comment-13901017
 ] 

Hiroshi Ikeda commented on HADOOP-10278:
----------------------------------------

Here is a sketch of what I have in mind, before I forget:

{code}
class CallQueue {
        final AtomicInteger sizeRef = new ...
        final Semaphore putSemaphore;
        final Semaphore takeSemaphore = new Semaphore(0);
        final ReadWriteLock internalQueueLock = new ...
        /** The reference is guarded by internalQueueLock. */
        InternalQueue internalQueue;

        CallQueue(int maxQueueSize, InternalQueue initInternalQueue) {
                putSemaphore = new Semaphore(maxQueueSize);
                internalQueue = initInternalQueue;
        }

        /** Implementation must be thread safe. */
        interface InternalQueue {
                /** Returns null if no element. */
                Call poll();
                void offer(Call call);
        }

        void replaceInternalQueue(InternalQueue internalQueue) {
                internalQueueLock.writeLock().lock();
                try {
                        Call call;
                        while((call = this.internalQueue.poll()) != null) {
                                // Unqualified 'internalQueue' is the parameter, i.e. the new queue.
                                internalQueue.offer(call);
                        }
                        this.internalQueue = internalQueue;
                } finally {
                        internalQueueLock.writeLock().unlock();
                }
        }

        void put(Call call) throws InterruptedException {
                putSemaphore.acquire();
                internalQueueLock.readLock().lock();
                try {
                        internalQueue.offer(call);
                } finally {
                        internalQueueLock.readLock().unlock();
                }
                sizeRef.incrementAndGet();
                takeSemaphore.release();
        }

        Call take() throws InterruptedException {
                Call result;
                takeSemaphore.acquire();
                internalQueueLock.readLock().lock();
                try {
                        result = internalQueue.poll();
                } finally {
                        internalQueueLock.readLock().unlock();
                }
                sizeRef.decrementAndGet();
                putSemaphore.release();
                return result;
        }
                
        int size() {
                return sizeRef.get();
        }
}

class InternalQueueSimpleImpl implements CallQueue.InternalQueue {
        final ConcurrentLinkedQueue<Call> queue = new ...
        @Override public Call poll() { return queue.poll(); }
        @Override public void offer(Call call) { queue.offer(call); }
}
{code}
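
For anyone who wants to try the idea, the sketch above could be made compilable roughly as follows. This is only an illustration under some assumptions: `Call` is a hypothetical stand-in for the real ipc call type, the elided initializers are assumed to be `AtomicInteger`, `ReentrantReadWriteLock` and `ConcurrentLinkedQueue`, and the names `CallQueueSketch` and `SimpleQueue` are made up for this example.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class CallQueueSketch {
    /** Hypothetical stand-in for the real call type (assumption). */
    static final class Call {
        final String id;
        Call(String id) { this.id = id; }
    }

    /** Implementations must be thread safe. */
    interface InternalQueue {
        Call poll();            // returns null if there is no element
        void offer(Call call);
    }

    static final class SimpleQueue implements InternalQueue {
        private final ConcurrentLinkedQueue<Call> queue = new ConcurrentLinkedQueue<>();
        @Override public Call poll() { return queue.poll(); }
        @Override public void offer(Call call) { queue.offer(call); }
    }

    static final class CallQueue {
        private final AtomicInteger sizeRef = new AtomicInteger();
        private final Semaphore putSemaphore;                       // free slots
        private final Semaphore takeSemaphore = new Semaphore(0);   // queued calls
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        private InternalQueue internalQueue;                        // guarded by lock

        CallQueue(int maxQueueSize, InternalQueue initial) {
            putSemaphore = new Semaphore(maxQueueSize);
            internalQueue = initial;
        }

        /** Swaps the backing queue; the write lock excludes all put/take. */
        void replaceInternalQueue(InternalQueue replacement) {
            lock.writeLock().lock();
            try {
                Call call;
                while ((call = internalQueue.poll()) != null) {
                    replacement.offer(call);   // drain old queue into the new one
                }
                internalQueue = replacement;
            } finally {
                lock.writeLock().unlock();
            }
        }

        void put(Call call) throws InterruptedException {
            putSemaphore.acquire();            // blocks while the queue is full
            lock.readLock().lock();
            try {
                internalQueue.offer(call);
            } finally {
                lock.readLock().unlock();
            }
            sizeRef.incrementAndGet();
            takeSemaphore.release();
        }

        Call take() throws InterruptedException {
            takeSemaphore.acquire();           // blocks while the queue is empty
            Call result;
            lock.readLock().lock();
            try {
                result = internalQueue.poll();
            } finally {
                lock.readLock().unlock();
            }
            sizeRef.decrementAndGet();
            putSemaphore.release();
            return result;
        }

        int size() { return sizeRef.get(); }
    }

    public static void main(String[] args) throws InterruptedException {
        CallQueue q = new CallQueue(2, new SimpleQueue());
        q.put(new Call("a"));
        q.put(new Call("b"));
        q.replaceInternalQueue(new SimpleQueue());   // swap while calls are queued
        System.out.println(q.take().id + " " + q.take().id + " size=" + q.size());
    }
}
```

In this version the two semaphores do all the blocking, so the internal queue only needs non-blocking `poll()`/`offer()`. Since a take permit is released only after the matching `offer()` has completed, and the swap runs under the write lock, `take()` should not see a null from `poll()`. Note that `size()` may lag briefly behind the actual contents, because the counter is updated outside the lock.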

> Refactor to make CallQueue pluggable
> ------------------------------------
>
>                 Key: HADOOP-10278
>                 URL: https://issues.apache.org/jira/browse/HADOOP-10278
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: ipc
>            Reporter: Chris Li
>         Attachments: HADOOP-10278-atomicref-adapter.patch, 
> HADOOP-10278-atomicref-rwlock.patch, HADOOP-10278-atomicref.patch, 
> HADOOP-10278-atomicref.patch, HADOOP-10278-atomicref.patch, 
> HADOOP-10278-atomicref.patch, HADOOP-10278.patch, HADOOP-10278.patch
>
>
> * Refactor CallQueue into an interface, base, and default implementation that 
> matches today's behavior
> * Make the call queue impl configurable, keyed on port so that we minimize 
> coupling



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
