[ https://issues.apache.org/jira/browse/HADOOP-10278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13901017#comment-13901017 ]
Hiroshi Ikeda commented on HADOOP-10278:
----------------------------------------
Here is a sketch of what I have in mind, before I forget:
{code}
class CallQueue {
    final AtomicInteger sizeRef = new ...
    /** Permits are free slots; put() blocks when none remain. */
    final Semaphore putSemaphore;
    /** Permits are queued calls; take() blocks when none remain. */
    final Semaphore takeSemaphore = new Semaphore(0);
    final ReadWriteLock internalQueueLock = new ...
    /** The reference is guarded by internalQueueLock. */
    InternalQueue internalQueue;

    CallQueue(int maxQueueSize, InternalQueue initInternalQueue) {
        putSemaphore = new Semaphore(maxQueueSize);
        internalQueue = initInternalQueue;
    }

    /** Implementation must be thread safe. */
    interface InternalQueue {
        /** Returns null if no element. */
        Call poll();
        void offer(Call call);
    }

    void replaceInternalQueue(InternalQueue internalQueue) {
        // The write lock keeps put() and take() out while calls are moved.
        internalQueueLock.writeLock().lock();
        try {
            Call call;
            // Drain the old queue (the field) into the new one (the parameter).
            while ((call = this.internalQueue.poll()) != null) {
                internalQueue.offer(call);
            }
            this.internalQueue = internalQueue;
        } finally {
            internalQueueLock.writeLock().unlock();
        }
    }

    void put(Call call) throws InterruptedException {
        putSemaphore.acquire();
        internalQueueLock.readLock().lock();
        try {
            internalQueue.offer(call);
        } finally {
            internalQueueLock.readLock().unlock();
        }
        sizeRef.incrementAndGet();
        takeSemaphore.release();
    }

    Call take() throws InterruptedException {
        Call result;
        takeSemaphore.acquire();
        internalQueueLock.readLock().lock();
        try {
            result = internalQueue.poll();
        } finally {
            internalQueueLock.readLock().unlock();
        }
        sizeRef.decrementAndGet();
        putSemaphore.release();
        return result;
    }

    int size() {
        return sizeRef.get();
    }
}

class InternalQueueSimpleImpl implements CallQueue.InternalQueue {
    final ConcurrentLinkedQueue<Call> queue = new ...
    @Override public Call poll() { return queue.poll(); }
    @Override public void offer(Call call) { queue.offer(call); }
}
{code}
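For anyone who wants to try the idea, here is a minimal compilable version of the sketch above. It is not taken from any of the attached patches. It rests on a few assumptions that are not in the comment: the element type is a generic parameter E instead of Call, the elided initializers are filled in with AtomicInteger, ReentrantReadWriteLock and ConcurrentLinkedQueue, and InternalQueue is a top-level interface.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Pluggable backing store; implementations must be thread safe. */
interface InternalQueue<E> {
    /** Returns null if there is no element. */
    E poll();
    void offer(E element);
}

/** Unbounded FIFO backing store (assumed choice: ConcurrentLinkedQueue). */
class InternalQueueSimpleImpl<E> implements InternalQueue<E> {
    private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();
    @Override public E poll() { return queue.poll(); }
    @Override public void offer(E element) { queue.offer(element); }
}

/** Bounded blocking queue whose backing store can be swapped at runtime. */
class CallQueue<E> {
    private final AtomicInteger sizeRef = new AtomicInteger();
    private final Semaphore putSemaphore;                      // permits = free slots
    private final Semaphore takeSemaphore = new Semaphore(0);  // permits = queued elements
    private final ReadWriteLock internalQueueLock = new ReentrantReadWriteLock();
    /** The reference is guarded by internalQueueLock. */
    private InternalQueue<E> internalQueue;

    CallQueue(int maxQueueSize, InternalQueue<E> initInternalQueue) {
        putSemaphore = new Semaphore(maxQueueSize);
        internalQueue = initInternalQueue;
    }

    /** Moves all queued elements into newQueue, then makes it the backing store. */
    void replaceInternalQueue(InternalQueue<E> newQueue) {
        internalQueueLock.writeLock().lock();  // excludes concurrent put() and take()
        try {
            E element;
            while ((element = internalQueue.poll()) != null) {
                newQueue.offer(element);
            }
            internalQueue = newQueue;
        } finally {
            internalQueueLock.writeLock().unlock();
        }
    }

    void put(E element) throws InterruptedException {
        putSemaphore.acquire();                // blocks while the queue is full
        internalQueueLock.readLock().lock();   // shared: many producers and consumers at once
        try {
            internalQueue.offer(element);
        } finally {
            internalQueueLock.readLock().unlock();
        }
        sizeRef.incrementAndGet();
        takeSemaphore.release();               // signals that one element is available
    }

    E take() throws InterruptedException {
        takeSemaphore.acquire();               // blocks while the queue is empty
        E result;
        internalQueueLock.readLock().lock();
        try {
            result = internalQueue.poll();
        } finally {
            internalQueueLock.readLock().unlock();
        }
        sizeRef.decrementAndGet();
        putSemaphore.release();                // frees one slot for producers
        return result;
    }

    int size() {
        return sizeRef.get();
    }
}
```

The read lock is shared, so producers and consumers do not serialize on it; only replaceInternalQueue takes the exclusive write lock. A take() permit is released only after the matching offer() has completed, and a swap cannot overlap with a poll(), so take() should not see null in this version. Note that size() can briefly lag behind the backing store because the counter is updated outside the lock.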
> Refactor to make CallQueue pluggable
> ------------------------------------
>
> Key: HADOOP-10278
> URL: https://issues.apache.org/jira/browse/HADOOP-10278
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: ipc
> Reporter: Chris Li
> Attachments: HADOOP-10278-atomicref-adapter.patch,
> HADOOP-10278-atomicref-rwlock.patch, HADOOP-10278-atomicref.patch,
> HADOOP-10278-atomicref.patch, HADOOP-10278-atomicref.patch,
> HADOOP-10278-atomicref.patch, HADOOP-10278.patch, HADOOP-10278.patch
>
>
> * Refactor CallQueue into an interface, base, and default implementation that
> matches today's behavior
> * Make the call queue impl configurable, keyed on port so that we minimize
> coupling