Author: rajith
Date: Mon Jun 16 16:09:14 2014
New Revision: 1602913

URL: http://svn.apache.org/r1602913
Log:
PROTON-589
1. Removed the dependency on the driver code.
2. Changed the code to use the Collector/Event interface from the engine.
3. Added code to support non-passive mode.

Added:
    
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/IoConnection.java
    
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/ListenerImpl.java
    
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SelectableImpl.java
    
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SocketListener.java
Modified:
    
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java

Added: 
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/IoConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/IoConnection.java?rev=1602913&view=auto
==============================================================================
--- 
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/IoConnection.java
 (added)
+++ 
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/IoConnection.java
 Mon Jun 16 16:09:14 2014
@@ -0,0 +1,194 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.messenger.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+
+import org.apache.qpid.proton.messenger.MessengerException;
+import org.apache.qpid.proton.messenger.Selectable;
+
+/**
+ * A non-blocking TCP connection registered with a {@link Selector}.
+ *
+ * Wraps a {@link SocketChannel} and the {@link SelectionKey} obtained when the
+ * channel is registered via {@link #setSelectable(Selectable)}. Failures while
+ * connecting, configuring or registering are reported as
+ * {@link MessengerException}s.
+ */
+class IoConnection
+{
+    private final Selector _selector;
+
+    private final SocketChannel _channel;
+
+    // Null until setSelectable() has registered the channel.
+    private SelectionKey _key;
+
+    /**
+     * Opens an outbound connection. The connect itself is performed in blocking
+     * mode; the channel is switched to non-blocking once it is established.
+     *
+     * @throws MessengerException if the connection cannot be established or
+     *         configured; the half-opened channel is closed before throwing.
+     */
+    IoConnection(Selector selector, String host, int port)
+    {
+        SocketChannel channel = null;
+        try
+        {
+            channel = SocketChannel.open();
+            channel.connect(new InetSocketAddress(host, port));
+            configureSocket(channel);
+        }
+        catch (Exception e)
+        {
+            // Do not leak the file descriptor when the connect attempt fails.
+            closeQuietly(channel);
+            throw new MessengerException(String.format("Error connecting to %s:%s", host, port), e);
+        }
+        _selector = selector;
+        _channel = channel;
+    }
+
+    /**
+     * Wraps an already connected channel (for example one returned by accept).
+     *
+     * @throws MessengerException if the channel cannot be configured.
+     */
+    IoConnection(Selector selector, SocketChannel channel)
+    {
+        _selector = selector;
+        _channel = channel;
+        // configureSocket already reports failures as MessengerException.
+        configureSocket(_channel);
+    }
+
+    /**
+     * Puts the channel into non-blocking mode and enables TCP_NODELAY.
+     *
+     * @throws MessengerException if either option cannot be applied.
+     */
+    void configureSocket(SocketChannel channel)
+    {
+        try
+        {
+            channel.configureBlocking(false);
+            channel.socket().setTcpNoDelay(true);
+        }
+        catch (Exception e)
+        {
+            throw new MessengerException("Error configuring socket channel", e);
+        }
+    }
+
+    /**
+     * Registers the channel with the selector for read and write readiness and
+     * attaches the given selectable to the resulting key.
+     *
+     * @throws MessengerException if registration fails.
+     */
+    void setSelectable(Selectable selectable)
+    {
+        try
+        {
+            // Attach at registration time so the key never exists without it.
+            _key = _channel.register(_selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, selectable);
+        }
+        catch (Exception e)
+        {
+            throw new MessengerException("Error registering socket channel", e);
+        }
+    }
+
+    /** Reads from the channel into buf; returns what SocketChannel.read returns. */
+    public int read(ByteBuffer buf) throws IOException
+    {
+        return _channel.read(buf);
+    }
+
+    /** Adds (true) or removes (false) OP_READ from the key's interest set. */
+    public void registerForReadEvents(boolean b)
+    {
+        updateInterest(SelectionKey.OP_READ, b);
+    }
+
+    /** Writes buf to the channel; returns what SocketChannel.write returns. */
+    public int write(ByteBuffer buf) throws IOException
+    {
+        return _channel.write(buf);
+    }
+
+    /** Adds (true) or removes (false) OP_WRITE from the key's interest set. */
+    public void registerForWriteEvents(boolean b)
+    {
+        updateInterest(SelectionKey.OP_WRITE, b);
+    }
+
+    /** Closes the underlying channel. */
+    public void close() throws IOException
+    {
+        _channel.close();
+    }
+
+    /** Returns true once the underlying channel is no longer open. */
+    public boolean isClosed()
+    {
+        return !_channel.isOpen();
+    }
+
+    /**
+     * Sets or clears a single operation bit in the key's interest set. Does
+     * nothing when the channel is closed, not yet registered, or the key has
+     * been cancelled, since interestOps() would otherwise throw.
+     */
+    private void updateInterest(int op, boolean enable)
+    {
+        if (!_channel.isOpen() || _key == null || !_key.isValid())
+        {
+            return;
+        }
+
+        int interest = _key.interestOps();
+        _key.interestOps(enable ? (interest | op) : (interest & ~op));
+    }
+
+    /** Best-effort close used on the failure path of the connecting constructor. */
+    private static void closeQuietly(SocketChannel channel)
+    {
+        if (channel == null)
+        {
+            return;
+        }
+        try
+        {
+            channel.close();
+        }
+        catch (IOException ignored)
+        {
+            // The original connect failure is the error worth reporting.
+        }
+    }
+}
\ No newline at end of file

Added: 
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/ListenerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/ListenerImpl.java?rev=1602913&view=auto
==============================================================================
--- 
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/ListenerImpl.java
 (added)
