Author: rhs
Date: Thu Aug 29 18:07:49 2013
New Revision: 1518740

URL: http://svn.apache.org/r1518740
Log:
implemented route and rewrite for java Messenger

Added:
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java
Modified:
    qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/ProtonUnsupportedOperationException.java
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
    qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
    qpid/proton/trunk/tests/python/proton-test

Modified: qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java?rev=1518740&r1=1518739&r2=1518740&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java (original)
+++ qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java Thu Aug 29 18:07:49 2013
@@ -279,6 +279,20 @@ class JNIMessenger implements Messenger
         return Status.UNKNOWN;  //TODO - is this correct?
     }
 
+    @Override
+    public void route(String pattern, String address)
+    {
+        int err = Proton.pn_messenger_route(_impl, pattern, address);
+        check(err);
+    }
+
+    @Override
+    public void rewrite(String pattern, String address)
+    {
+        int err = Proton.pn_messenger_rewrite(_impl, pattern, address);
+        check(err);
+    }
+
     private void check(int errorCode) throws ProtonException
     {
         if(errorCode != 0 && errorCode != Proton.PN_INPROGRESS)

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/ProtonUnsupportedOperationException.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/ProtonUnsupportedOperationException.java?rev=1518740&r1=1518739&r2=1518740&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/ProtonUnsupportedOperationException.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/ProtonUnsupportedOperationException.java Thu Aug 29 18:07:49 2013
@@ -26,7 +26,7 @@ package org.apache.qpid.proton;
 public class ProtonUnsupportedOperationException extends UnsupportedOperationException
 {
     /** Used by the Python test layer to detect an unsupported operation */
-    public static final boolean protonUnsupportedOperation = true;
+    public static final boolean skipped = true;
 
     public ProtonUnsupportedOperationException()
     {

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java?rev=1518740&r1=1518739&r2=1518740&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java Thu Aug 29 18:07:49 2013
@@ -204,4 +204,9 @@ public interface Messenger
      * with the given tracker.
      */
     Status getStatus(Tracker tracker);
+
+    public void route(String pattern, String address);
+
+    public void rewrite(String pattern, String address);
+
 }

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1518740&r1=1518739&r2=1518740&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Thu Aug 29 18:07:49 2013
@@ -1289,11 +1289,11 @@ class Messenger(object):
     else:
       self.impl = Proton.messenger()
 
-  def route(self, *args, **kwargs):
-    raise Skipped()
+  def route(self, pattern, address):
+    self.impl.route(pattern, address)
 
-  def rewrite(self, *args, **kwargs):
-    raise Skipped()
+  def rewrite(self, pattern, address):
+    self.impl.rewrite(pattern, address)
 
   def start(self):
     self.impl.start()

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java?rev=1518740&r1=1518739&r2=1518740&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java Thu Aug 29 18:07:49 2013
@@ -158,9 +158,9 @@ public class DriverImpl implements Drive
         try
         {
             ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-            serverSocketChannel.configureBlocking(false);
             ServerSocket serverSocket = serverSocketChannel.socket();
             serverSocket.bind(new InetSocketAddress(host, port));
+            serverSocketChannel.configureBlocking(false);
             Listener<C> listener = createListener(serverSocketChannel, context);
             _logger.fine("Created listener on " + host + ":" + port + ": " + context);
 

Added: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java?rev=1518740&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java (added)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java Thu Aug 29 18:07:49 2013
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.messenger.impl;
+
+
+/**
+ * Address
+ *
+ */
+
+class Address
+{
+
+    private String _address;
+    private boolean _passive;
+    private String _scheme;
+    private String _user;
+    private String _pass;
+    private String _host;
+    private String _port;
+    private String _name;
+
+    public Address(String address)
+    {
+        _address = address;
+        parse();
+    }
+
+    private void parse()
+    {
+        _passive = false;
+        _scheme = null;
+        _user = null;
+        _pass = null;
+        _host = null;
+        _port = null;
+        _name = null;
+
+        int start = 0;
+        int schemeEnd = _address.indexOf("://", start);
+        if (schemeEnd > 0) {
+            _scheme = _address.substring(start, schemeEnd);
+            start = schemeEnd + 3;
+        }
+
+        int at = _address.indexOf('@', start);
+        if (at > 0) {
+            String up = _address.substring(start, at);
+            int colon = up.indexOf(':');
+            if (colon > 0) {
+                _user = up.substring(0, colon);
+                _pass = up.substring(colon + 1);
+            } else {
+                _user = up;
+            }
+            start = at + 1;
+        }
+
+        int slash = _address.indexOf("/", start);
+        String hp;
+        if (slash > 0) {
+            hp = _address.substring(start, slash);
+            _name = _address.substring(slash + 1);
+        } else {
+            hp = _address.substring(start);
+        }
+
+        int colon = hp.indexOf(':');
+        if (colon > 0) {
+            _host = hp.substring(0, colon);
+            _port = hp.substring(colon + 1);
+        } else {
+            _host = hp;
+        }
+
+        if (_host.startsWith("~")) {
+            _host = _host.substring(1);
+            _passive = true;
+        }
+    }
+
+    public String toString()
+    {
+        return _address;
+    }
+
+    public boolean isPassive()
+    {
+        return _passive;
+    }
+
+    public String getScheme()
+    {
+        return _scheme;
+    }
+
+    public String getUser()
+    {
+        return _user;
+    }
+
+    public String getPass()
+    {
+        return _pass;
+    }
+
+    public String getHost()
+    {
+        return _host;
+    }
+
+    public String getPort()
+    {
+        return _port;
+    }
+
+    public String getName()
+    {
+        return _name;
+    }
+
+}

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1518740&r1=1518739&r2=1518740&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Thu Aug 29 18:07:49 2013
@@ -21,8 +21,6 @@
 package org.apache.qpid.proton.messenger.impl;
 
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.Iterator;
@@ -82,6 +80,9 @@ public class MessengerImpl implements Me
     private TrackerQueue _outgoing = new TrackerQueue();
     private List<Connector> _awaitingDestruction = new ArrayList<Connector>();
 
+    private Transform _routes = new Transform();
+    private Transform _rewrites = new Transform();
+
 
     /**
      * @deprecated This constructor's visibility will be reduced to the default scope in a future release.
@@ -174,6 +175,60 @@ public class MessengerImpl implements Me
         }
     }
 
+    private String defaultRewrite(String address) {
+        if (address != null && address.contains("@")) {
+            Address addr = new Address(address);
+            String scheme = addr.getScheme();
+            String host = addr.getHost();
+            String port = addr.getPort();
+            String name = addr.getName();
+
+            StringBuilder sb = new StringBuilder();
+            if (scheme != null) {
+                sb.append(scheme).append("://");
+            }
+            if (host != null) {
+                sb.append(host);
+            }
+            if (port != null) {
+                sb.append(":").append(port);
+            }
+            if (name != null) {
+                sb.append("/").append(name);
+            }
+            return sb.toString();
+        } else {
+            return address;
+        }
+    }
+
+
+    private String _original;
+
+    private void rewriteMessage(Message m)
+    {
+        _original = m.getAddress();
+        if (_rewrites.apply(_original)) {
+            m.setAddress(_rewrites.result());
+        } else {
+            m.setAddress(defaultRewrite(_original));
+        }
+    }
+
+    private void restoreMessage(Message m)
+    {
+        m.setAddress(_original);
+    }
+
+    private String routeAddress(String addr)
+    {
+        if (_routes.apply(addr)) {
+            return _routes.result();
+        } else {
+            return addr;
+        }
+    }
+
     public void put(Message m) throws MessengerException
     {
         if (_driver == null) {
@@ -184,15 +239,19 @@ public class MessengerImpl implements Me
         {
             _logger.fine(this + " about to put message: " + m);
         }
-        try
-        {
-            URI address = new URI(m.getAddress());
+
+        String addr = routeAddress(m.getAddress());
+        rewriteMessage(m);
+
+        try {
+            Address address = new Address(addr);
             if (address.getHost() == null)
             {
-                throw new MessengerException("unable to send to address: " + m.getAddress());
+                throw new MessengerException("unable to send to address: " + addr);
             }
-            int port = address.getPort() < 0 ? defaultPort(address.getScheme()) : address.getPort();
-            Sender sender = getLink(address.getHost(), port, new SenderFinder(cleanPath(address.getPath())));
+            String ports = address.getPort() == null ? defaultPort(address.getScheme()) : address.getPort();
+            int port = Integer.valueOf(ports);
+            Sender sender = getLink(address.getHost(), port, new SenderFinder(address.getName()));
 
             adjustReplyTo(m);
 
@@ -213,9 +272,9 @@ public class MessengerImpl implements Me
             _outgoing.add(delivery);
             sender.advance();
         }
-        catch (URISyntaxException e)
+        finally
         {
-            throw new MessengerException("Invalid address: " + m.getAddress(), e);
+            restoreMessage(m);
         }
     }
 
@@ -302,39 +361,30 @@ public class MessengerImpl implements Me
             throw new IllegalStateException("messenger is stopped");
         }
 
-        //the following is not safe or accurate, but it appears '~' is
-        //invalid as the start of the hostname and URI can't handle
-        //it, so this is a quick hack to avoid rewriting the parsing
-        //logic for URLs right now...
-        boolean listen = source.contains("~");
-        try
-        {
-            URI address = new URI(listen ? source.replace("~", "") : source);
-            String hostName = address.getHost();
-            if (hostName == null) throw new MessengerException("Invalid source address (hostname cannot be null): " + source);
-            int port = address.getPort() < 0 ? defaultPort(address.getScheme()) : address.getPort();
-            if (listen)
-            {
-                if(_logger.isLoggable(Level.FINE))
-                {
-                    _logger.fine(this + " about to subscribe to source " + source + " using address " + hostName + ":" + port);
-                }
-                _driver.createListener(hostName, port, null);
-            }
-            else
+
+        String routed = routeAddress(source);
+        Address address = new Address(routed);
+
+        String hostName = address.getHost();
+        if (hostName == null) throw new MessengerException("Invalid address (hostname cannot be null): " + routed);
+        String ports = address.getPort() == null ? defaultPort(address.getScheme()) : address.getPort();
+        int port = Integer.valueOf(ports);
+        if (address.isPassive())
+        {
+            if(_logger.isLoggable(Level.FINE))
             {
-                if(_logger.isLoggable(Level.FINE))
-                {
-                    _logger.fine(this + " about to subscribe to source " + source);
-                }
-                getLink(hostName, port, new ReceiverFinder(cleanPath(address.getPath())));
+                _logger.fine(this + " about to subscribe to source " + source + " using address " + hostName + ":" + port);
             }
+            _driver.createListener(hostName, port, null);
         }
-        catch (URISyntaxException e)
+        else
         {
-            throw new MessengerException("Invalid source: " + source, e);
+            if(_logger.isLoggable(Level.FINE))
+            {
+                _logger.fine(this + " about to subscribe to source " + source);
+            }
+            getLink(hostName, port, new ReceiverFinder(address.getName()));
         }
-
     }
 
     public int outgoing()
@@ -379,14 +429,20 @@ public class MessengerImpl implements Me
     {
         return TrackerQueue.isOutgoing(tracker) ? _outgoing : _incoming;
     }
+
+    @Override
     public void reject(Tracker tracker, int flags)
     {
         getTrackerQueue(tracker).reject(tracker, flags);
     }
+
+    @Override
     public void accept(Tracker tracker, int flags)
     {
         getTrackerQueue(tracker).accept(tracker, flags);
     }
+
+    @Override
     public void settle(Tracker tracker, int flags)
     {
         getTrackerQueue(tracker).settle(tracker, flags);
@@ -397,6 +453,18 @@ public class MessengerImpl implements Me
         return getTrackerQueue(tracker).getStatus(tracker);
     }
 
+    @Override
+    public void route(String pattern, String address)
+    {
+        _routes.rule(pattern, address);
+    }
+
+    @Override
+    public void rewrite(String pattern, String address)
+    {
+        _rewrites.rule(pattern, address);
+    }
+
     private int queued(boolean outgoing)
     {
         int count = 0;
@@ -841,7 +909,7 @@ public class MessengerImpl implements Me
 
         SenderFinder(String path)
         {
-            _path = path;
+            _path = path == null ? "" : path;
         }
 
         public Sender test(Link link)
@@ -872,7 +940,7 @@ public class MessengerImpl implements Me
 
         ReceiverFinder(String path)
         {
-            _path = path;
+            _path = path == null ? "" : path;
         }
 
         public Receiver test(Link link)
@@ -1054,19 +1122,6 @@ public class MessengerImpl implements Me
         }
     }
 
-    private static String cleanPath(String path)
-    {
-        //remove leading '/'
-        if (path != null && path.length() > 0 && path.charAt(0) == '/')
-        {
-            return path.substring(1);
-        }
-        else
-        {
-            return path;
-        }
-    }
-
     private static boolean matchTarget(Target target, String path)
     {
         if (target == null) return path.isEmpty();
@@ -1079,10 +1134,10 @@ public class MessengerImpl implements Me
         else return path.equals(source.getAddress());
     }
 
-    private static int defaultPort(String scheme)
+    private static String defaultPort(String scheme)
     {
-        if ("amqps".equals(scheme)) return 5671;
-        else return 5672;
+        if ("amqps".equals(scheme)) return "5671";
+        else return "5672";
     }
 
     @Override

Added: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java?rev=1518740&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java (added)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java Thu Aug 29 18:07:49 2013
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.messenger.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * Transform
+ *
+ */
+
+class Transform
+{
+
+    private class Rule {
+
+        String _pattern;
+        String _substitution;
+
+        Pattern _compiled;
+        StringBuilder _sb = new StringBuilder();
+        boolean _matched = false;
+        String _result = null;
+
+        Rule(String pattern, String substitution)
+        {
+            _pattern = pattern;
+            _substitution = substitution;
+            _compiled = Pattern.compile(_pattern.replace("*", "(.*)").replace("%", "([^/]*)"));
+        }
+
+        boolean apply(String src) {
+            _matched = false;
+            _result = null;
+            Matcher m = _compiled.matcher(src);
+            if (m.matches()) {
+                _matched = true;
+                if (_substitution != null) {
+                    _sb.setLength(0);
+                    int limit = _substitution.length();
+                    int idx = 0;
+                    while (idx < limit) {
+                        char c = _substitution.charAt(idx);
+                        switch (c) {
+                        case '$':
+                            idx++;
+                            if (idx < limit) {
+                                c = _substitution.charAt(idx);
+                            } else {
+                                throw new IllegalStateException("substition index truncated");
+                            }
+
+                            if (c == '$') {
+                                _sb.append(c);
+                                idx++;
+                            } else {
+                                int num = 0;
+                                while (Character.isDigit(c)) {
+                                    num *= 10;
+                                    num += c - '0';
+                                    idx++;
+                                    c = idx < limit ? _substitution.charAt(idx) : '\0';
+                                }
+                                if (num > 0) {
+                                    _sb.append(m.group(num));
+                                } else {
+                                    throw new IllegalStateException
+                                        ("bad substitution index at character[" +
+                                         idx + "]: " + _substitution);
+                                }
+                            }
+                            break;
+                        default:
+                            _sb.append(c);
+                            idx++;
+                            break;
+                        }
+                    }
+                    _result = _sb.toString();
+                }
+            }
+
+            return _matched;
+        }
+
+        boolean matched() {
+            return _matched;
+        }
+
+        String result() {
+            return _result;
+        }
+
+    }
+
+    private List<Rule> _rules = new ArrayList<Rule>();
+    private Rule _matched = null;
+
+    public void rule(String pattern, String substitution)
+    {
+        _rules.add(new Rule(pattern, substitution));
+    }
+
+    public boolean apply(String src)
+    {
+        _matched = null;
+
+        for (Rule rule: _rules) {
+            if (rule.apply(src)) {
+                _matched = rule;
+                break;
+            }
+        }
+
+        return _matched != null;
+    }
+
+    public boolean matched()
+    {
+        return _matched != null;
+    }
+
+    public String result()
+    {
+        return _matched != null ? _matched.result() : null;
+    }
+
+}

Modified: qpid/proton/trunk/tests/python/proton-test
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton-test?rev=1518740&r1=1518739&r2=1518740&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton-test (original)
+++ qpid/proton/trunk/tests/python/proton-test Thu Aug 29 18:07:49 2013
@@ -346,7 +346,7 @@ class Runner:
       self.exception_phase_name = phase_name
       self.exception = sys.exc_info()
       exception_type = self.exception[0]
-      self.skip = getattr(exception_type, "skipped", False) or getattr(exception_type, "protonUnsupportedOperation", False)
+      self.skip = getattr(exception_type, "skipped", False)
 
   def status(self):
     if self.passed():



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to