Author: gsim
Date: Mon Dec 10 22:14:29 2012
New Revision: 1419837
URL: http://svn.apache.org/viewvc?rev=1419837&view=rev
Log:
PROTON-118: align driver more closely with c equivalent
Modified:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Connector.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Driver.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Listener.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java
Modified:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Connector.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Connector.java?rev=1419837&r1=1419836&r2=1419837&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Connector.java
(original)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Connector.java
Mon Dec 10 22:14:29 2012
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.proton.driver;
+import java.io.IOException;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Sasl;
@@ -36,7 +37,7 @@ public interface Connector<C>
* the connector.
*
*/
- void process();
+ void process() throws IOException;
/** Access the listener which opened this connector.
*
Modified:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Driver.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Driver.java?rev=1419837&r1=1419836&r2=1419837&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Driver.java
(original)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Driver.java
Mon Dec 10 22:14:29 2012
@@ -51,7 +51,7 @@ public interface Driver
* @param timeout maximum time in milliseconds to wait.
* 0 means infinite wait
*/
- void doWait(int timeout);
+ void doWait(long timeout);
/**
* Get the next listener with pending data in the driver.
@@ -120,4 +120,14 @@ public interface Driver
* @return a new connector to the given host:port, NULL if error.
*/
<C> Connector<C> createConnector(SelectableChannel fd, C context);
+
+ /**
+ * Return an iterator over all listeners.
+ */
+ Iterable<Listener> listeners();
+ /**
+ * Return an iterator over all connectors.
+ */
+ Iterable<Connector> connectors();
+
}
Modified:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Listener.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Listener.java?rev=1419837&r1=1419836&r2=1419837&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Listener.java
(original)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/Listener.java
Mon Dec 10 22:14:29 2012
@@ -49,13 +49,5 @@ public interface Listener<C>
* Close the socket used by the listener.
*
*/
- void close();
-
- /**
- * Destructor for the given listener.
- *
- * Assumes the listener's socket has been closed prior to call.
- *
- */
- void destroy();
+ void close() throws java.io.IOException;
}
Modified:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java?rev=1419837&r1=1419836&r2=1419837&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
(original)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
Mon Dec 10 22:14:29 2012
@@ -29,13 +29,11 @@ import org.apache.qpid.proton.driver.Con
import org.apache.qpid.proton.driver.Listener;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Sasl;
-import org.apache.qpid.proton.engine.Sasl.SaslState;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.TransportFactory;
class ConnectorImpl<C> implements Connector<C>
{
- public static int END_OF_STREAM = -1;
private static int DEFAULT_BUFFER_SIZE = 64 * 1024;
private static int readBufferSize = Integer.getInteger
("pn.receive_buffer_size", DEFAULT_BUFFER_SIZE);
@@ -44,7 +42,6 @@ class ConnectorImpl<C> implements Connec
enum ConnectorState {UNINITIALIZED, OPENED, EOS, CLOSED};
- private final Sasl _sasl;
private final DriverImpl _driver;
private final Listener<C> _listener;
private final SocketChannel _channel;
@@ -52,218 +49,111 @@ class ConnectorImpl<C> implements Connec
private C _context;
private Connection _connection;
+ private Transport _transport = null;
private SelectionKey _key;
private ConnectorState _state = UNINITIALIZED;
private ByteBuffer _readBuffer = ByteBuffer.allocate(readBufferSize);
- private int _bytesNotRead = 0;
-
- private int _bytesNotWritten = 0;
private ByteBuffer _writeBuffer = ByteBuffer.allocate(writeBufferSize);
- private Transport _transport = null;
+ private boolean _readPending;
- ConnectorImpl(DriverImpl driver, Listener<C> listener, Sasl sasl,
SocketChannel c, C context, SelectionKey key)
+ ConnectorImpl(DriverImpl driver, Listener<C> listener, SocketChannel c, C
context, SelectionKey key)
{
_driver = driver;
_listener = listener;
_channel = c;
- _sasl = sasl;
_context = context;
_key = key;
}
- public void process()
+ void selected()
{
- if (_channel.isConnectionPending())
- {
- try
- {
- _channel.finishConnect();
- }
- catch (IOException io)
- {
- throw new RuntimeException("Exception will trying to complete
connection",io);
- }
- }
-
- if (!_channel.isOpen())
- {
- _state = ConnectorState.CLOSED;
- return;
- }
-
- if (_key.isReadable())
- {
- read();
- }
- write();
+ _readPending = true;
}
- void read()
+ public void process() throws IOException
{
- try
+ if (_channel.isOpen() && _channel.finishConnect())
{
- int bytesRead = _channel.read(_readBuffer);
- int consumed = 0;
- while (bytesRead > 0)
- {
- consumed = processInput(_readBuffer.array(), 0, bytesRead +
_bytesNotRead);
- if (consumed < bytesRead)
- {
- _readBuffer.compact();
- _bytesNotRead = bytesRead - consumed;
- }
- else
- {
- _readBuffer.rewind();
- _bytesNotRead = 0;
- }
- bytesRead = _channel.read(_readBuffer);
- }
- if (bytesRead == -1)
+ if (_readPending)
{
- _state = ConnectorState.EOS;
+ read();
+ _readPending = false;
+ if (isClosed()) return;
}
- }
- catch (IOException e)
- {
- _logger.log(Level.SEVERE, "Exception when reading from
connection",e);
+ write();
}
}
- void write()
+ void read() throws IOException
{
- try
+ int bytesRead = 0;
+ while ((bytesRead = _channel.read(_readBuffer)) > 0)
{
- processOutput();
- if (_bytesNotWritten > 0)
+ _readBuffer.flip();
+ int consumed = _transport.input(_readBuffer.array(),
_readBuffer.position(), _readBuffer.limit());
+ _readBuffer.position(consumed == Transport.END_OF_STREAM ?
_readBuffer.limit() : consumed);
+ if (_logger.isLoggable(Level.FINE))
{
- _writeBuffer.limit(_bytesNotWritten);
- int written = _channel.write(_writeBuffer);
- if (_writeBuffer.hasRemaining())
- {
- _writeBuffer.compact();
- _bytesNotWritten = _bytesNotWritten - written;
- }
- else
- {
- _writeBuffer.clear();
- _bytesNotWritten = 0;
- }
- if (_bytesNotWritten > 0) // couldn't write all the data, need
to know when we could write again.
- {
- _key.interestOps(_key.interestOps() |
SelectionKey.OP_WRITE);
- }
- else if ((_key.interestOps() & SelectionKey.OP_WRITE) != 0)
- {
- _key.interestOps(_key.interestOps() &
~SelectionKey.OP_WRITE);
- }
+ _logger.log(Level.FINE, "consumed " + consumed + " bytes, " +
_readBuffer.remaining() + " available");
}
+ _readBuffer.compact();
}
- catch (IOException e)
- {
- _logger.log(Level.SEVERE, "Exception when writing to
connection",e);
+ if (bytesRead == -1) {
+ close();
}
}
- int processInput(byte[] bytes, int offset, int size)
+ void write() throws IOException
{
- int read = 0;
- while (read < size)
+ int interest = _key.interestOps();
+ int start = _writeBuffer.position();
+ boolean done = false;
+ while (!done)
{
- offset = read;
- switch (_state)
+ int produced = _transport.output(_writeBuffer.array(),
_writeBuffer.position(), _writeBuffer.remaining());
+ _writeBuffer.position(_writeBuffer.position() + produced);
+ _writeBuffer.flip();
+ int wrote = _channel.write(_writeBuffer);
+ if (_logger.isLoggable(Level.FINE))
{
- case UNINITIALIZED:
- read += readSasl(bytes, offset, size - offset);
- if (isSaslDone())
- {
- _state = _sasl.getState() == SaslState.PN_SASL_PASS ?
ConnectorState.OPENED : ConnectorState.CLOSED;
- }
- break;
- case OPENED:
- read += readAMQPCommands(bytes, offset, size - offset);
- break;
- case EOS:
- case CLOSED:
- break;
+ _logger.log(Level.FINE, "wrote " + wrote + " bytes, " +
_writeBuffer.remaining() + " remaining");
}
- }
- return read;
- }
-
- void processOutput()
- {
- switch (_state)
- {
- case UNINITIALIZED:
- writeSasl();
- if (isSaslDone())
+ _writeBuffer.compact();
+ if (_writeBuffer.position() > 0)
{
- _state = _sasl.getState() == SaslState.PN_SASL_PASS ?
ConnectorState.OPENED : ConnectorState.CLOSED;
+ //weren't able to write all available data, ask to be notfied
when we can write again
+ interest |= SelectionKey.OP_WRITE;
+ done = true;
+ }
+ else
+ {
+ //we are done if buffer was empty to begin with and we did not
produce enough to fill it
+ interest &= ~SelectionKey.OP_WRITE;
+ done = start == 0 && produced < _writeBuffer.capacity();
+ start = 0;
}
- break;
- case OPENED:
- writeAMQPCommands();
- break;
- case EOS:
- writeAMQPCommands();
- case CLOSED: // not a valid option
- //TODO
- break;
- }
- }
-
- int readAMQPCommands(byte[] bytes, int offset, int size)
- {
- int consumed = _transport.input(bytes, offset, size);
- if (consumed == END_OF_STREAM)
- {
- return size;
- }
- else
- {
- return consumed;
}
+ _key.interestOps(interest);
}
- void writeAMQPCommands()
+ public Listener<C> listener()
{
- int size = _writeBuffer.array().length - _bytesNotWritten;
- _bytesNotWritten += _transport.output(_writeBuffer.array(),
- _bytesNotWritten, size);
+ return _listener;
}
- int readSasl(byte[] bytes, int offset, int size)
+ public Sasl sasl()
{
- int consumed = _sasl.input(bytes, offset, size);
- if (consumed == END_OF_STREAM)
+ if (_transport != null)
{
- return size;
+ return _transport.sasl();
}
else
{
- return consumed;
+ return null;
}
}
- void writeSasl()
- {
- int size = _writeBuffer.array().length - _bytesNotWritten;
- _bytesNotWritten += _sasl.output(_writeBuffer.array(),
- _bytesNotWritten, size);
- }
-
- public Listener<C> listener()
- {
- return _listener;
- }
-
- public Sasl sasl()
- {
- return _sasl;
- }
-
public Connection getConnection()
{
return _connection;
@@ -287,44 +177,29 @@ class ConnectorImpl<C> implements Connec
public void close()
{
- if (_state == ConnectorState.CLOSED)
+ if (!isClosed())
{
- return;
- }
-
- try
- {
- // If the connection was closed due to authentication error
- // then there might be data available to write on to the wire.
- writeSasl();
- writeAMQPCommands(); // write any closing commands
- _channel.close();
- _state = ConnectorState.CLOSED;
- }
- catch (IOException e)
- {
- _logger.log(Level.SEVERE, "Exception when closing connection",e);
+ try
+ {
+ write();
+ _channel.close();
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.SEVERE, "Exception when closing
connection",e);
+ }
}
}
public boolean isClosed()
{
- return _state == ConnectorState.EOS || _state == ConnectorState.CLOSED;
+ boolean result = !(_channel.isOpen() && _channel.isConnected());
+ return result;
}
public void destroy()
{
close(); // close if not closed already
- }
-
- private void setState(ConnectorState newState)
- {
- _state = newState;
- }
-
- private boolean isSaslDone()
- {
- SaslState state = _sasl.getState();
- return state == SaslState.PN_SASL_PASS || state ==
SaslState.PN_SASL_FAIL;
+ _driver.removeConnector(this);
}
}
Modified:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java?rev=1419837&r1=1419836&r2=1419837&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
(original)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
Mon Dec 10 22:14:29 2012
@@ -29,8 +29,10 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -38,12 +40,13 @@ import java.util.logging.Logger;
import org.apache.qpid.proton.driver.Connector;
import org.apache.qpid.proton.driver.Driver;
import org.apache.qpid.proton.driver.Listener;
-import org.apache.qpid.proton.engine.impl.SaslImpl;
public class DriverImpl implements Driver
{
private Selector _selector;
private Set<SelectionKey> _selectedKeys = Collections.emptySet();
+ private Collection<Listener> _listeners = new LinkedList();
+ private Collection<Connector> _connectors = new LinkedList();
private Logger _logger = Logger.getLogger("proton.driver");
public DriverImpl() throws IOException
@@ -56,7 +59,7 @@ public class DriverImpl implements Drive
_selector.wakeup();
}
- public void doWait(int timeout)
+ public void doWait(long timeout)
{
try
{
@@ -146,8 +149,9 @@ public class DriverImpl implements Drive
selectedIter.remove();
if(key.isReadable() || key.isWritable())
{
- return (Connector) key.attachment();
-
+ ConnectorImpl c = (ConnectorImpl) key.attachment();
+ c.selected();
+ return c;
}
}
return null;
@@ -165,6 +169,8 @@ public class DriverImpl implements Drive
_logger.log(Level.SEVERE, "Exception when closing selector",e);
throw new RuntimeException(e);
}
+ _listeners.clear();
+ _connectors.clear();
}
public <C> Listener<C> createListener(String host, int port, C context)
@@ -193,6 +199,7 @@ public class DriverImpl implements Drive
Listener<C> l = new ListenerImpl<C>(this, c, context);
SelectionKey key = registerInterest(c,SelectionKey.OP_ACCEPT);
key.attach(l);
+ _listeners.add(l);
return l;
}
@@ -215,21 +222,34 @@ public class DriverImpl implements Drive
public <C> Connector<C> createConnector(SelectableChannel c, C context)
{
- SelectionKey key = registerInterest(c,SelectionKey.OP_READ);
- SaslImpl sasl = new SaslImpl();
- sasl.client();
- Connector<C> co = new ConnectorImpl<C>(this, null,
sasl,(SocketChannel)c, context, key);
+ SelectionKey key = registerInterest(c, SelectionKey.OP_READ |
SelectionKey.OP_WRITE);
+ Connector<C> co = new ConnectorImpl<C>(this, null, (SocketChannel)c,
context, key);
key.attach(co);
+ _connectors.add(co);
return co;
}
+ public <C> void removeConnector(Connector<C> c)
+ {
+ _connectors.remove(c);
+ }
+
+ public Iterable<Listener> listeners()
+ {
+ return _listeners;
+ }
+
+ public Iterable<Connector> connectors()
+ {
+ return _connectors;
+ }
+
protected <C> Connector<C> createServerConnector(SelectableChannel c, C
context, Listener<C> l)
{
- SelectionKey key = registerInterest(c,SelectionKey.OP_READ);
- SaslImpl sasl = new SaslImpl();
- sasl.server();
- Connector<C> co = new ConnectorImpl<C>(this, l, sasl,(SocketChannel)c,
context, key);
+ SelectionKey key = registerInterest(c, SelectionKey.OP_READ |
SelectionKey.OP_WRITE);
+ Connector<C> co = new ConnectorImpl<C>(this, l, (SocketChannel)c,
context, key);
key.attach(co);
+ _connectors.add(co);
return co;
}
Modified:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java?rev=1419837&r1=1419836&r2=1419837&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java
(original)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java
Mon Dec 10 22:14:29 2012
@@ -66,20 +66,8 @@ class ListenerImpl<C> implements Listene
return _context;
}
- public void close()
+ public void close() throws IOException
{
- try
- {
- _channel.close();
- }
- catch (IOException e)
- {
- _logger.log(Level.SEVERE, "Exception when closing listener",e);
- }
- }
-
- public void destroy()
- {
- close();
+ _channel.socket().close();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]