+++ 
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/ListenerImpl.java
 Mon Jun 16 16:09:14 2014
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.messenger.impl;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.proton.messenger.Listener;
+
+/**
+ * Default {@link Listener} implementation: holds the host and port passed to
+ * the constructor, closed/completed flags, an opaque context object and, for
+ * active (non-passive) mode, the {@link SocketListener} assigned through
+ * {@link #setSocketListener(SocketListener)}.
+ */
+public class ListenerImpl implements Listener
+{
+    private final String _hostName;
+
+    private final int _port;
+
+    private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+    private final AtomicBoolean _completed = new AtomicBoolean(false);
+
+    private Object _ctx;
+
+    private SocketListener _socketListener;
+
+    ListenerImpl(String hostName, int port)
+    {
+        _hostName = hostName;
+        _port = port;
+    }
+
+    /** Returns the host name supplied at construction. */
+    @Override
+    public String getHost()
+    {
+        return _hostName;
+    }
+
+    /** Returns the port supplied at construction. */
+    @Override
+    public int getPort()
+    {
+        return _port;
+    }
+
+    /** Sets the closed flag; it does not touch any socket itself. */
+    @Override
+    public void close()
+    {
+        _closed.set(true);
+    }
+
+    /** Returns true once {@link #close()} has been called. */
+    @Override
+    public boolean isClosed()
+    {
+        return _closed.get();
+    }
+
+    /** Sets the completed flag. */
+    @Override
+    public void markCompleted()
+    {
+        _completed.set(true);
+    }
+
+    /** Returns true once {@link #markCompleted()} has been called. */
+    @Override
+    public boolean isCompleted()
+    {
+        return _completed.get();
+    }
+
+    /** Stores an arbitrary context object for later retrieval. */
+    void setContext(Object ctx)
+    {
+        _ctx = ctx;
+    }
+
+    /** Returns the object last passed to setContext, or null if none was set. */
+    Object getContext()
+    {
+        return _ctx;
+    }
+
+    /*----------------------------------------
+     * Used in active mode (passive == false)
+     * Visible only to the impl classes
+     * ---------------------------------------
+     */
+    void setSocketListener(SocketListener socket)
+    {
+        _socketListener = socket;
+    }
+
+    SocketListener getSocketListener()
+    {
+        return _socketListener;
+    }
+}
\ No newline at end of file

Modified: 
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1602913&r1=1602912&r2=1602913&view=diff
==============================================================================
--- 
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
 (original)
+++ 
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
 Mon Jun 16 16:09:14 2014
@@ -21,43 +21,48 @@
 package org.apache.qpid.proton.messenger.impl;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.InterruptException;
+import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.TimeoutException;
-import org.apache.qpid.proton.driver.Connector;
-import org.apache.qpid.proton.driver.Driver;
-import org.apache.qpid.proton.driver.Listener;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Event;
 import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.engine.Sasl;
 import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.engine.Session;
-import org.apache.qpid.proton.engine.SslDomain;
 import org.apache.qpid.proton.engine.Ssl;
+import org.apache.qpid.proton.engine.SslDomain;
 import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.TransportException;
 import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.messenger.Listener;
 import org.apache.qpid.proton.messenger.Messenger;
 import org.apache.qpid.proton.messenger.MessengerException;
+import org.apache.qpid.proton.messenger.Selectable;
 import org.apache.qpid.proton.messenger.Status;
 import org.apache.qpid.proton.messenger.Tracker;
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.amqp.messaging.Target;
-import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-
-import org.apache.qpid.proton.amqp.Binary;
 
 public class MessengerImpl implements Messenger
