Author: rajith
Date: Mon Jun 16 16:09:14 2014
New Revision: 1602913
URL: http://svn.apache.org/r1602913
Log:
PROTON-589
1. Removed dependency on the driver code.
2. Changed the code to use the Collector/Event interface from engine.
3. Added code to support non passive mode.
Added:
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/IoConnection.java
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/ListenerImpl.java
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SelectableImpl.java
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SocketListener.java
Modified:
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
Added:
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/IoConnection.java
URL:
http://svn.apache.org/viewvc/qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/IoConnection.java?rev=1602913&view=auto
==============================================================================
---
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/IoConnection.java
(added)
+++
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/IoConnection.java
Mon Jun 16 16:09:14 2014
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.messenger.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+
+import org.apache.qpid.proton.messenger.MessengerException;
+import org.apache.qpid.proton.messenger.Selectable;
+
+class IoConnection
+{
+ private Selector _selector;
+
+ private SelectionKey _key;
+
+ private SocketChannel _channel;
+
+ IoConnection(Selector selector, String host, int port)
+ {
+ try
+ {
+ _selector = selector;
+ _channel = SocketChannel.open();
+ _channel.connect(new InetSocketAddress(host, port));
+ configureSocket(_channel);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ throw new MessengerException(String.format("Error connecting to
%s:%s", host, port), e);
+ }
+ }
+
+ IoConnection(Selector selector, SocketChannel channel)
+ {
+ try
+ {
+ _selector = selector;
+ _channel = channel;
+ configureSocket(_channel);
+ }
+ catch (Exception e)
+ {
+ throw new MessengerException(String.format("Error configuring
socket channel"), e);
+ }
+ }
+
+ void configureSocket(SocketChannel channel)
+ {
+ try
+ {
+ channel.configureBlocking(false);
+ channel.socket().setTcpNoDelay(true);
+ }
+ catch (Exception e)
+ {
+ throw new MessengerException(String.format("Error configuring
socket channel"), e);
+ }
+ }
+
+ void setSelectable(Selectable selectable)
+ {
+ try
+ {
+ _key = _channel.register(_selector, SelectionKey.OP_READ |
SelectionKey.OP_WRITE);
+ _key.attach(selectable);
+ }
+ catch (Exception e)
+ {
+ throw new MessengerException(String.format("Error registering
socket channel"), e);
+ }
+ }
+
+ public int read(ByteBuffer buf) throws IOException
+ {
+ return _channel.read(buf);
+ }
+
+ public void registerForReadEvents(boolean b)
+ {
+ if (!_channel.isOpen())
+ {
+ return;
+ }
+
+ int interest = _key.interestOps();
+ if (b)
+ {
+ interest |= SelectionKey.OP_READ;
+ }
+ else
+ {
+ interest &= ~SelectionKey.OP_READ;
+ }
+ _key.interestOps(interest);
+ }
+
+ public int write(ByteBuffer buf) throws IOException
+ {
+ return _channel.write(buf);
+ }
+
+ public void registerForWriteEvents(boolean b)
+ {
+ if (!_channel.isOpen())
+ {
+ return;
+ }
+
+ int interest = _key.interestOps();
+ if (b)
+ {
+ interest |= SelectionKey.OP_WRITE;
+ }
+ else
+ {
+ interest &= ~SelectionKey.OP_WRITE;
+ }
+ _key.interestOps(interest);
+ }
+
+ public void close() throws IOException
+ {
+ _channel.close();
+ }
+
+ public boolean isClosed()
+ {
+ return !_channel.isOpen();
+ }
+}
\ No newline at end of file
Added:
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/ListenerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/ListenerImpl.java?rev=1602913&view=auto
==============================================================================
---
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/ListenerImpl.java
(added)
+++
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/ListenerImpl.java
Mon Jun 16 16:09:14 2014
@@ -0,0 +1,87 @@
+package org.apache.qpid.proton.messenger.impl;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.proton.messenger.Listener;
+
+public class ListenerImpl implements Listener
+{
+ private String _hostName;
+
+ private int _port;
+
+ private AtomicBoolean _closed = new AtomicBoolean(false);
+
+ private AtomicBoolean _completed = new AtomicBoolean(false);
+
+ private Object _ctx;
+
+ private SocketListener _socketListener;
+
+ ListenerImpl(String hostName, int port)
+ {
+ _hostName = hostName;
+ _port = port;
+ }
+
+ @Override
+ public String getHost()
+ {
+ return _hostName;
+ }
+
+ @Override
+ public int getPort()
+ {
+ return _port;
+ }
+
+ @Override
+ public void close()
+ {
+ _closed.set(true);
+ }
+
+ @Override
+ public boolean isClosed()
+ {
+ return _closed.get();
+ }
+
+ @Override
+ public void markCompleted()
+ {
+ _completed.set(true);
+ }
+
+ @Override
+ public boolean isCompleted()
+ {
+ return _completed.get();
+ }
+
+ void setContext(Object ctx)
+ {
+ _ctx = ctx;
+ }
+
+ Object getContext()
+ {
+ return _ctx;
+ }
+
+ /*----------------------------------------
+ * Used in active mode (passive == false)
+ * Visible only to the impl classes
+ * ---------------------------------------
+ */
+ void setSocketListener(SocketListener socket)
+ {
+ _socketListener = socket;
+ }
+
+ SocketListener getSocketListener()
+ {
+ return _socketListener;
+ }
+}
\ No newline at end of file
Modified:
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1602913&r1=1602912&r2=1602913&view=diff
==============================================================================
---
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
(original)
+++
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
Mon Jun 16 16:09:14 2014
@@ -21,43 +21,48 @@
package org.apache.qpid.proton.messenger.impl;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.InterruptException;
+import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.TimeoutException;
-import org.apache.qpid.proton.driver.Connector;
-import org.apache.qpid.proton.driver.Driver;
-import org.apache.qpid.proton.driver.Listener;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
-import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.Ssl;
+import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.messenger.Listener;
import org.apache.qpid.proton.messenger.Messenger;
import org.apache.qpid.proton.messenger.MessengerException;
+import org.apache.qpid.proton.messenger.Selectable;
import org.apache.qpid.proton.messenger.Status;
import org.apache.qpid.proton.messenger.Tracker;
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.amqp.messaging.Target;
-import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-
-import org.apache.qpid.proton.amqp.Binary;
public class MessengerImpl implements Messenger
+
{
private enum LinkCreditMode
{
@@ -76,7 +81,6 @@ public class MessengerImpl implements Me
private long _timeout = -1;
private boolean _blocking = true;
private long _nextTag = 1;
- private Driver _driver;
private LinkCreditMode _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
private final int _credit_batch = 1024; // credit_mode ==
LINK_CREDIT_AUTO
private int _credit; // available
@@ -90,7 +94,6 @@ public class MessengerImpl implements Me
private TrackerImpl _outgoingTracker;
private Store _incomingStore = new Store();
private Store _outgoingStore = new Store();
- private List<Connector> _awaitingDestruction = new ArrayList<Connector>();
private int _sendThreshold;
private Transform _routes = new Transform();
@@ -101,7 +104,14 @@ public class MessengerImpl implements Me
private String _password;
private String _trustedDb;
-
+ private Collector _collector = Proton.collector();
+ private boolean _passive = false;
+ private final List<SelectableImpl> _selectables = new
ArrayList<SelectableImpl>();
+ private final List<ListenerImpl> _listeners = new
ArrayList<ListenerImpl>();
+ private Selector _selector;
+ private AtomicBoolean _closed = new AtomicBoolean(true);
+ private AtomicBoolean _interrupted = new AtomicBoolean(false);
+
/**
* @deprecated This constructor's visibility will be reduced to the
default scope in a future release.
* Client code outside this module should use a {@link MessengerFactory}
instead
@@ -118,6 +128,14 @@ public class MessengerImpl implements Me
@Deprecated public MessengerImpl(String name)
{
_name = name;
+ try
+ {
+ _selector = Selector.open();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Unable to create selector",e);
+ }
}
public void setTimeout(long timeInMillis)
@@ -182,36 +200,56 @@ public class MessengerImpl implements Me
public void start() throws IOException
{
- _driver = Proton.driver();
+ _closed.set(false);
}
public void stop()
{
- if (_driver != null) {
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to stop");
- }
- //close all connections
- for (Connector<?> c : _driver.connectors())
- {
- Connection connection = c.getConnection();
- connection.close();
- }
- //stop listeners
- for (Listener<?> l : _driver.listeners())
+ if (_closed.get())
+ {
+ return;
+ }
+
+ _closed.set(true);
+ if(_logger.isLoggable(Level.FINE))
+ {
+ _logger.fine(this + " about to stop");
+ }
+
+ //stop listeners
+ for (ListenerImpl l : _listeners)
+ {
+ l.close();
+ if (!_passive)
{
try
{
- l.close();
+ l.getSocketListener().close();
+ l.markCompleted();
}
catch (IOException e)
{
- _logger.log(Level.WARNING, "Error while closing listener",
e);
+ _logger.log(Level.SEVERE, String.format("Error closing
socket listener : %s", e), e);
}
}
- waitUntil(_allClosed);
+ }
+
+ //close all connections.
+ for (SelectableImpl sel : _selectables)
+ {
+ SelectableImpl s = (SelectableImpl)sel;
+ Connection connection = s.getConnection();
+ connection.close();
+ if (!_passive && !s.getNetworkConnection().isClosed())
+ {
+ s.getNetworkConnection().registerForWriteEvents(true);
+ }
+ s.markClosed();
}
+
+ waitUntil(_allClosed);
+ _listeners.clear();
+ _selectables.clear();
}
public boolean stopped()
@@ -221,15 +259,17 @@ public class MessengerImpl implements Me
public boolean work(long timeout) throws TimeoutException
{
- if (_driver == null) { return false; }
+ if (_closed.get()) { return false; }
_worked = false;
return waitUntil(_workPred, timeout);
}
public void interrupt()
{
- if (_driver != null) {
- _driver.wakeup();
+ _interrupted.set(true);
+ if (!_passive)
+ {
+ _selector.wakeup();
}
}
@@ -289,7 +329,7 @@ public class MessengerImpl implements Me
public void put(Message m) throws MessengerException
{
- if (_driver == null) {
+ if (_closed.get()) {
throw new IllegalStateException("cannot put while messenger is
stopped");
}
@@ -396,13 +436,13 @@ public class MessengerImpl implements Me
public void send(int n) throws TimeoutException
{
- if (_driver == null) {
+ if (_closed.get()) {
throw new IllegalStateException("cannot send while messenger is
stopped");
}
if(_logger.isLoggable(Level.FINE))
{
- _logger.fine(this + " about to send");
+ _logger.fine(this + " about to send");
}
if (n == -1)
@@ -419,7 +459,7 @@ public class MessengerImpl implements Me
public void recv(int n) throws TimeoutException
{
- if (_driver == null) {
+ if (_closed.get()) {
throw new IllegalStateException("cannot recv while messenger is
stopped");
}
@@ -527,7 +567,7 @@ public class MessengerImpl implements Me
public void subscribe(String source) throws MessengerException
{
- if (_driver == null) {
+ if (_closed.get()) {
throw new IllegalStateException("messenger is stopped");
}
@@ -544,7 +584,22 @@ public class MessengerImpl implements Me
_logger.fine(this + " about to subscribe to source " + source
+ " using address " + hostName + ":" + port);
}
ListenerContext ctx = new ListenerContext(address);
- _driver.createListener(hostName, port, ctx);
+ ListenerImpl listener = new ListenerImpl(hostName, port);
+ listener.setContext(ctx);
+ if (!_passive)
+ {
+ try
+ {
+ SocketListener socket = new SocketListener(_selector,
listener, hostName , port);
+ listener.setSocketListener(socket);
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.SEVERE, String.format("Error subscribing
to message source : %s", e), e);
+ throw new MessengerException(String.format("Error
subscribing to message source : %s", e),e);
+ }
+ }
+ _listeners.add(listener);
}
else
{
@@ -644,13 +699,38 @@ public class MessengerImpl implements Me
_rewrites.rule(pattern, address);
}
+ @Override
+ public void setPassive(boolean b)
+ {
+ _passive = b;
+ }
+
+ @Override
+ public boolean isPassive()
+ {
+ return _passive;
+ }
+
+ @Override
+ public List<? extends Selectable> getSelectables()
+ {
+ return _selectables;
+ }
+
+ @Override
+ public List<? extends Listener> getListeners()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
private int queued(boolean outgoing)
{
int count = 0;
- if (_driver != null) {
- for (Connector<?> c : _driver.connectors())
+ if (!_closed.get()) {
+ for (SelectableImpl sel : _selectables)
{
- Connection connection = c.getConnection();
+ Connection connection = sel.getConnection();
for (Link link : new Links(connection, ACTIVE, ANY))
{
if (outgoing)
@@ -667,169 +747,152 @@ public class MessengerImpl implements Me
return count;
}
- private void bringDestruction()
+ private void processEvents()
{
- for (Connector<?> c : _awaitingDestruction)
+ for (Event event = _collector.peek(); event != null; event =
_collector.peek())
{
- c.destroy();
+ switch (event.getType())
+ {
+ case CONNECTION_REMOTE_STATE:
+ case CONNECTION_LOCAL_STATE:
+ processConnection(event.getConnection());
+ break;
+ case SESSION_REMOTE_STATE:
+ case SESSION_LOCAL_STATE:
+ processSession(event.getSession());
+ break;
+ case LINK_REMOTE_STATE:
+ case LINK_LOCAL_STATE:
+ processLink(event.getLink());
+ break;
+ case LINK_FLOW:
+ processFlow(event.getLink());
+ break;
+ case DELIVERY:
+ processDelivery(event.getDelivery());
+ break;
+ case TRANSPORT:
+ distributeCredit();
+ break;
+ }
+ _collector.pop();
}
- _awaitingDestruction.clear();
}
- private void processAllConnectors()
+ private void processConnection(Connection connection)
{
- distributeCredit();
- for (Connector<?> c : _driver.connectors())
+ if ( connection.getRemoteState() == EndpointState.ACTIVE)
{
- processEndpoints(c);
- try
+ if (connection.getLocalState() == EndpointState.UNINITIALIZED)
{
- if (c.process()) {
- _worked = true;
- }
+ connection.open();
}
- catch (IOException e)
+ }
+ if (connection.getRemoteState() == EndpointState.CLOSED)
+ {
+ if (connection.getLocalState() == EndpointState.ACTIVE)
{
- _logger.log(Level.SEVERE, "Error processing connection", e);
+ connection.close();
}
}
- bringDestruction();
- distributeCredit();
+ ConnectionContext ctx = (ConnectionContext)connection.getContext();
+ if (!_passive)
+ {
+
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
+ }
}
- private void processActive()
+ private void processSession(Session session)
{
- //process active listeners
- for (Listener<?> l = _driver.listener(); l != null; l =
_driver.listener())
+ if (session.getRemoteState() == EndpointState.ACTIVE)
{
- _worked = true;
- Connector<?> c = l.accept();
- Connection connection = Proton.connection();
- connection.setContainer(_name);
- ListenerContext ctx = (ListenerContext) l.getContext();
- connection.setContext(new ConnectionContext(ctx.getAddress(), c));
- c.setConnection(connection);
- Transport transport = c.getTransport();
- //TODO: full SASL
- Sasl sasl = c.sasl();
- if (sasl != null)
+ if (session.getLocalState() == EndpointState.UNINITIALIZED)
{
- sasl.server();
- sasl.setMechanisms(new String[]{"ANONYMOUS"});
- sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+ session.open();
+ _logger.log(Level.FINE, "Opened session " + session);
}
- transport.ssl(ctx.getDomain());
- connection.open();
}
- // process connectors, reclaiming credit on closed connectors
- for (Connector<?> c = _driver.connector(); c != null; c =
_driver.connector())
+ if (session.getRemoteState() == EndpointState.CLOSED)
{
- _worked = true;
- if (c.isClosed())
- {
- _awaitingDestruction.add(c);
- reclaimCredit(c.getConnection());
- }
- else
+ if (session.getLocalState() == EndpointState.ACTIVE)
{
- _logger.log(Level.FINE, "Processing active connector " + c);
- try
- {
- c.process();
- processEndpoints(c);
- c.process();
- }
- catch (IOException e)
- {
- _logger.log(Level.SEVERE, "Error processing connection",
e);
- }
+ session.close();
}
}
- bringDestruction();
- distributeCredit();
+ ConnectionContext ctx =
(ConnectionContext)session.getConnection().getContext();
+ if (!_passive)
+ {
+
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
+ }
}
- private void processEndpoints(Connector c)
+ private void processLink(Link link)
{
- Connection connection = c.getConnection();
-
- if (connection.getLocalState() == EndpointState.UNINITIALIZED)
+ if (link.getRemoteState() == EndpointState.ACTIVE)
{
- connection.open();
+ if (link.getLocalState() == EndpointState.UNINITIALIZED)
+ {
+ link.setSource(link.getRemoteSource());
+ link.setTarget(link.getRemoteTarget());
+ linkAdded(link);
+ link.open();
+ _logger.log(Level.FINE, "Opened link " + link);
+ }
}
-
- Delivery delivery = connection.getWorkHead();
- while (delivery != null)
+ if (link.getRemoteState() == EndpointState.CLOSED)
{
- Link link = delivery.getLink();
- if (delivery.isUpdated())
+ if (link.getLocalState() == EndpointState.ACTIVE)
{
- if (link instanceof Sender)
- {
- delivery.disposition(delivery.getRemoteState());
- }
- StoreEntry e = (StoreEntry) delivery.getContext();
- if (e != null) e.updated();
+ link.close();
}
-
- if (delivery.isReadable())
+ else
{
- pumpIn( link.getSource().getAddress(), (Receiver)link );
+ reclaimLink(link);
}
-
- Delivery next = delivery.getWorkNext();
- delivery.clear();
- delivery = next;
}
+ ConnectionContext ctx =
(ConnectionContext)link.getSession().getConnection().getContext();
+ if (!_passive)
+ {
+
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
+ }
+ }
- for (Session session : new Sessions(connection, UNINIT, ANY))
+ private void processFlow(Link link)
+ {
+ if (link instanceof Sender)
{
- session.open();
- _logger.log(Level.FINE, "Opened session " + session);
+ pumpOut(link.getTarget().getAddress(), (Sender)link);
}
- for (Link link : new Links(connection, UNINIT, ANY))
+ ConnectionContext ctx =
(ConnectionContext)link.getSession().getConnection().getContext();
+ if (!_passive)
{
- //TODO: the following is not correct; should only copy those
properties that we understand
- link.setSource(link.getRemoteSource());
- link.setTarget(link.getRemoteTarget());
- linkAdded(link);
- link.open();
- _logger.log(Level.FINE, "Opened link " + link);
+
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
}
+ }
- distributeCredit();
-
- for (Link link : new Links(connection, ACTIVE, ACTIVE))
+ private void processDelivery(Delivery delivery)
+ {
+ Link link = delivery.getLink();
+ if (delivery.isUpdated())
{
if (link instanceof Sender)
{
- pumpOut(link.getTarget().getAddress(), (Sender)link);
+ delivery.disposition(delivery.getRemoteState());
}
+ StoreEntry e = (StoreEntry) delivery.getContext();
+ if (e != null) e.updated();
}
- for (Session session : new Sessions(connection, ACTIVE, CLOSED))
- {
- session.close();
- }
-
- for (Link link : new Links(connection, ANY, CLOSED))
+ if (delivery.isReadable())
{
- if (link.getLocalState() == EndpointState.ACTIVE)
- {
- link.close();
- }
- else
- {
- reclaimLink(link);
- }
+ pumpIn( link.getSource().getAddress(), (Receiver)link );
}
- if (connection.getRemoteState() == EndpointState.CLOSED)
+ delivery.clear();
+ ConnectionContext ctx =
(ConnectionContext)link.getSession().getConnection().getContext();
+ if (!_passive)
{
- if (connection.getLocalState() == EndpointState.ACTIVE)
- {
- connection.close();
- }
+
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
}
}
@@ -851,11 +914,12 @@ public class MessengerImpl implements Me
private boolean waitUntil(Predicate condition, long timeout)
{
- if (_driver == null) {
- throw new IllegalStateException("cannot wait while messenger is
stopped");
- }
+ processEvents();
- processAllConnectors();
+ if (_passive)
+ {
+ return condition.test();
+ }
// wait until timeout expires or until test is true
long now = System.currentTimeMillis();
@@ -865,7 +929,8 @@ public class MessengerImpl implements Me
while (true)
{
done = condition.test();
- if (done) break;
+ if (done)
+ break;
long remaining;
if (timeout < 0)
@@ -884,30 +949,98 @@ public class MessengerImpl implements Me
long wakeup = (_next_drain > now) ? _next_drain - now : 0;
remaining = (remaining == -1) ? wakeup : Math.min(remaining,
wakeup);
}
-
- boolean woken;
- woken = _driver.doWait(remaining);
- processActive();
- if (woken) {
+ processPendingSelectables(remaining);
+ processEvents();
+ if (_interrupted.get())
+ {
+ _interrupted.set(false);
throw new InterruptException();
}
- now = System.currentTimeMillis();
+ now = System.currentTimeMillis();
}
return done;
}
+ // Used when passive mode is false.
+ private void processPendingSelectables(long timeout)
+ {
+ try
+ {
+ if (timeout == 0)
+ {
+ _selector.selectNow();
+ }
+ else if (timeout < 0)
+ {
+ _selector.select();
+ }
+ else
+ {
+ _selector.select(timeout);
+ }
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.SEVERE, "Exception when waiting for IO Event",
e);
+ throw new RuntimeException("Exception when waiting for IO Event",
e);
+ }
+
+ for (SelectionKey key : _selector.selectedKeys())
+ {
+ if (key.isValid() && key.isAcceptable())
+ {
+ ListenerImpl listener = (ListenerImpl) key.attachment();
+ try
+ {
+ inboundConnection(listener,
listener.getSocketListener().accept());
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.SEVERE, "Exception when accepting
connection", e);
+ throw new RuntimeException("Exception when accepting
connection", e);
+ }
+ }
+ else if (key.isValid() && key.isReadable())
+ {
+ connectionReadable((SelectableImpl) key.attachment());
+ }
+ else if (key.isValid() && key.isWritable())
+ {
+ connectionWritable((SelectableImpl) key.attachment());
+ }
+ }
+ _selector.selectedKeys().clear();
+
+ if (_closed.get() && !_passive)
+ {
+ for(SelectableImpl sel : _selectables)
+ {
+ try
+ {
+ sel.getNetworkConnection().close();
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.WARNING, "Error while closing
connection", e);
+ }
+ sel.markCompleted();
+ }
+ }
+ }
+
private Connection lookup(Address address)
{
- for (Connector<?> c : _driver.connectors())
+ for (SelectableImpl sel : _selectables)
{
- Connection connection = c.getConnection();
+ Connection connection = sel.getConnection();
ConnectionContext ctx = (ConnectionContext)
connection.getContext();
if (ctx.matches(address))
{
return connection;
}
}
+
return null;
}
@@ -973,11 +1106,9 @@ public class MessengerImpl implements Me
// flow changed, must process it
ConnectionContext ctx = (ConnectionContext)
link.getSession().getConnection().getContext();
- try
+ if (!_passive)
{
- ctx.getConnector().process();
- } catch (IOException e) {
- _logger.log(Level.SEVERE, "Error processing connection", e);
+
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
}
}
@@ -1009,11 +1140,9 @@ public class MessengerImpl implements Me
_draining++;
// drain requested on link, must process it
ConnectionContext ctx = (ConnectionContext)
link.getSession().getConnection().getContext();
- try
+ if (!_passive)
{
- ctx.getConnector().process();
- } catch (IOException e) {
- _logger.log(Level.SEVERE, "Error processing
connection", e);
+
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
}
if (needed <= 0) break;
}
@@ -1035,7 +1164,7 @@ public class MessengerImpl implements Me
//are all sent messages settled?
int total = _outgoingStore.size();
- for (Connector<?> c : _driver.connectors())
+ for (SelectableImpl sel : _selectables)
{
// TBD
// check if transport is done generating output
@@ -1046,8 +1175,7 @@ public class MessengerImpl implements Me
// return false;
// }
// }
-
- Connection connection = c.getConnection();
+ Connection connection = sel.getConnection();
for (Link link : new Links(connection, ACTIVE, ANY))
{
if (link instanceof Sender)
@@ -1086,9 +1214,9 @@ public class MessengerImpl implements Me
{
//do we have at least one pending message?
if (_incomingStore.size() > 0) return true;
- for (Connector<?> c : _driver.connectors())
+ for (SelectableImpl sel : _selectables)
{
- Connection connection = c.getConnection();
+ Connection connection = sel.getConnection();
Delivery delivery = connection.getWorkHead();
while (delivery != null)
{
@@ -1103,7 +1231,7 @@ public class MessengerImpl implements Me
}
}
// if no connections, or not listening, exit as there won't ever
be a message
- if (!_driver.listeners().iterator().hasNext() &&
!_driver.connectors().iterator().hasNext())
+ if (_closed.get() ||( _selectables.size() == 0 &&
_listeners.size() == 0))
return true;
return false;
@@ -1114,19 +1242,20 @@ public class MessengerImpl implements Me
{
public boolean test()
{
- if (_driver == null) {
- return true;
+ for (Listener l : _listeners)
+ {
+ if (!l.isCompleted())
+ {
+ return false;
+ }
}
-
- for (Connector<?> c : _driver.connectors()) {
- if (!c.isClosed()) {
+ for (Selectable sel : _selectables)
+ {
+ if (!sel.isCompleted())
+ {
return false;
}
}
-
- _driver.destroy();
- _driver = null;
-
return true;
}
}
@@ -1241,21 +1370,36 @@ public class MessengerImpl implements Me
{
String host = address.getHost();
int port = Integer.valueOf(address.getImpliedPort());
- Connector<?> connector = _driver.createConnector(host, port, null);
- _logger.log(Level.FINE, "Connecting to " + host + ":" + port);
+ SelectableImpl sel = new SelectableImpl(host, port);
+
connection = Proton.connection();
+ connection.collect(_collector);
connection.setContainer(_name);
connection.setHostname(host);
- connection.setContext(new ConnectionContext(address, connector));
- connector.setConnection(connection);
- Sasl sasl = connector.sasl();
+ connection.setContext(new ConnectionContext(address, sel));
+ Transport transport = Proton.transport();
+
+ if (!_passive)
+ {
+ IoConnection networkConnection = new IoConnection(_selector,
host, port);
+ _logger.log(Level.FINE, "Connecting to " + host + ":" + port);
+ networkConnection.setSelectable(sel);
+ sel.setNetworkConnection(networkConnection);
+ sel.markConnected();
+ networkConnection.registerForReadEvents(true);
+ networkConnection.registerForWriteEvents(true);
+ }
+ sel.setTransport(transport);
+ sel.setConnection(connection);
+ _selectables.add(sel);
+ transport.bind(connection);
+ Sasl sasl = transport.sasl();
if (sasl != null)
{
sasl.client();
sasl.setMechanisms(new String[]{"ANONYMOUS"});
}
if ("amqps".equalsIgnoreCase(address.getScheme())) {
- Transport transport = connector.getTransport();
SslDomain domain = makeDomain(address, SslDomain.Mode.CLIENT);
if (_trustedDb != null) {
domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
@@ -1267,6 +1411,7 @@ public class MessengerImpl implements Me
//ssl.setPeerHostname(host);
}
connection.open();
+
}
for (Link link : new Links(connection, ACTIVE, ANY))
@@ -1473,12 +1618,12 @@ public class MessengerImpl implements Me
private class ConnectionContext
{
private Address _address;
- private Connector _connector;
+ private SelectableImpl _selectable;
- public ConnectionContext(Address address, Connector connector)
+ public ConnectionContext(Address address, SelectableImpl selectable)
{
_address = address;
- _connector = connector;
+ _selectable = selectable;
}
public Address getAddress()
@@ -1490,14 +1635,13 @@ public class MessengerImpl implements Me
{
String host = address.getHost();
String port = address.getImpliedPort();
- Connection conn = _connector.getConnection();
- return host.equals(conn.getRemoteContainer()) ||
+ return
host.equals(_selectable.getConnection().getRemoteContainer()) ||
(_address.getHost().equals(host) &&
_address.getImpliedPort().equals(port));
}
- public Connector getConnector()
+ public SelectableImpl getSelectable()
{
- return _connector;
+ return _selectable;
}
}
@@ -1544,4 +1688,154 @@ public class MessengerImpl implements Me
}
}
-}
+
+ /* ----------------------------------------
+ * IO events for non passive mode
+ * ----------------------------------------
+ */
+ void inboundConnection(Listener listener, IoConnection networkConnection)
+ {
+ //System.out.println("inboundConnection
............................................"+ _name);
+
+ _worked = true;
+ Connection connection = Proton.connection();
+ connection.collect(_collector);
+ connection.setContainer(_name);
+
+ Transport transport = Proton.transport();
+ transport.bind(connection);
+ SelectableImpl selectable = new SelectableImpl();
+ selectable.markConnected();
+ selectable.setTransport(transport);
+ selectable.setConnection(connection);
+ selectable.setNetworkConnection(networkConnection);
+ _selectables.add(selectable);
+
+ ListenerContext ctx = (ListenerContext)
((ListenerImpl)listener).getContext();
+ connection.setContext(new ConnectionContext(ctx.getAddress(),
selectable));
+ networkConnection.setSelectable(selectable);
+
+ //TODO: full SASL
+ Sasl sasl = transport.sasl();
+ if (sasl != null)
+ {
+ sasl.server();
+ sasl.setMechanisms(new String[]{"ANONYMOUS"});
+ sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+ }
+ transport.ssl(ctx.getDomain());
+ connection.open();
+ networkConnection.registerForReadEvents(true);
+ networkConnection.registerForWriteEvents(true);
+ }
+
+ void connectionReadable(SelectableImpl selectable)
+ {
+ //System.out.println("connectionReadable
............................................" + _name);
+
+ _worked = true;
+ IoConnection networkConnection = selectable.getNetworkConnection();
+ SelectableImpl sel = (SelectableImpl) selectable;
+ Transport transport = sel.getTransport();
+ ByteBuffer tail = transport.tail();
+ try
+ {
+ int read = networkConnection.read(tail);
+ if (read < 0)
+ {
+ connectionClosed(sel);
+ return;
+ }
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.SEVERE, this + " error reading from file
descriptor", e);
+ // Need to throw the exception as well.
+ }
+ try
+ {
+ transport.process();
+ }
+ catch (TransportException e)
+ {
+ _logger.log(Level.SEVERE, this + " error processing input", e);
+ }
+ networkConnection.registerForReadEvents(transport.capacity() > 0);
+ networkConnection.registerForWriteEvents(true);
+ }
+
+ void connectionWritable(SelectableImpl selectable)
+ {
+ //System.out.println("connectionWritable
............................................" + _name);
+
+ _worked = true;
+ IoConnection networkConnection = selectable.getNetworkConnection();
+ SelectableImpl sel = (SelectableImpl) selectable;
+ Transport transport = sel.getTransport();
+ boolean writeBlocked = false;
+ try
+ {
+ while (transport.pending() > 0 && !writeBlocked)
+ {
+ ByteBuffer head = transport.head();
+ int wrote = 0;
+ try
+ {
+ wrote = networkConnection.write(head);
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.SEVERE, this + " error writing to the
file descriptor", e);
+ // Need to throw the exception as well.
+ }
+ if (wrote > 0)
+ {
+ transport.pop(wrote);
+ }
+ else
+ {
+ writeBlocked = true;
+ }
+ }
+ networkConnection.registerForWriteEvents(transport.pending() > 0);
+
+ if (selectable.isClosed() && !_passive)
+ {
+ try
+ {
+ selectable.getNetworkConnection().close();
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.SEVERE, String.format("Error closing
connection : %s", e), e);
+ }
+ selectable.markCompleted();
+ }
+
+ }
+ catch (TransportException e)
+ {
+ _logger.log(Level.SEVERE, this + " error", e);
+ networkConnection.registerForWriteEvents(false);
+ }
+ }
+
+ void connectionClosed(SelectableImpl sel)
+ {
+ reclaimCredit(sel.getConnection());
+ sel.markClosed();
+ if (!_passive)
+ {
+ try
+ {
+ sel.getNetworkConnection().close();
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.SEVERE, String.format("Error closing
connection : %s", e), e);
+ }
+ }
+ sel.markCompleted();
+ _selectables.remove(sel);
+ }
+}
\ No newline at end of file
Added:
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SelectableImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SelectableImpl.java?rev=1602913&view=auto
==============================================================================
---
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SelectableImpl.java
(added)
+++
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SelectableImpl.java
Mon Jun 16 16:09:14 2014
@@ -0,0 +1,148 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.messenger.impl;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.messenger.Selectable;
+
+public class SelectableImpl implements Selectable
+{
+ private final AtomicBoolean _connected = new AtomicBoolean(false);
+
+ private final AtomicBoolean _completed = new AtomicBoolean(false);
+
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+ private Object _ctx;
+
+ private Transport _transport;
+
+ private Connection _connection;
+
+ private String _host = null;
+
+ private int _port = -1;
+
+ private IoConnection _networkConnection;
+
+ SelectableImpl(String host, int port)
+ {
+ _host = host;
+ _port = port;
+ }
+
+ SelectableImpl()
+ {
+ }
+
+ Object getContext()
+ {
+ return _ctx;
+ }
+
+ void setContext(Object ctx)
+ {
+ _ctx = ctx;
+ }
+
+ @Override
+ public Transport getTransport()
+ {
+ return _transport;
+ }
+
+ void setTransport(Transport t)
+ {
+ _transport = t;
+ }
+
+ Connection getConnection()
+ {
+ return _connection;
+ }
+
+ void setConnection(Connection c)
+ {
+ _connection = c;
+ }
+
+ void markClosed()
+ {
+ _closed.set(true);
+ }
+
+ @Override
+ public String getHost()
+ {
+ return _host;
+ }
+
+ @Override
+ public int getPort()
+ {
+ return _port;
+ }
+
+ @Override
+ public boolean isClosed()
+ {
+ return _closed.get();
+ }
+
+ @Override
+ public void markCompleted()
+ {
+ _completed.set(true);
+ }
+
+ @Override
+ public boolean isCompleted()
+ {
+ return _completed.get();
+ }
+
+ @Override
+ public boolean isConnected()
+ {
+ return _connected.get();
+ }
+
+ @Override
+ public void markConnected()
+ {
+ _connected.set(true);
+ }
+
+ // Used in non passive mode.
+
+ void setNetworkConnection(IoConnection con)
+ {
+ _networkConnection = con;
+ }
+
+ IoConnection getNetworkConnection()
+ {
+ return _networkConnection;
+ }
+}
\ No newline at end of file
Added:
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SocketListener.java
URL:
http://svn.apache.org/viewvc/qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SocketListener.java?rev=1602913&view=auto
==============================================================================
---
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SocketListener.java
(added)
+++
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SocketListener.java
Mon Jun 16 16:09:14 2014
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.messenger.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.qpid.proton.messenger.Listener;
+
+class SocketListener
+{
+ private Selector _selector;
+
+ private SelectionKey _key;
+
+ private ServerSocketChannel _serverSocketChannel;
+
+ SocketListener(Selector selector, Listener listener, String host, int
port) throws IOException
+ {
+ _selector = selector;
+ _serverSocketChannel = ServerSocketChannel.open();
+ ServerSocket serverSocket = _serverSocketChannel.socket();
+ serverSocket.bind(new InetSocketAddress(host, port));
+ _serverSocketChannel.configureBlocking(false);
+ _key = _serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+ _key.attach(listener);
+ }
+
+ public void close() throws IOException
+ {
+ _serverSocketChannel.close();
+ }
+
+ IoConnection accept() throws IOException
+ {
+ SocketChannel channel = _serverSocketChannel.accept();
+ return new IoConnection(_selector, channel);
+ }
+
+ @Override
+ public String toString()
+ {
+ return _serverSocketChannel.socket().toString();
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]