Author: rhs
Date: Thu Aug 29 18:07:49 2013
New Revision: 1518740
URL: http://svn.apache.org/r1518740
Log:
implemented route and rewrite for java Messenger
Added:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java
Modified:
qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/ProtonUnsupportedOperationException.java
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
qpid/proton/trunk/tests/python/proton-test
Modified:
qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java?rev=1518740&r1=1518739&r2=1518740&view=diff
==============================================================================
---
qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
(original)
+++
qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
Thu Aug 29 18:07:49 2013
@@ -279,6 +279,20 @@ class JNIMessenger implements Messenger
return Status.UNKNOWN; //TODO - is this correct?
}
+ @Override
+ public void route(String pattern, String address)
+ {
+ int err = Proton.pn_messenger_route(_impl, pattern, address);
+ check(err);
+ }
+
+ @Override
+ public void rewrite(String pattern, String address)
+ {
+ int err = Proton.pn_messenger_rewrite(_impl, pattern, address);
+ check(err);
+ }
+
private void check(int errorCode) throws ProtonException
{
if(errorCode != 0 && errorCode != Proton.PN_INPROGRESS)
Modified:
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/ProtonUnsupportedOperationException.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/ProtonUnsupportedOperationException.java?rev=1518740&r1=1518739&r2=1518740&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/ProtonUnsupportedOperationException.java
(original)
+++
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/ProtonUnsupportedOperationException.java
Thu Aug 29 18:07:49 2013
@@ -26,7 +26,7 @@ package org.apache.qpid.proton;
public class ProtonUnsupportedOperationException extends
UnsupportedOperationException
{
/** Used by the Python test layer to detect an unsupported operation */
- public static final boolean protonUnsupportedOperation = true;
+ public static final boolean skipped = true;
public ProtonUnsupportedOperationException()
{
Modified:
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java?rev=1518740&r1=1518739&r2=1518740&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
(original)
+++
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
Thu Aug 29 18:07:49 2013
@@ -204,4 +204,9 @@ public interface Messenger
* with the given tracker.
*/
Status getStatus(Tracker tracker);
+
+ public void route(String pattern, String address);
+
+ public void rewrite(String pattern, String address);
+
}
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1518740&r1=1518739&r2=1518740&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
(original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Thu Aug
29 18:07:49 2013
@@ -1289,11 +1289,11 @@ class Messenger(object):
else:
self.impl = Proton.messenger()
- def route(self, *args, **kwargs):
- raise Skipped()
+ def route(self, pattern, address):
+ self.impl.route(pattern, address)
- def rewrite(self, *args, **kwargs):
- raise Skipped()
+ def rewrite(self, pattern, address):
+ self.impl.rewrite(pattern, address)
def start(self):
self.impl.start()
Modified:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java?rev=1518740&r1=1518739&r2=1518740&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
(original)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
Thu Aug 29 18:07:49 2013
@@ -158,9 +158,9 @@ public class DriverImpl implements Drive
try
{
ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
- serverSocketChannel.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(host, port));
+ serverSocketChannel.configureBlocking(false);
Listener<C> listener = createListener(serverSocketChannel,
context);
_logger.fine("Created listener on " + host + ":" + port + ": " +
context);
Added:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java?rev=1518740&view=auto
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java
(added)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java
Thu Aug 29 18:07:49 2013
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.messenger.impl;
+
+
+/**
+ * Address
+ *
+ */
+
+class Address
+{
+
+ private String _address;
+ private boolean _passive;
+ private String _scheme;
+ private String _user;
+ private String _pass;
+ private String _host;
+ private String _port;
+ private String _name;
+
+ public Address(String address)
+ {
+ _address = address;
+ parse();
+ }
+
+ private void parse()
+ {
+ _passive = false;
+ _scheme = null;
+ _user = null;
+ _pass = null;
+ _host = null;
+ _port = null;
+ _name = null;
+
+ int start = 0;
+ int schemeEnd = _address.indexOf("://", start);
+ if (schemeEnd > 0) {
+ _scheme = _address.substring(start, schemeEnd);
+ start = schemeEnd + 3;
+ }
+
+ int at = _address.indexOf('@', start);
+ if (at > 0) {
+ String up = _address.substring(start, at);
+ int colon = up.indexOf(':');
+ if (colon > 0) {
+ _user = up.substring(0, colon);
+ _pass = up.substring(colon + 1);
+ } else {
+ _user = up;
+ }
+ start = at + 1;
+ }
+
+ int slash = _address.indexOf("/", start);
+ String hp;
+ if (slash > 0) {
+ hp = _address.substring(start, slash);
+ _name = _address.substring(slash + 1);
+ } else {
+ hp = _address.substring(start);
+ }
+
+ int colon = hp.indexOf(':');
+ if (colon > 0) {
+ _host = hp.substring(0, colon);
+ _port = hp.substring(colon + 1);
+ } else {
+ _host = hp;
+ }
+
+ if (_host.startsWith("~")) {
+ _host = _host.substring(1);
+ _passive = true;
+ }
+ }
+
+ public String toString()
+ {
+ return _address;
+ }
+
+ public boolean isPassive()
+ {
+ return _passive;
+ }
+
+ public String getScheme()
+ {
+ return _scheme;
+ }
+
+ public String getUser()
+ {
+ return _user;
+ }
+
+ public String getPass()
+ {
+ return _pass;
+ }
+
+ public String getHost()
+ {
+ return _host;
+ }
+
+ public String getPort()
+ {
+ return _port;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+}
Modified:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1518740&r1=1518739&r2=1518740&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
(original)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
Thu Aug 29 18:07:49 2013
@@ -21,8 +21,6 @@
package org.apache.qpid.proton.messenger.impl;
import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
@@ -82,6 +80,9 @@ public class MessengerImpl implements Me
private TrackerQueue _outgoing = new TrackerQueue();
private List<Connector> _awaitingDestruction = new ArrayList<Connector>();
+ private Transform _routes = new Transform();
+ private Transform _rewrites = new Transform();
+
/**
* @deprecated This constructor's visibility will be reduced to the
default scope in a future release.
@@ -174,6 +175,60 @@ public class MessengerImpl implements Me
}
}
+ private String defaultRewrite(String address) {
+ if (address != null && address.contains("@")) {
+ Address addr = new Address(address);
+ String scheme = addr.getScheme();
+ String host = addr.getHost();
+ String port = addr.getPort();
+ String name = addr.getName();
+
+ StringBuilder sb = new StringBuilder();
+ if (scheme != null) {
+ sb.append(scheme).append("://");
+ }
+ if (host != null) {
+ sb.append(host);
+ }
+ if (port != null) {
+ sb.append(":").append(port);
+ }
+ if (name != null) {
+ sb.append("/").append(name);
+ }
+ return sb.toString();
+ } else {
+ return address;
+ }
+ }
+
+
+ private String _original;
+
+ private void rewriteMessage(Message m)
+ {
+ _original = m.getAddress();
+ if (_rewrites.apply(_original)) {
+ m.setAddress(_rewrites.result());
+ } else {
+ m.setAddress(defaultRewrite(_original));
+ }
+ }
+
+ private void restoreMessage(Message m)
+ {
+ m.setAddress(_original);
+ }
+
+ private String routeAddress(String addr)
+ {
+ if (_routes.apply(addr)) {
+ return _routes.result();
+ } else {
+ return addr;
+ }
+ }
+
public void put(Message m) throws MessengerException
{
if (_driver == null) {
@@ -184,15 +239,19 @@ public class MessengerImpl implements Me
{
_logger.fine(this + " about to put message: " + m);
}
- try
- {
- URI address = new URI(m.getAddress());
+
+ String addr = routeAddress(m.getAddress());
+ rewriteMessage(m);
+
+ try {
+ Address address = new Address(addr);
if (address.getHost() == null)
{
- throw new MessengerException("unable to send to address: " +
m.getAddress());
+ throw new MessengerException("unable to send to address: " +
addr);
}
- int port = address.getPort() < 0 ?
defaultPort(address.getScheme()) : address.getPort();
- Sender sender = getLink(address.getHost(), port, new
SenderFinder(cleanPath(address.getPath())));
+ String ports = address.getPort() == null ?
defaultPort(address.getScheme()) : address.getPort();
+ int port = Integer.valueOf(ports);
+ Sender sender = getLink(address.getHost(), port, new
SenderFinder(address.getName()));
adjustReplyTo(m);
@@ -213,9 +272,9 @@ public class MessengerImpl implements Me
_outgoing.add(delivery);
sender.advance();
}
- catch (URISyntaxException e)
+ finally
{
- throw new MessengerException("Invalid address: " + m.getAddress(),
e);
+ restoreMessage(m);
}
}
@@ -302,39 +361,30 @@ public class MessengerImpl implements Me
throw new IllegalStateException("messenger is stopped");
}
- //the following is not safe or accurate, but it appears '~' is
- //invalid as the start of the hostname and URI can't handle
- //it, so this is a quick hack to avoid rewriting the parsing
- //logic for URLs right now...
- boolean listen = source.contains("~");
- try
- {
- URI address = new URI(listen ? source.replace("~", "") : source);
- String hostName = address.getHost();
- if (hostName == null) throw new MessengerException("Invalid source
address (hostname cannot be null): " + source);
- int port = address.getPort() < 0 ?
defaultPort(address.getScheme()) : address.getPort();
- if (listen)
- {
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to subscribe to source " +
source + " using address " + hostName + ":" + port);
- }
- _driver.createListener(hostName, port, null);
- }
- else
+
+ String routed = routeAddress(source);
+ Address address = new Address(routed);
+
+ String hostName = address.getHost();
+ if (hostName == null) throw new MessengerException("Invalid address
(hostname cannot be null): " + routed);
+ String ports = address.getPort() == null ?
defaultPort(address.getScheme()) : address.getPort();
+ int port = Integer.valueOf(ports);
+ if (address.isPassive())
+ {
+ if(_logger.isLoggable(Level.FINE))
{
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to subscribe to source " +
source);
- }
- getLink(hostName, port, new
ReceiverFinder(cleanPath(address.getPath())));
+ _logger.fine(this + " about to subscribe to source " + source
+ " using address " + hostName + ":" + port);
}
+ _driver.createListener(hostName, port, null);
}
- catch (URISyntaxException e)
+ else
{
- throw new MessengerException("Invalid source: " + source, e);
+ if(_logger.isLoggable(Level.FINE))
+ {
+ _logger.fine(this + " about to subscribe to source " + source);
+ }
+ getLink(hostName, port, new ReceiverFinder(address.getName()));
}
-
}
public int outgoing()
@@ -379,14 +429,20 @@ public class MessengerImpl implements Me
{
return TrackerQueue.isOutgoing(tracker) ? _outgoing : _incoming;
}
+
+ @Override
public void reject(Tracker tracker, int flags)
{
getTrackerQueue(tracker).reject(tracker, flags);
}
+
+ @Override
public void accept(Tracker tracker, int flags)
{
getTrackerQueue(tracker).accept(tracker, flags);
}
+
+ @Override
public void settle(Tracker tracker, int flags)
{
getTrackerQueue(tracker).settle(tracker, flags);
@@ -397,6 +453,18 @@ public class MessengerImpl implements Me
return getTrackerQueue(tracker).getStatus(tracker);
}
+ @Override
+ public void route(String pattern, String address)
+ {
+ _routes.rule(pattern, address);
+ }
+
+ @Override
+ public void rewrite(String pattern, String address)
+ {
+ _rewrites.rule(pattern, address);
+ }
+
private int queued(boolean outgoing)
{
int count = 0;
@@ -841,7 +909,7 @@ public class MessengerImpl implements Me
SenderFinder(String path)
{
- _path = path;
+ _path = path == null ? "" : path;
}
public Sender test(Link link)
@@ -872,7 +940,7 @@ public class MessengerImpl implements Me
ReceiverFinder(String path)
{
- _path = path;
+ _path = path == null ? "" : path;
}
public Receiver test(Link link)
@@ -1054,19 +1122,6 @@ public class MessengerImpl implements Me
}
}
- private static String cleanPath(String path)
- {
- //remove leading '/'
- if (path != null && path.length() > 0 && path.charAt(0) == '/')
- {
- return path.substring(1);
- }
- else
- {
- return path;
- }
- }
-
private static boolean matchTarget(Target target, String path)
{
if (target == null) return path.isEmpty();
@@ -1079,10 +1134,10 @@ public class MessengerImpl implements Me
else return path.equals(source.getAddress());
}
- private static int defaultPort(String scheme)
+ private static String defaultPort(String scheme)
{
- if ("amqps".equals(scheme)) return 5671;
- else return 5672;
+ if ("amqps".equals(scheme)) return "5671";
+ else return "5672";
}
@Override
Added:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java?rev=1518740&view=auto
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java
(added)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java
Thu Aug 29 18:07:49 2013
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.messenger.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * Transform
+ *
+ */
+
+class Transform
+{
+
+ private class Rule {
+
+ String _pattern;
+ String _substitution;
+
+ Pattern _compiled;
+ StringBuilder _sb = new StringBuilder();
+ boolean _matched = false;
+ String _result = null;
+
+ Rule(String pattern, String substitution)
+ {
+ _pattern = pattern;
+ _substitution = substitution;
+ _compiled = Pattern.compile(_pattern.replace("*",
"(.*)").replace("%", "([^/]*)"));
+ }
+
+ boolean apply(String src) {
+ _matched = false;
+ _result = null;
+ Matcher m = _compiled.matcher(src);
+ if (m.matches()) {
+ _matched = true;
+ if (_substitution != null) {
+ _sb.setLength(0);
+ int limit = _substitution.length();
+ int idx = 0;
+ while (idx < limit) {
+ char c = _substitution.charAt(idx);
+ switch (c) {
+ case '$':
+ idx++;
+ if (idx < limit) {
+ c = _substitution.charAt(idx);
+ } else {
+ throw new IllegalStateException("substition
index truncated");
+ }
+
+ if (c == '$') {
+ _sb.append(c);
+ idx++;
+ } else {
+ int num = 0;
+ while (Character.isDigit(c)) {
+ num *= 10;
+ num += c - '0';
+ idx++;
+ c = idx < limit ?
_substitution.charAt(idx) : '\0';
+ }
+ if (num > 0) {
+ _sb.append(m.group(num));
+ } else {
+ throw new IllegalStateException
+ ("bad substitution index at
character[" +
+ idx + "]: " + _substitution);
+ }
+ }
+ break;
+ default:
+ _sb.append(c);
+ idx++;
+ break;
+ }
+ }
+ _result = _sb.toString();
+ }
+ }
+
+ return _matched;
+ }
+
+ boolean matched() {
+ return _matched;
+ }
+
+ String result() {
+ return _result;
+ }
+
+ }
+
+ private List<Rule> _rules = new ArrayList<Rule>();
+ private Rule _matched = null;
+
+ public void rule(String pattern, String substitution)
+ {
+ _rules.add(new Rule(pattern, substitution));
+ }
+
+ public boolean apply(String src)
+ {
+ _matched = null;
+
+ for (Rule rule: _rules) {
+ if (rule.apply(src)) {
+ _matched = rule;
+ break;
+ }
+ }
+
+ return _matched != null;
+ }
+
+ public boolean matched()
+ {
+ return _matched != null;
+ }
+
+ public String result()
+ {
+ return _matched != null ? _matched.result() : null;
+ }
+
+}
Modified: qpid/proton/trunk/tests/python/proton-test
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton-test?rev=1518740&r1=1518739&r2=1518740&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton-test (original)
+++ qpid/proton/trunk/tests/python/proton-test Thu Aug 29 18:07:49 2013
@@ -346,7 +346,7 @@ class Runner:
self.exception_phase_name = phase_name
self.exception = sys.exc_info()
exception_type = self.exception[0]
- self.skip = getattr(exception_type, "skipped", False) or
getattr(exception_type, "protonUnsupportedOperation", False)
+ self.skip = getattr(exception_type, "skipped", False)
def status(self):
if self.passed():
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]