+
 {
     private enum LinkCreditMode
     {
@@ -76,7 +81,6 @@ public class MessengerImpl implements Me
     private long _timeout = -1;
     private boolean _blocking = true;
     private long _nextTag = 1;
-    private Driver _driver;
     private LinkCreditMode _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
     private final int _credit_batch = 1024;   // credit_mode == 
LINK_CREDIT_AUTO
     private int _credit;        // available
@@ -90,7 +94,6 @@ public class MessengerImpl implements Me
     private TrackerImpl _outgoingTracker;
     private Store _incomingStore = new Store();
     private Store _outgoingStore = new Store();
-    private List<Connector> _awaitingDestruction = new ArrayList<Connector>();
     private int _sendThreshold;
 
     private Transform _routes = new Transform();
@@ -101,7 +104,14 @@ public class MessengerImpl implements Me
     private String _password;
     private String _trustedDb;
 
-
+    private Collector _collector = Proton.collector();
+    private boolean _passive = false;
+    private final List<SelectableImpl> _selectables = new 
ArrayList<SelectableImpl>();
+    private final List<ListenerImpl> _listeners = new 
ArrayList<ListenerImpl>();
+    private Selector _selector;
+    private AtomicBoolean _closed = new AtomicBoolean(true);
+    private AtomicBoolean _interrupted = new AtomicBoolean(false);
+    
     /**
      * @deprecated This constructor's visibility will be reduced to the 
default scope in a future release.
      * Client code outside this module should use a {@link MessengerFactory} 
instead
@@ -118,6 +128,14 @@ public class MessengerImpl implements Me
     @Deprecated public MessengerImpl(String name)
     {
         _name = name;
+        try
+        {
+            _selector = Selector.open();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Unable to create selector",e);
+        }
     }
 
     public void setTimeout(long timeInMillis)
@@ -182,36 +200,56 @@ public class MessengerImpl implements Me
 
     public void start() throws IOException
     {
-        _driver = Proton.driver();
+        _closed.set(false); 
     }
 
     public void stop()
     {
-        if (_driver != null) {
-            if(_logger.isLoggable(Level.FINE))
-            {
-                _logger.fine(this + " about to stop");
-            }
-            //close all connections
-            for (Connector<?> c : _driver.connectors())
-            {
-                Connection connection = c.getConnection();
-                connection.close();
-            }
-            //stop listeners
-            for (Listener<?> l : _driver.listeners())
+        if (_closed.get())
+        {
+            return;
+        }
+
+        _closed.set(true);
+        if(_logger.isLoggable(Level.FINE))
+        {
+            _logger.fine(this + " about to stop");
+        }
+
+        //stop listeners
+        for (ListenerImpl l : _listeners)
+        {
+            l.close();
+            if (!_passive)
             {
                 try
                 {
-                    l.close();
+                    l.getSocketListener().close();
+                    l.markCompleted();
                 }
                 catch (IOException e)
                 {
-                    _logger.log(Level.WARNING, "Error while closing listener", 
e);
+                    _logger.log(Level.SEVERE, String.format("Error closing 
socket listener : %s", e), e);
                 }
             }
-            waitUntil(_allClosed);
+        }        
+
+        //close all connections.
+        for (SelectableImpl sel : _selectables)
+        {
+            SelectableImpl s = (SelectableImpl)sel;
+            Connection connection = s.getConnection();
+            connection.close();
+            if (!_passive && !s.getNetworkConnection().isClosed())
+            {
+                s.getNetworkConnection().registerForWriteEvents(true);
+            }
+            s.markClosed();
         }
+
+        waitUntil(_allClosed);
+        _listeners.clear();
+        _selectables.clear();
     }
 
     public boolean stopped()
@@ -221,15 +259,17 @@ public class MessengerImpl implements Me
 
     public boolean work(long timeout) throws TimeoutException
     {
-        if (_driver == null) { return false; }
+        if (_closed.get()) { return false; }
         _worked = false;
         return waitUntil(_workPred, timeout);
     }
 
     public void interrupt()
     {
-        if (_driver != null) {
-            _driver.wakeup();
+        _interrupted.set(true);
+        if (!_passive)
+        {
+            _selector.wakeup();
         }
     }
 
@@ -289,7 +329,7 @@ public class MessengerImpl implements Me
 
     public void put(Message m) throws MessengerException
     {
-        if (_driver == null) {
+        if (_closed.get()) {
             throw new IllegalStateException("cannot put while messenger is 
stopped");
         }
 
@@ -396,13 +436,13 @@ public class MessengerImpl implements Me
 
     public void send(int n) throws TimeoutException
     {
-        if (_driver == null) {
+        if (_closed.get()) {
             throw new IllegalStateException("cannot send while messenger is 
stopped");
         }
 
         if(_logger.isLoggable(Level.FINE))
         {
-            _logger.fine(this + " about to send");
+            _logger.fine(this + " about to send");            
         }
 
         if (n == -1)
@@ -419,7 +459,7 @@ public class MessengerImpl implements Me
 
     public void recv(int n) throws TimeoutException
     {
-        if (_driver == null) {
+        if (_closed.get()) {
             throw new IllegalStateException("cannot recv while messenger is 
stopped");
         }
 
@@ -527,7 +567,7 @@ public class MessengerImpl implements Me
 
     public void subscribe(String source) throws MessengerException
     {
-        if (_driver == null) {
+        if (_closed.get()) {
             throw new IllegalStateException("messenger is stopped");
         }
 
@@ -544,7 +584,22 @@ public class MessengerImpl implements Me
                 _logger.fine(this + " about to subscribe to source " + source 
+ " using address " + hostName + ":" + port);
             }
             ListenerContext ctx = new ListenerContext(address);
-            _driver.createListener(hostName, port, ctx);
+            ListenerImpl listener = new ListenerImpl(hostName, port);
+            listener.setContext(ctx);
+            if (!_passive)
+            {
+                try
+                {
+                    SocketListener socket = new SocketListener(_selector, 
listener, hostName , port);
+                    listener.setSocketListener(socket);
+                }
+                catch (IOException e)
+                {
+                    _logger.log(Level.SEVERE, String.format("Error subscribing 
to message source : %s", e), e);
+                    throw new MessengerException(String.format("Error 
subscribing to message source : %s", e),e);
+                }
+            }
+            _listeners.add(listener);
         }
         else
         {
@@ -644,13 +699,38 @@ public class MessengerImpl implements Me
         _rewrites.rule(pattern, address);
     }
 
+    @Override
+    public void setPassive(boolean b)
+    {
+        _passive = b; // when true, waitUntil() only drains collector events and never waits on the selector
+    }
+
+    @Override
+    public boolean isPassive()
+    {
+        return _passive; // false unless setPassive(true) has been called
+    }
+
+    @Override
+    public List<? extends Selectable> getSelectables()
+    {
+        return _selectables; // the live internal list, not a copy
+    }
+
+    @Override
+    public List<? extends Listener> getListeners()
+    {
+        // Live list of the listeners added by subscribe(); never null.
+        return _listeners;
+    }
+
     private int queued(boolean outgoing)
     {
         int count = 0;
-        if (_driver != null) {
-            for (Connector<?> c : _driver.connectors())
+        if (!_closed.get()) {
+            for (SelectableImpl sel : _selectables)
             {
-                Connection connection = c.getConnection();
+                Connection connection = sel.getConnection();
                 for (Link link : new Links(connection, ACTIVE, ANY))
                 {
                     if (outgoing)
@@ -667,169 +747,152 @@ public class MessengerImpl implements Me
         return count;
     }
 
-    private void bringDestruction()
+    private void processEvents()
     {
-        for (Connector<?> c : _awaitingDestruction)
+        for (Event event = _collector.peek(); event != null; event = 
_collector.peek())
         {
-            c.destroy();
+            switch (event.getType())
+            {
+            case CONNECTION_REMOTE_STATE:
+            case CONNECTION_LOCAL_STATE:
+                processConnection(event.getConnection());
+                break;
+            case SESSION_REMOTE_STATE:
+            case SESSION_LOCAL_STATE:
+                processSession(event.getSession());
+                break;
+            case LINK_REMOTE_STATE:
+            case LINK_LOCAL_STATE:
+                processLink(event.getLink());
+                break;
+            case LINK_FLOW:
+                processFlow(event.getLink());
+                break;
+            case DELIVERY:
+                processDelivery(event.getDelivery());
+                break;
+            case TRANSPORT:
+                distributeCredit();
+                break;
+            }
+            _collector.pop();
         }
-        _awaitingDestruction.clear();
     }
 
-    private void processAllConnectors()
+    private void processConnection(Connection connection)
     {
-        distributeCredit();
-        for (Connector<?> c : _driver.connectors())
+        if ( connection.getRemoteState() == EndpointState.ACTIVE)
         {
-            processEndpoints(c);
-            try
+            if (connection.getLocalState() == EndpointState.UNINITIALIZED)
             {
-                if (c.process()) {
-                    _worked = true;
-                }
+                connection.open();                
             }
-            catch (IOException e)
+        }
+        if (connection.getRemoteState() == EndpointState.CLOSED)
+        {
+            if (connection.getLocalState() == EndpointState.ACTIVE)
             {
-                _logger.log(Level.SEVERE, "Error processing connection", e);
+                connection.close();
             }
         }
-        bringDestruction();
-        distributeCredit();
+        ConnectionContext ctx = (ConnectionContext)connection.getContext();
+        if (!_passive)
+        {
+            
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
+        }
     }
 
-    private void processActive()
+    private void processSession(Session session)
     {
-        //process active listeners
-        for (Listener<?> l = _driver.listener(); l != null; l = 
_driver.listener())
+        if (session.getRemoteState() == EndpointState.ACTIVE)
         {
-            _worked = true;
-            Connector<?> c = l.accept();
-            Connection connection = Proton.connection();
-            connection.setContainer(_name);
-            ListenerContext ctx = (ListenerContext) l.getContext();
-            connection.setContext(new ConnectionContext(ctx.getAddress(), c));
-            c.setConnection(connection);
-            Transport transport = c.getTransport();
-            //TODO: full SASL
-            Sasl sasl = c.sasl();
-            if (sasl != null)
+            if (session.getLocalState() == EndpointState.UNINITIALIZED)
             {
-                sasl.server();
-                sasl.setMechanisms(new String[]{"ANONYMOUS"});
-                sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+                session.open();
+                _logger.log(Level.FINE, "Opened session " + session);
             }
-            transport.ssl(ctx.getDomain());
-            connection.open();
         }
-        // process connectors, reclaiming credit on closed connectors
-        for (Connector<?> c = _driver.connector(); c != null; c = 
_driver.connector())
+        if (session.getRemoteState() == EndpointState.CLOSED)
         {
-            _worked = true;
-            if (c.isClosed())
-            {
-                _awaitingDestruction.add(c);
-                reclaimCredit(c.getConnection());
-            }
-            else
+            if (session.getLocalState() == EndpointState.ACTIVE)
             {
-                _logger.log(Level.FINE, "Processing active connector " + c);
-                try
-                {
-                    c.process();
-                    processEndpoints(c);
-                    c.process();
-                }
-                catch (IOException e)
-                {
-                    _logger.log(Level.SEVERE, "Error processing connection", 
e);
-                }
+                session.close();
             }
         }
-        bringDestruction();
-        distributeCredit();
+        ConnectionContext ctx = 
(ConnectionContext)session.getConnection().getContext();
+        if (!_passive)
+        {
+            
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
+        }
     }
 
-    private void processEndpoints(Connector c)
+    private void processLink(Link link)
     {
-        Connection connection = c.getConnection();
-
-        if (connection.getLocalState() == EndpointState.UNINITIALIZED)
+        if (link.getRemoteState() == EndpointState.ACTIVE)
         {
-            connection.open();
+            if (link.getLocalState() == EndpointState.UNINITIALIZED)
+            {
+                link.setSource(link.getRemoteSource());
+                link.setTarget(link.getRemoteTarget());
+                linkAdded(link);
+                link.open();
+                _logger.log(Level.FINE, "Opened link " + link);
+            }
         }
-
-        Delivery delivery = connection.getWorkHead();
-        while (delivery != null)
+        if (link.getRemoteState() == EndpointState.CLOSED)
         {
-            Link link = delivery.getLink();
-            if (delivery.isUpdated())
+            if (link.getLocalState() == EndpointState.ACTIVE)
             {
-                if (link instanceof Sender)
-                {
-                    delivery.disposition(delivery.getRemoteState());
-                }
-                StoreEntry e = (StoreEntry) delivery.getContext();
-                if (e != null) e.updated();
+                link.close();
             }
-
-            if (delivery.isReadable())
+            else
             {
-                pumpIn( link.getSource().getAddress(), (Receiver)link );
+                reclaimLink(link);
             }
-
-            Delivery next = delivery.getWorkNext();
-            delivery.clear();
-            delivery = next;
         }
+        ConnectionContext ctx = 
(ConnectionContext)link.getSession().getConnection().getContext();
+        if (!_passive)
+        {
+            
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
+        }
+    }
 
-        for (Session session : new Sessions(connection, UNINIT, ANY))
+    private void processFlow(Link link)
+    {
+        if (link instanceof Sender)
         {
-            session.open();
-            _logger.log(Level.FINE, "Opened session " + session);
+            pumpOut(link.getTarget().getAddress(), (Sender)link);
         }
-        for (Link link : new Links(connection, UNINIT, ANY))
+        ConnectionContext ctx = 
(ConnectionContext)link.getSession().getConnection().getContext();
+        if (!_passive)
         {
-            //TODO: the following is not correct; should only copy those 
properties that we understand
-            link.setSource(link.getRemoteSource());
-            link.setTarget(link.getRemoteTarget());
-            linkAdded(link);
-            link.open();
-            _logger.log(Level.FINE, "Opened link " + link);
+            
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
         }
+    }
 
-        distributeCredit();
-
-        for (Link link : new Links(connection, ACTIVE, ACTIVE))
+    private void processDelivery(Delivery delivery)
+    {
+        Link link = delivery.getLink();
+        if (delivery.isUpdated())
         {
             if (link instanceof Sender)
             {
-                pumpOut(link.getTarget().getAddress(), (Sender)link);
+                delivery.disposition(delivery.getRemoteState());
             }
+            StoreEntry e = (StoreEntry) delivery.getContext();
+            if (e != null) e.updated();
         }
 
-        for (Session session : new Sessions(connection, ACTIVE, CLOSED))
-        {
-            session.close();
-        }
-
-        for (Link link : new Links(connection, ANY, CLOSED))
+        if (delivery.isReadable())
         {
-            if (link.getLocalState() == EndpointState.ACTIVE)
-            {
-                link.close();
-            }
-            else
-            {
-                reclaimLink(link);
-            }
+            pumpIn( link.getSource().getAddress(), (Receiver)link );
         }
 
-        if (connection.getRemoteState() == EndpointState.CLOSED)
+        delivery.clear();
+        ConnectionContext ctx = 
(ConnectionContext)link.getSession().getConnection().getContext();
+        if (!_passive)
         {
-            if (connection.getLocalState() == EndpointState.ACTIVE)
-            {
-                connection.close();
-            }
+            
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
         }
     }
 
@@ -851,11 +914,12 @@ public class MessengerImpl implements Me
 
     private boolean waitUntil(Predicate condition, long timeout)
     {
-        if (_driver == null) {
-            throw new IllegalStateException("cannot wait while messenger is 
stopped");
-        }
+        processEvents();
 
-        processAllConnectors();
+        if (_passive)
+        {
+            return condition.test();
+        }
 
         // wait until timeout expires or until test is true
         long now = System.currentTimeMillis();
@@ -865,7 +929,8 @@ public class MessengerImpl implements Me
         while (true)
         {
             done = condition.test();
-            if (done) break;
+            if (done)
+                break;
 
             long remaining;
             if (timeout < 0)
@@ -884,30 +949,98 @@ public class MessengerImpl implements Me
                 long wakeup = (_next_drain > now) ? _next_drain - now : 0;
                 remaining = (remaining == -1) ? wakeup : Math.min(remaining, 
wakeup);
             }
-
-            boolean woken;
-            woken = _driver.doWait(remaining);
-            processActive();
-            if (woken) {
+            processPendingSelectables(remaining);
+            processEvents();
+            if (_interrupted.get())
+            {
+                _interrupted.set(false);
                 throw new InterruptException();
             }
-            now = System.currentTimeMillis();
+            now = System.currentTimeMillis();            
         }
 
         return done;
     }
 
+    // Active (non-passive) mode only: waits on the selector (0 = poll, negative = no time limit, otherwise at most timeout ms), dispatches ready keys, and closes every network connection once the messenger is closed.
+    private void processPendingSelectables(long timeout)
+    {
+        try
+        {
+            if (timeout == 0) // poll without blocking
+            {
+                _selector.selectNow();
+            }
+            else if (timeout < 0) // block until a key is ready or the selector is woken up
+            {
+                _selector.select();
+            }
+            else
+            {
+                _selector.select(timeout);
+            }
+        }
+        catch (IOException e)
+        {
+            _logger.log(Level.SEVERE, "Exception when waiting for IO Event", 
e);
+            throw new RuntimeException("Exception when waiting for IO Event", 
e);
+        }
+
+        for (SelectionKey key : _selector.selectedKeys())
+        {
+            if (key.isValid() && key.isAcceptable()) // at most one of accept/read/write is handled per key per pass
+            {
+                ListenerImpl listener = (ListenerImpl) key.attachment();
+                try
+                {
+                    inboundConnection(listener, 
listener.getSocketListener().accept());
+                }
+                catch (IOException e)
+                {
+                    _logger.log(Level.SEVERE, "Exception when accepting 
connection", e);
+                    throw new RuntimeException("Exception when accepting 
connection", e);
+                }
+            }
+            else if (key.isValid() && key.isReadable())
+            {
+                connectionReadable((SelectableImpl) key.attachment());
+            }
+            else if (key.isValid() && key.isWritable())
+            {
+                connectionWritable((SelectableImpl) key.attachment());
+            }
+        }
+        _selector.selectedKeys().clear(); // the selector does not remove handled keys from the selected set itself
+
+        if (_closed.get() && !_passive) // messenger stopped: close the sockets and mark each selectable completed
+        {
+            for(SelectableImpl sel : _selectables)
+            {
+                try
+                {
+                    sel.getNetworkConnection().close();
+                }
+                catch (IOException e)
+                {
+                    _logger.log(Level.WARNING, "Error while closing 
connection", e);
+                }
+                sel.markCompleted();
+            }
+        }
+    }
+
     private Connection lookup(Address address)
     {
-        for (Connector<?> c : _driver.connectors())
+        for (SelectableImpl sel : _selectables)
         {
-            Connection connection = c.getConnection();
+            Connection connection = sel.getConnection();
             ConnectionContext ctx = (ConnectionContext) 
connection.getContext();
             if (ctx.matches(address))
             {
                 return connection;
             }
         }
+
         return null;
     }
 
@@ -973,11 +1106,9 @@ public class MessengerImpl implements Me
 
             // flow changed, must process it
             ConnectionContext ctx = (ConnectionContext) 
link.getSession().getConnection().getContext();
-            try
+            if (!_passive)
             {
-                ctx.getConnector().process();
-            } catch (IOException e) {
-                _logger.log(Level.SEVERE, "Error processing connection", e);
+                
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
             }
         }
 
@@ -1009,11 +1140,9 @@ public class MessengerImpl implements Me
                             _draining++;
                             // drain requested on link, must process it
                             ConnectionContext ctx = (ConnectionContext) 
link.getSession().getConnection().getContext();
-                            try
+                            if (!_passive)
                             {
-                                ctx.getConnector().process();
-                            } catch (IOException e) {
-                                _logger.log(Level.SEVERE, "Error processing 
connection", e);
+                                
((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
                             }
                             if (needed <= 0) break;
                         }
@@ -1035,7 +1164,7 @@ public class MessengerImpl implements Me
             //are all sent messages settled?
             int total = _outgoingStore.size();
 
-            for (Connector<?> c : _driver.connectors())
+            for (SelectableImpl sel : _selectables)
             {
                 // TBD
                 // check if transport is done generating output
@@ -1046,8 +1175,7 @@ public class MessengerImpl implements Me
                 //        return false;
                 //    }
                 // }
-
-                Connection connection = c.getConnection();
+                Connection connection = sel.getConnection();
                 for (Link link : new Links(connection, ACTIVE, ANY))
                 {
                     if (link instanceof Sender)
@@ -1086,9 +1214,9 @@ public class MessengerImpl implements Me
         {
             //do we have at least one pending message?
             if (_incomingStore.size() > 0) return true;
-            for (Connector<?> c : _driver.connectors())
+            for (SelectableImpl sel : _selectables)
             {
-                Connection connection = c.getConnection();
+                Connection connection = sel.getConnection();
                 Delivery delivery = connection.getWorkHead();
                 while (delivery != null)
                 {
@@ -1103,7 +1231,7 @@ public class MessengerImpl implements Me
                 }
             }
             // if no connections, or not listening, exit as there won't ever 
be a message
-            if (!_driver.listeners().iterator().hasNext() && 
!_driver.connectors().iterator().hasNext())
+            if (_closed.get() ||( _selectables.size() == 0 && 
_listeners.size() == 0))
                 return true;
 
             return false;
@@ -1114,19 +1242,20 @@ public class MessengerImpl implements Me
     {
         public boolean test()
         {
-            if (_driver == null) {
-                return true;
+            for (Listener l : _listeners)
+            {
+                if (!l.isCompleted())
+                {
+                    return false;
+                }
             }
-
-            for (Connector<?> c : _driver.connectors()) {
-                if (!c.isClosed()) {
+            for (Selectable sel : _selectables)
+            {
+                if (!sel.isCompleted())
+                {
                     return false;
                 }
             }
-
-            _driver.destroy();
-            _driver = null;
-
             return true;
         }
     }
@@ -1241,21 +1370,36 @@ public class MessengerImpl implements Me
         {
             String host = address.getHost();
             int port = Integer.valueOf(address.getImpliedPort());
-            Connector<?> connector = _driver.createConnector(host, port, null);
-            _logger.log(Level.FINE, "Connecting to " + host + ":" + port);
+            SelectableImpl sel = new SelectableImpl(host, port);
+
             connection = Proton.connection();
+            connection.collect(_collector);
             connection.setContainer(_name);
             connection.setHostname(host);
-            connection.setContext(new ConnectionContext(address, connector));
-            connector.setConnection(connection);
-            Sasl sasl = connector.sasl();
+            connection.setContext(new ConnectionContext(address, sel));
+            Transport transport = Proton.transport();
+
+            if (!_passive)
+            {
+                IoConnection networkConnection = new IoConnection(_selector, 
host, port);
+                _logger.log(Level.FINE, "Connecting to " + host + ":" + port);
+                networkConnection.setSelectable(sel);
+                sel.setNetworkConnection(networkConnection);
+                sel.markConnected();
+                networkConnection.registerForReadEvents(true);
+                networkConnection.registerForWriteEvents(true);
+            }
+            sel.setTransport(transport);
+            sel.setConnection(connection);
+            _selectables.add(sel);
+            transport.bind(connection);
+            Sasl sasl = transport.sasl();
             if (sasl != null)
             {
                 sasl.client();
                 sasl.setMechanisms(new String[]{"ANONYMOUS"});
             }
             if ("amqps".equalsIgnoreCase(address.getScheme())) {
-                Transport transport = connector.getTransport();
                 SslDomain domain = makeDomain(address, SslDomain.Mode.CLIENT);
                 if (_trustedDb != null) {
                     
domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
@@ -1267,6 +1411,7 @@ public class MessengerImpl implements Me
                 //ssl.setPeerHostname(host);
             }
             connection.open();
+            
         }
 
         for (Link link : new Links(connection, ACTIVE, ANY))
@@ -1473,12 +1618,12 @@ public class MessengerImpl implements Me
     private class ConnectionContext
     {
         private Address _address;
-        private Connector _connector;
+        private SelectableImpl _selectable;
 
-        public ConnectionContext(Address address, Connector connector)
+        public ConnectionContext(Address address, SelectableImpl selectable)
         {
             _address = address;
-            _connector = connector;
+            _selectable = selectable;
         }
 
         public Address getAddress()
@@ -1490,14 +1635,13 @@ public class MessengerImpl implements Me
         {
             String host = address.getHost();
             String port = address.getImpliedPort();
-            Connection conn = _connector.getConnection();
-            return host.equals(conn.getRemoteContainer()) ||
+            return 
host.equals(_selectable.getConnection().getRemoteContainer()) ||
                 (_address.getHost().equals(host) && 
_address.getImpliedPort().equals(port));
         }
 
-        public Connector getConnector()
+        public SelectableImpl getSelectable()
         {
-            return _connector;
+            return _selectable;
         }
     }
 
@@ -1544,4 +1688,154 @@ public class MessengerImpl implements Me
         }
 
     }
-}
+
+    /* ----------------------------------------
+     * IO events for non passive mode
+     * ----------------------------------------
+     */
+    /**
+     * Sets up the engine and IO plumbing for a network connection that was
+     * just accepted on a listener.
+     *
+     * A new Connection and Transport are created and bound together, wrapped
+     * in a SelectableImpl (which is added to _selectables), SASL is configured
+     * as an ANONYMOUS server, the listener context's domain is handed to
+     * transport.ssl(), the connection is opened and the network connection is
+     * registered for both read and write events.
+     *
+     * @param listener the listener the connection arrived on; it is cast to
+     *                 ListenerImpl in order to reach its ListenerContext
+     * @param networkConnection the accepted network connection
+     */
+    void inboundConnection(Listener listener, IoConnection networkConnection)
+    {
+        _worked = true;
+
+        // Engine side: a connection reporting to the shared collector, bound
+        // to its own transport.
+        Connection conn = Proton.connection();
+        conn.collect(_collector);
+        conn.setContainer(_name);
+        Transport tport = Proton.transport();
+        tport.bind(conn);
+
+        // The selectable ties the engine objects to the network connection.
+        SelectableImpl sel = new SelectableImpl();
+        sel.markConnected();
+        sel.setTransport(tport);
+        sel.setConnection(conn);
+        sel.setNetworkConnection(networkConnection);
+        _selectables.add(sel);
+
+        ListenerImpl listenerImpl = (ListenerImpl) listener;
+        ListenerContext listenerCtx = (ListenerContext) listenerImpl.getContext();
+        conn.setContext(new ConnectionContext(listenerCtx.getAddress(), sel));
+        networkConnection.setSelectable(sel);
+
+        //TODO: full SASL
+        Sasl sasl = tport.sasl();
+        if (sasl != null)
+        {
+            sasl.server();
+            sasl.setMechanisms(new String[]{"ANONYMOUS"});
+            sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+        }
+        tport.ssl(listenerCtx.getDomain());
+        conn.open();
+
+        networkConnection.registerForReadEvents(true);
+        networkConnection.registerForWriteEvents(true);
+    }
+    
+    /**
+     * Handles a readable network connection: reads bytes into the transport's
+     * tail buffer, lets the transport process them and then updates the read
+     * and write interest of the network connection.
+     *
+     * If the read reports end of stream (a negative count) or fails with an
+     * IOException, the connection is torn down via connectionClosed() and no
+     * further processing or event registration is attempted.
+     *
+     * @param selectable the selectable whose network connection is readable
+     */
+    void connectionReadable(SelectableImpl selectable)
+    {
+        _worked = true;
+        IoConnection networkConnection = selectable.getNetworkConnection();
+        Transport transport = selectable.getTransport();
+        ByteBuffer tail = transport.tail();
+        try
+        {
+            int read = networkConnection.read(tail);
+            if (read < 0)
+            {
+                connectionClosed(selectable);
+                return;
+            }
+        }
+        catch (IOException e)
+        {
+            _logger.log(Level.SEVERE, this + " error reading from file descriptor", e);
+            // A failed read leaves the socket unusable. Re-registering it for
+            // events would only make the selector report the same failure
+            // again, so treat it like a remote close.
+            connectionClosed(selectable);
+            return;
+        }
+
+        try
+        {
+            transport.process();
+        }
+        catch (TransportException e)
+        {
+            _logger.log(Level.SEVERE, this + " error processing input", e);
+        }
+
+        // Keep reading only while the transport can take more input; always
+        // ask for a write event, as processing may have produced output.
+        networkConnection.registerForReadEvents(transport.capacity() > 0);
+        networkConnection.registerForWriteEvents(true);
+    }
+
+    /**
+     * Handles a writable network connection: drains the transport's pending
+     * output to the network connection until either nothing is pending or the
+     * network connection accepts no more bytes, then keeps write interest
+     * registered only while output is still pending.
+     *
+     * If a write fails with an IOException the connection is torn down via
+     * connectionClosed() and the method returns. If the selectable has been
+     * marked closed (and the messenger is not passive) the network connection
+     * is closed and the selectable is marked completed.
+     *
+     * @param selectable the selectable whose network connection is writable
+     */
+    void connectionWritable(SelectableImpl selectable)
+    {
+        _worked = true;
+        IoConnection networkConnection = selectable.getNetworkConnection();
+        Transport transport = selectable.getTransport();
+        try
+        {
+            while (transport.pending() > 0)
+            {
+                ByteBuffer head = transport.head();
+                int wrote;
+                try
+                {
+                    wrote = networkConnection.write(head);
+                }
+                catch (IOException e)
+                {
+                    _logger.log(Level.SEVERE, this + " error writing to the file descriptor", e);
+                    // A failed write leaves the socket unusable. Re-arming
+                    // write interest would make the selector report the same
+                    // failure again, so tear the connection down instead.
+                    connectionClosed(selectable);
+                    return;
+                }
+                if (wrote <= 0)
+                {
+                    // Nothing accepted: stop until the next writable event.
+                    break;
+                }
+                transport.pop(wrote);
+            }
+            networkConnection.registerForWriteEvents(transport.pending() > 0);
+
+            if (selectable.isClosed() && !_passive)
+            {
+                try
+                {
+                    selectable.getNetworkConnection().close();
+                }
+                catch (IOException e)
+                {
+                    _logger.log(Level.SEVERE, String.format("Error closing connection : %s", e), e);
+                }
+                selectable.markCompleted();
+            }
+        }
+        catch (TransportException e)
+        {
+            _logger.log(Level.SEVERE, this + " error", e);
+            networkConnection.registerForWriteEvents(false);
+        }
+    }
+
+    /**
+     * Finishes off a selectable whose connection has ended.
+     *
+     * Credit is reclaimed for the engine connection, the selectable is marked
+     * closed, the network connection is closed when the messenger is not in
+     * passive mode (a failure to close is logged, not propagated), and finally
+     * the selectable is marked completed and removed from _selectables.
+     *
+     * @param sel the selectable to tear down
+     */
+    void connectionClosed(SelectableImpl sel)
+    {
+        reclaimCredit(sel.getConnection());
+        sel.markClosed();
+
+        boolean closeNetwork = !_passive;
+        if (closeNetwork)
+        {
+            try
+            {
+                sel.getNetworkConnection().close();
+            }
+            catch (IOException e)
+            {
+                // Best effort: the selectable is retired regardless.
+                String msg = "Error closing connection : " + e;
+                _logger.log(Level.SEVERE, msg, e);
+            }
+        }
+
+        sel.markCompleted();
+        _selectables.remove(sel);
+    }
+}
\ No newline at end of file

Added: 
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SelectableImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SelectableImpl.java?rev=1602913&view=auto
==============================================================================
--- 
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SelectableImpl.java
 (added)
+++ 
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SelectableImpl.java
 Mon Jun 16 16:09:14 2014
@@ -0,0 +1,148 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.messenger.impl;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.messenger.Selectable;
+
+/**
+ * Implementation of {@link Selectable} that groups an engine Transport and
+ * Connection with an optional host/port, an opaque context object and, in non
+ * passive mode, the IoConnection carrying the bytes.
+ *
+ * Three independent flags (connected, closed, completed) track its lifecycle;
+ * each starts out false and this class only offers a way to set it to true.
+ */
+public class SelectableImpl implements Selectable
+{
+    private final AtomicBoolean _connected = new AtomicBoolean(false);
+    private final AtomicBoolean _closed = new AtomicBoolean(false);
+    private final AtomicBoolean _completed = new AtomicBoolean(false);
+
+    private final String _host;
+    private final int _port;
+
+    private Object _ctx;
+    private Transport _transport;
+    private Connection _connection;
+
+    // Used in non passive mode.
+    private IoConnection _networkConnection;
+
+    /**
+     * Creates a selectable with no host (null) and no port (-1).
+     */
+    SelectableImpl()
+    {
+        this(null, -1);
+    }
+
+    /**
+     * Creates a selectable for the given host and port.
+     *
+     * @param host the host reported by {@link #getHost()}
+     * @param port the port reported by {@link #getPort()}
+     */
+    SelectableImpl(String host, int port)
+    {
+        _host = host;
+        _port = port;
+    }
+
+    // ---- endpoint -------------------------------------------------------
+
+    @Override
+    public String getHost()
+    {
+        return _host;
+    }
+
+    @Override
+    public int getPort()
+    {
+        return _port;
+    }
+
+    // ---- lifecycle flags ------------------------------------------------
+
+    @Override
+    public void markConnected()
+    {
+        _connected.set(true);
+    }
+
+    @Override
+    public boolean isConnected()
+    {
+        return _connected.get();
+    }
+
+    void markClosed()
+    {
+        _closed.set(true);
+    }
+
+    @Override
+    public boolean isClosed()
+    {
+        return _closed.get();
+    }
+
+    @Override
+    public void markCompleted()
+    {
+        _completed.set(true);
+    }
+
+    @Override
+    public boolean isCompleted()
+    {
+        return _completed.get();
+    }
+
+    // ---- attached objects -----------------------------------------------
+
+    @Override
+    public Transport getTransport()
+    {
+        return _transport;
+    }
+
+    void setTransport(Transport t)
+    {
+        _transport = t;
+    }
+
+    Connection getConnection()
+    {
+        return _connection;
+    }
+
+    void setConnection(Connection c)
+    {
+        _connection = c;
+    }
+
+    Object getContext()
+    {
+        return _ctx;
+    }
+
+    void setContext(Object ctx)
+    {
+        _ctx = ctx;
+    }
+
+    IoConnection getNetworkConnection()
+    {
+        return _networkConnection;
+    }
+
+    void setNetworkConnection(IoConnection con)
+    {
+        _networkConnection = con;
+    }
+}
\ No newline at end of file

Added: 
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SocketListener.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SocketListener.java?rev=1602913&view=auto
==============================================================================
--- 
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SocketListener.java
 (added)
+++ 
qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/SocketListener.java
 Mon Jun 16 16:09:14 2014
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.messenger.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.qpid.proton.messenger.Listener;
+
+class SocketListener
+{
+    private Selector _selector;
+
+    private SelectionKey _key;
+
+    private ServerSocketChannel _serverSocketChannel;
+
+    SocketListener(Selector selector, Listener listener, String host, int 
port) throws IOException
+    {
+        _selector = selector;
+        _serverSocketChannel = ServerSocketChannel.open();
+        ServerSocket serverSocket = _serverSocketChannel.socket();
+        serverSocket.bind(new InetSocketAddress(host, port));
+        _serverSocketChannel.configureBlocking(false);
+        _key = _serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+        _key.attach(listener);
+    }
+
+    public void close() throws IOException
+    {
+        _serverSocketChannel.close();
+    }
+
+    IoConnection accept() throws IOException
+    {
+        SocketChannel channel = _serverSocketChannel.accept();
+        return new IoConnection(_selector, channel);
+    }
+
+    @Override
+    public String toString()
+    {
+        return _serverSocketChannel.socket().toString();
+    }
+}
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to