Author: chirino
Date: Fri Feb 26 17:14:40 2010
New Revision: 916762
URL: http://svn.apache.org/viewvc?rev=916762&view=rev
Log:
Better selector synchronization to help resolve AMQ-2440
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java?rev=916762&r1=916761&r2=916762&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
Fri Feb 26 17:14:40 2010
@@ -61,15 +61,25 @@
public synchronized SelectorSelection register(SocketChannel
socketChannel, Listener listener)
throws IOException {
- SelectorWorker worker = null;
- if (freeWorkers.size() > 0) {
- worker = freeWorkers.getFirst();
- } else {
- worker = new SelectorWorker(this);
- freeWorkers.addFirst(worker);
+ SelectorSelection selection = null;
+ while( selection == null ) {
+ if (freeWorkers.size() > 0) {
+ SelectorWorker worker = freeWorkers.getFirst();
+ if( worker.isReleased() ) {
+ freeWorkers.remove(worker);
+ } else {
+ worker.retain();
+ selection = new SelectorSelection(worker, socketChannel,
listener);
+ }
+
+ } else {
+ // Worker starts /w retain count of 1
+ SelectorWorker worker = new SelectorWorker(this);
+ freeWorkers.addFirst(worker);
+ selection = new SelectorSelection(worker, socketChannel,
listener);
+ }
}
-
- SelectorSelection selection = new SelectorSelection(worker,
socketChannel, listener);
+
return selection;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java?rev=916762&r1=916761&r2=916762&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
Fri Feb 26 17:14:40 2010
@@ -16,10 +16,11 @@
*/
package org.apache.activemq.transport.nio;
-import java.io.IOException;
+import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.transport.nio.SelectorManager.Listener;
@@ -29,23 +30,23 @@
public final class SelectorSelection {
private final SelectorWorker worker;
- private final SelectionKey key;
private final Listener listener;
private int interest;
+ private SelectionKey key;
+ private AtomicBoolean closed = new AtomicBoolean();
- public SelectorSelection(SelectorWorker worker, SocketChannel
socketChannel, Listener listener) throws ClosedChannelException {
+ public SelectorSelection(final SelectorWorker worker, final SocketChannel
socketChannel, Listener listener) throws ClosedChannelException {
this.worker = worker;
this.listener = listener;
-
- // Lock when mutating state of the selector
- worker.lock();
-
- try {
- this.key = socketChannel.register(worker.selector, 0, this);
- worker.incrementUseCounter();
- } finally {
- worker.unlock();
- }
+ worker.addIoTask(new Runnable() {
+ public void run() {
+ try {
+ SelectorSelection.this.key =
socketChannel.register(worker.selector, 0, SelectorSelection.this);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
}
public void setInterestOps(int ops) {
@@ -53,29 +54,39 @@
}
public void enable() {
- key.interestOps(interest);
- worker.selector.wakeup();
+ worker.addIoTask(new Runnable() {
+ public void run() {
+ try {
+ key.interestOps(interest);
+ } catch (CancelledKeyException e) {
+ }
+ }
+ });
}
public void disable() {
- if (key.isValid()) {
- key.interestOps(0);
- }
+ worker.addIoTask(new Runnable() {
+ public void run() {
+ try {
+ key.interestOps(0);
+ } catch (CancelledKeyException e) {
+ }
+ }
+ });
}
public void close() {
- worker.decrementUseCounter();
-
- // Lock when mutating state of the selector
- worker.lock();
- try {
- key.cancel();
- if (!worker.isRunning()) {
- worker.close();
- }
- } catch (IOException e) {
- } finally {
- worker.unlock();
+ // guard against multiple closes.
+ if( closed.compareAndSet(false, true) ) {
+ worker.addIoTask(new Runnable() {
+ public void run() {
+ try {
+ key.cancel();
+ } catch (CancelledKeyException e) {
+ }
+ worker.release();
+ }
+ });
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java?rev=916762&r1=916761&r2=916762&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
Fri Feb 26 17:14:40 2010
@@ -17,14 +17,12 @@
package org.apache.activemq.transport.nio;
import java.io.IOException;
-import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
public class SelectorWorker implements Runnable {
@@ -33,56 +31,69 @@
final SelectorManager manager;
final Selector selector;
final int id = NEXT_ID.getAndIncrement();
- final AtomicInteger useCounter = new AtomicInteger();
private final int maxChannelsPerWorker;
- private final ReadWriteLock selectorLock = new ReentrantReadWriteLock();
+
+ final AtomicInteger retainCounter = new AtomicInteger(1);
+ private final ConcurrentLinkedQueue<Runnable> ioTasks = new
ConcurrentLinkedQueue<Runnable>();
public SelectorWorker(SelectorManager manager) throws IOException {
this.manager = manager;
selector = Selector.open();
maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
+ manager.getSelectorExecutor().execute(this);
}
- void incrementUseCounter() {
- int use = useCounter.getAndIncrement();
- if (use == 0) {
- manager.getSelectorExecutor().execute(this);
- } else if (use + 1 == maxChannelsPerWorker) {
+ void retain() {
+ if (retainCounter.incrementAndGet() == maxChannelsPerWorker) {
manager.onWorkerFullEvent(this);
}
}
- void decrementUseCounter() {
- int use = useCounter.getAndDecrement();
- if (use == 1) {
+ void release() {
+ int use = retainCounter.decrementAndGet();
+ if (use == 0) {
manager.onWorkerEmptyEvent(this);
- } else if (use == maxChannelsPerWorker) {
+ } else if (use < maxChannelsPerWorker) {
manager.onWorkerNotFullEvent(this);
}
}
+
+ boolean isReleased() {
+ return retainCounter.get()==0;
+ }
+
- boolean isRunning() {
- return useCounter.get() != 0;
+ public void addIoTask(Runnable work) {
+ ioTasks.add(work);
+ selector.wakeup();
+ }
+
+ private void processIoTasks() {
+ Runnable task;
+ while( (task= ioTasks.poll()) !=null ) {
+ try {
+ task.run();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
}
+
+
public void run() {
String origName = Thread.currentThread().getName();
try {
Thread.currentThread().setName("Selector Worker: " + id);
- while (isRunning()) {
-
- lockBarrier();
+ while (!isReleased()) {
+ processIoTasks();
int count = selector.select(10);
if (count == 0) {
continue;
}
- if (!isRunning()) {
- return;
- }
-
// Get a java.util.Set containing the SelectionKey objects
// for all channels that are ready for I/O.
Set keys = selector.selectedKeys();
@@ -93,7 +104,9 @@
final SelectorSelection s =
(SelectorSelection)key.attachment();
try {
- s.disable();
+ if( key.isValid() ) {
+ key.interestOps(0);
+ }
// Kick off another thread to find newly selected keys
// while we process the
@@ -116,13 +129,8 @@
}
}
- } catch (ClosedSelectorException cse) {
- // Don't accept any more selections
- manager.onWorkerEmptyEvent(this);
- } catch (IOException e) {
- // Don't accept any more selections
- manager.onWorkerEmptyEvent(this);
-
+
+ } catch (Throwable e) {
// Notify all the selections that the error occurred.
Set keys = selector.keys();
for (Iterator i = keys.iterator(); i.hasNext();) {
@@ -132,25 +140,13 @@
}
} finally {
+ try {
+ manager.onWorkerEmptyEvent(this);
+ selector.close();
+ } catch (IOException ignore) {
+ }
Thread.currentThread().setName(origName);
}
}
- private void lockBarrier() {
- selectorLock.writeLock().lock();
- selectorLock.writeLock().unlock();
- }
-
- public void lock() {
- selectorLock.readLock().lock();
- selector.wakeup();
- }
-
- public void unlock() {
- selectorLock.readLock().unlock();
- }
-
- public void close() throws IOException {
- selector.close();
- }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=916762&r1=916761&r2=916762&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
Fri Feb 26 17:14:40 2010
@@ -25,10 +25,6 @@
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.HashMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
public class StompConnection {