Author: gsim
Date: Tue Dec 11 13:11:00 2012
New Revision: 1420140
URL: http://svn.apache.org/viewvc?rev=1420140&view=rev
Log:
PROTON-118: add initial implementation of messenger
Added:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/AcceptMode.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/MessengerException.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Status.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Tracker.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java
Modified:
qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py
Added:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/AcceptMode.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/AcceptMode.java?rev=1420140&view=auto
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/AcceptMode.java
(added)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/AcceptMode.java
Tue Dec 11 13:11:00 2012
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger;
+
+public enum AcceptMode
+{
+ AUTO,
+ MANUAL
+}
+
Added:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Messenger.java?rev=1420140&view=auto
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
(added)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
Tue Dec 11 13:11:00 2012
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+import org.apache.qpid.proton.message.Message;
+
+public interface Messenger
+{
+ static final int CUMULATIVE = 0x01;
+
+ void put(Message message) throws MessengerException;
+ void send() throws TimeoutException;
+
+ void subscribe(String source) throws MessengerException;
+ void recv(int count) throws TimeoutException;
+ Message get();
+
+ void start() throws IOException;
+ void stop();
+
+ void setTimeout(long timeInMillis);
+ long getTimeout();
+
+ int outgoing();
+ int incoming();
+
+ AcceptMode getAcceptMode();
+ void setAcceptMode(AcceptMode mode);
+
+ int getIncomingWindow();
+ void setIncomingWindow(int window);
+
+ int getOutgoingWindow();
+ void setOutgoingWindow(int window);
+
+ Tracker incomingTracker();
+ Tracker outgoingTracker();
+
+ void reject(Tracker tracker, int flags);
+ void accept(Tracker tracker, int flags);
+ void settle(Tracker tracker, int flags);
+
+ Status getStatus(Tracker tracker);
+}
Added:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/MessengerException.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/MessengerException.java?rev=1420140&view=auto
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/MessengerException.java
(added)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/MessengerException.java
Tue Dec 11 13:11:00 2012
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.messenger;
+
+public class MessengerException extends Exception
+{
+ public MessengerException()
+ {
+ }
+
+ public MessengerException(String message)
+ {
+ super(message);
+ }
+
+ public MessengerException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ public MessengerException(Throwable cause)
+ {
+ super(cause);
+ }
+
+}
Added:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Status.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Status.java?rev=1420140&view=auto
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Status.java
(added)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Status.java
Tue Dec 11 13:11:00 2012
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger;
+
+public enum Status
+{
+ UNKNOWN,
+ PENDING,
+ ACCEPTED,
+ REJECTED
+}
Added:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Tracker.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Tracker.java?rev=1420140&view=auto
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Tracker.java
(added)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/Tracker.java
Tue Dec 11 13:11:00 2012
@@ -0,0 +1,24 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger;
+
+public interface Tracker { }
+
Added:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1420140&view=auto
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
(added)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
Tue Dec 11 13:11:00 2012
@@ -0,0 +1,877 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger.impl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.qpid.proton.driver.Connector;
+import org.apache.qpid.proton.driver.Driver;
+import org.apache.qpid.proton.driver.Listener;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.driver.impl.DriverImpl;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.messenger.AcceptMode;
+import org.apache.qpid.proton.messenger.Messenger;
+import org.apache.qpid.proton.messenger.MessengerException;
+import org.apache.qpid.proton.messenger.Status;
+import org.apache.qpid.proton.messenger.Tracker;
+import org.apache.qpid.proton.type.messaging.Accepted;
+import org.apache.qpid.proton.type.messaging.Source;
+import org.apache.qpid.proton.type.messaging.Target;
+
+public class MessengerImpl implements Messenger
+{
+ private static final EnumSet<EndpointState> UNINIT =
EnumSet.of(EndpointState.UNINITIALIZED);
+ private static final EnumSet<EndpointState> ACTIVE =
EnumSet.of(EndpointState.ACTIVE);
+ private static final EnumSet<EndpointState> CLOSED =
EnumSet.of(EndpointState.CLOSED);
+ private static final EnumSet<EndpointState> ANY =
EnumSet.allOf(EndpointState.class);
+ private static final Accepted ACCEPTED = new Accepted();
+
+ private final Logger _logger = Logger.getLogger("proton.messenger");
+ private final String _name;
+ private long _timeout = -1;
+ private long _nextTag = 1;
+ private byte[] _buffer = new byte[5*1024];
+ private Driver _driver;
+ private int _credit;
+ private int _distributed;
+ private AcceptMode _acceptMode = AcceptMode.AUTO;
+ private TrackerQueue _incoming = new TrackerQueue();
+ private TrackerQueue _outgoing = new TrackerQueue();
+
+ public MessengerImpl()
+ {
+ this(java.util.UUID.randomUUID().toString());
+ }
+
+ public MessengerImpl(String name)
+ {
+ _name = name;
+ }
+
+ public void setTimeout(long timeInMillis)
+ {
+ _timeout = timeInMillis;
+ }
+
+ public long getTimeout()
+ {
+ return _timeout;
+ }
+
+ public void start() throws IOException
+ {
+ _driver = new DriverImpl();
+ }
+
+ public void stop()
+ {
+ //close all connections
+ for (Connector c : _driver.connectors())
+ {
+ Connection connection = c.getConnection();
+ connection.close();
+ try
+ {
+ c.process();
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.WARNING, "Error while sending close", e);
+ }
+ }
+ //stop listeners
+ for (Listener l : _driver.listeners())
+ {
+ try
+ {
+ l.close();
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.WARNING, "Error while closing listener", e);
+ }
+ }
+ _driver.destroy();
+ }
+
+ public void put(Message m) throws MessengerException
+ {
+ try
+ {
+ URI address = new URI(m.getAddress());
+ if (address.getHost() == null) throw new
MessengerException("unable to send to address: " + m.getAddress());
+ int port = address.getPort() < 0 ?
defaultPort(address.getScheme()) : address.getPort();
+ Sender sender = getLink(address.getHost(), port, new
SenderFinder(address.getPath()));
+
+ adjustReplyTo(m);
+
+ byte[] tag = String.valueOf(_nextTag++).getBytes();
+ Delivery delivery = sender.delivery(tag);
+ int encoded;
+ while (true)
+ {
+ try
+ {
+ encoded = m.encode(_buffer, 0, _buffer.length);
+ break;
+ } catch (java.nio.BufferOverflowException e) {
+ _buffer = new byte[_buffer.length*2];
+ }
+ }
+ sender.send(_buffer, 0, encoded);
+ _outgoing.add(delivery);
+ sender.advance();
+ }
+ catch (URISyntaxException e)
+ {
+ throw new MessengerException("Invalid address: " + m.getAddress(),
e);
+ }
+ }
+
+ public void send() throws java.util.concurrent.TimeoutException
+ {
+ waitUntil(_sentSettled);
+ }
+
+ public void recv(int n) throws java.util.concurrent.TimeoutException
+ {
+ _credit += n;
+ distributeCredit();
+
+ waitUntil(_messageAvailable);
+ }
+
+ public Message get()
+ {
+ for (Connector c : _driver.connectors())
+ {
+ Connection connection = c.getConnection();
+ _logger.log(Level.FINE, "Attempting to get message from " +
connection);
+ Delivery delivery = connection.getWorkHead();
+ while (delivery != null)
+ {
+ if (delivery.isReadable())
+ {
+ _logger.log(Level.FINE, "Readable delivery found: " +
delivery);
+ int size = read((Receiver) delivery.getLink());
+ Message message = new Message();
+ message.decode(_buffer, 0, size);
+ _incoming.add(delivery);
+ if (_acceptMode == AcceptMode.AUTO) {
+ _incoming.accept(incomingTracker());
+ }
+ _distributed--;
+ delivery.getLink().advance();
+ return message;
+ }
+ else
+ {
+ _logger.log(Level.FINE, "Delivery not readable: " +
delivery);
+ delivery = delivery.getWorkNext();
+ }
+ }
+ }
+ return null;
+ }
+
+ public void subscribe(String source) throws MessengerException
+ {
+ //the following is not safe or accurate, but it appears '~' is
+ //invalid as the start of the hostname and URI can't handle
+ //it, so this is a quick hack to avoid rewriting the parsing
+ //logic for URLs right now...
+ boolean listen = source.contains("~");
+ try
+ {
+ URI address = new URI(listen ? source.replace("~", "") : source);
+ if (address.getHost() == null) throw new
MessengerException("Invalid source address (hostname cannot be null): " +
source);
+ int port = address.getPort() < 0 ?
defaultPort(address.getScheme()) : address.getPort();
+ if (listen)
+ {
+ _driver.createListener(address.getHost(), port, null);
+ }
+ else
+ {
+ getLink(address.getHost(), port, new
ReceiverFinder(address.getPath()));
+ }
+ }
+ catch (URISyntaxException e)
+ {
+ throw new MessengerException("Invalid source: " + source, e);
+ }
+
+ }
+
+ public int outgoing()
+ {
+ return queued(true);
+ }
+
+ public int incoming()
+ {
+ return queued(false);
+ }
+
+
+ public AcceptMode getAcceptMode()
+ {
+ return _acceptMode;
+ }
+ public void setAcceptMode(AcceptMode mode)
+ {
+ _acceptMode = mode;
+ }
+
+ public int getIncomingWindow()
+ {
+ return _incoming.getWindow();
+ }
+ public void setIncomingWindow(int window)
+ {
+ _incoming.setWindow(window);
+ }
+
+ public int getOutgoingWindow()
+ {
+ return _outgoing.getWindow();
+ }
+ public void setOutgoingWindow(int window)
+ {
+ _outgoing.setWindow(window);
+ }
+
+ public Tracker incomingTracker()
+ {
+ return new TrackerImpl(false, _incoming.getHighWaterMark() - 1);
+ }
+ public Tracker outgoingTracker()
+ {
+ return new TrackerImpl(true, _outgoing.getHighWaterMark() - 1);
+ }
+
+ private TrackerQueue getTrackerQueue(Tracker tracker)
+ {
+ return TrackerQueue.isOutgoing(tracker) ? _outgoing : _incoming;
+ }
+ public void reject(Tracker tracker, int flags)
+ {
+ getTrackerQueue(tracker).reject(tracker, flags);
+ }
+ public void accept(Tracker tracker, int flags)
+ {
+ getTrackerQueue(tracker).accept(tracker, flags);
+ }
+ public void settle(Tracker tracker, int flags)
+ {
+ getTrackerQueue(tracker).settle(tracker, flags);
+ }
+
+ public Status getStatus(Tracker tracker)
+ {
+ return getTrackerQueue(tracker).getStatus(tracker);
+ }
+
+ private int queued(boolean outgoing)
+ {
+ int count = 0;
+ for (Connector c : _driver.connectors())
+ {
+ Connection connection = c.getConnection();
+ for (Link link : new Links(connection, ACTIVE, ANY))
+ {
+ if (outgoing)
+ {
+ if (link instanceof Sender) count += link.getQueued();
+ }
+ else
+ {
+ if (link instanceof Receiver) count += link.getQueued();
+ }
+ }
+ }
+ return count;
+ }
+
+ private int read(Receiver receiver)
+ {
+ //TODO: add pending count to Delivery?
+ int total = 0;
+ int start = 0;
+ while (true)
+ {
+ int read = receiver.recv(_buffer, start, _buffer.length - start);
+ total += read;
+ if (read == (_buffer.length - start))
+ {
+ //may need to expand the buffer (is there a better test?)
+ byte[] old = _buffer;
+ _buffer = new byte[_buffer.length*2];
+ System.arraycopy(old, 0, _buffer, 0, old.length);
+ start += read;
+ }
+ else
+ {
+ break;
+ }
+ }
+ return total;
+ }
+
+ private void process()
+ {
+ processAllConnectors();
+ processActive();
+ }
+
+ private void processAllConnectors()
+ {
+ for (Connector c : _driver.connectors())
+ {
+ try
+ {
+ c.process();
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.SEVERE, "Error processing connection", e);
+ }
+ }
+ }
+
+ private void processActive()
+ {
+ //process active listeners
+ for (Listener l = _driver.listener(); l != null; l =
_driver.listener())
+ {
+ Connector c = l.accept();
+ Connection connection = new ConnectionImpl();
+ connection.setContainer(_name);
+ c.setConnection(connection);
+ //TODO: SSL and full SASL
+ Sasl sasl = c.sasl();
+ if (sasl != null)
+ {
+ sasl.server();
+ sasl.setMechanisms(new String[]{"ANONYMOUS"});
+ sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+ }
+ connection.open();
+ }
+ //process active connectors, handling opened & closed connections as
needed
+ for (Connector c = _driver.connector(); c != null; c =
_driver.connector())
+ {
+ _logger.log(Level.FINE, "Processing active connector " + c);
+ try
+ {
+ c.process();
+ } catch (IOException e) {
+ _logger.log(Level.SEVERE, "Error processing connection", e);
+ }
+ Connection connection = c.getConnection();
+
+ if (connection.getLocalState() == EndpointState.UNINITIALIZED)
+ {
+ connection.open();
+ }
+
+ Delivery delivery = connection.getWorkHead();
+ while (delivery != null)
+ {
+ if (delivery.getLink() instanceof Sender &&
delivery.isUpdated())
+ {
+ delivery.disposition(delivery.getRemoteState());
+ }
+ //TODO: delivery.clear(); What's the equivalent in java?
+ delivery = delivery.getWorkNext();
+ }
+ _outgoing.slide();
+
+ for (Session session : new Sessions(connection, UNINIT, ANY))
+ {
+ session.open();
+ _logger.log(Level.FINE, "Opened session " + session);
+ }
+ for (Link link : new Links(connection, UNINIT, ANY))
+ {
+ //TODO: the following is not correct; should only copy those
properties that we understand
+ link.setSource(link.getRemoteSource());
+ link.setTarget(link.getRemoteTarget());
+ link.open();
+ _logger.log(Level.FINE, "Opened link " + link);
+ }
+
+ distributeCredit();
+
+ for (Link link : new Links(connection, ACTIVE, CLOSED))
+ {
+ link.close();
+ }
+ for (Session session : new Sessions(connection, ACTIVE, CLOSED))
+ {
+ session.close();
+ }
+ if (connection.getLocalState() == EndpointState.ACTIVE &&
connection.getRemoteState() == EndpointState.CLOSED)
+ {
+ connection.close();
+ }
+
+ if (c.isClosed())
+ {
+ reclaimCredit(connection);
+ c.destroy();
+ }
+ else
+ {
+ try
+ {
+ c.process();
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.SEVERE, "Error processing connection",
e);
+ }
+ }
+ }
+ }
+
+ private void waitUntil(Predicate condition) throws TimeoutException
+ {
+ waitUntil(condition, _timeout);
+ }
+
+ private void waitUntil(Predicate condition, long timeout) throws
TimeoutException
+ {
+ processAllConnectors();
+
+ //wait until timeout expires or until test is true
+ long deadline = timeout < 0 ? Long.MAX_VALUE :
System.currentTimeMillis() + timeout;
+
+ boolean wait = deadline > System.currentTimeMillis();
+ boolean first = true;
+ boolean done = condition.test();
+
+ while (first || (!done && wait))
+ {
+ if (wait && !done && !first) {
+ _driver.doWait(deadline - System.currentTimeMillis());
+ }
+ processActive();
+ wait = deadline > System.currentTimeMillis();
+ done = done || condition.test();
+ first = false;
+ }
+ }
+
+ private Connection lookup(String host, String service)
+ {
+ for (Connector c : _driver.connectors())
+ {
+ Connection connection = c.getConnection();
+ if (host.equals(connection.getRemoteContainer()) ||
service.equals(connection.getContext()))
+ {
+ return connection;
+ }
+ }
+ return null;
+ }
+
+ private void reclaimCredit(Connection connection)
+ {
+ for (Link link : new Links(connection, ANY, ANY))
+ {
+ if (link instanceof Receiver && link.getCredit() > 0)
+ {
+ reclaimCredit(link.getCredit());
+ }
+ }
+ }
+
+ private void reclaimCredit(int credit)
+ {
+ _credit += credit;
+ _distributed -= credit;
+ }
+
+ private void distributeCredit()
+ {
+ int previous = 0;
+ while (_credit > 0 && _credit != previous)
+ {
+ previous = _credit;
+ for (Connector c : _driver.connectors())
+ {
+ Connection connection = c.getConnection();
+ for (Link link : new Links(connection, ACTIVE, ANY))
+ {
+ if (link instanceof Receiver)
+ {
+ ((Receiver) link).flow(1);
+ _credit--;
+ _distributed++;
+ if (_credit == 0) return;
+ }
+ }
+ }
+ }
+ }
+
+ private interface Predicate
+ {
+ boolean test();
+ }
+
+ private class SentSettled implements Predicate
+ {
+ public boolean test()
+ {
+ //are all sent messages settled?
+ for (Connector c : _driver.connectors())
+ {
+ Connection connection = c.getConnection();
+ for (Link link : new Links(connection, ACTIVE, ACTIVE))
+ {
+ if (link instanceof Sender)
+ {
+ if (link.getQueued() > 0)
+ {
+ return false;
+ }
+ //TODO: Sender.unsettled() not yet implemented, when
it is change to the following
+ //if (checkSettled(link.unsettled())
+ //{
+ // return false;
+ //}
+ }
+ }
+ }
+ //TODO: Sender.unsettled() not yet implemented, when it is change
to the following
+ //return true;
+ return checkSettled(_outgoing.deliveries());
+ }
+
+ boolean checkSettled(Iterator<Delivery> unsettled)
+ {
+ if (unsettled != null)
+ {
+ while (unsettled.hasNext())
+ {
+ Delivery d = unsettled.next();
+ if (d == null)
+ {
+ break;
+ }
+ if (d.getRemoteState() != null || d.remotelySettled())
+ {
+ d.settle();
+ }
+ else if
(d.getLink().getSession().getConnection().getRemoteState() ==
EndpointState.CLOSED)
+ {
+ continue;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ }
+
+ private class MessageAvailable implements Predicate
+ {
+ public boolean test()
+ {
+ //do we have at least one message?
+ for (Connector c : _driver.connectors())
+ {
+ Connection connection = c.getConnection();
+ Delivery delivery = connection.getWorkHead();
+ while (delivery != null)
+ {
+ if (delivery.isReadable())
+ {
+ //TODO: check for partial delivery?
+ return true;
+ }
+ else
+ {
+ delivery = delivery.getWorkNext();
+ }
+ }
+ }
+ return false;
+ }
+ }
+
+ private final SentSettled _sentSettled = new SentSettled();
+ private final MessageAvailable _messageAvailable = new MessageAvailable();
+
+ private interface LinkFinder<C extends Link>
+ {
+ C test(Link link);
+ C create(Session session);
+ }
+
+ private class SenderFinder implements LinkFinder<Sender>
+ {
+ private final String _path;
+
+ SenderFinder(String path)
+ {
+ _path = path;
+ }
+
+ public Sender test(Link link)
+ {
+ if (link instanceof Sender && matchTarget((Target)
link.getTarget(), _path))
+ {
+ return (Sender) link;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public Sender create(Session session)
+ {
+ return session.sender(_path);
+ }
+ }
+
+ private class ReceiverFinder implements LinkFinder<Receiver>
+ {
+ private final String _path;
+
+ ReceiverFinder(String path)
+ {
+ _path = path;
+ }
+
+ public Receiver test(Link link)
+ {
+ if (link instanceof Receiver && matchSource((Source)
link.getSource(), _path))
+ {
+ return (Receiver) link;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public Receiver create(Session session)
+ {
+ return session.receiver(_path);
+ }
+ }
+
+ private <C extends Link> C getLink(String host, int port, LinkFinder<C>
finder)
+ {
+ String service = host + ":" + port;
+ Connection connection = lookup(host, service);
+ if (connection == null)
+ {
+ Connector connector = _driver.createConnector(host, port, null);
+ _logger.log(Level.FINE, "Connecting to " + host + ":" + port);
+ connection = new ConnectionImpl();
+ connection.setContainer(_name);
+ connection.setHostname(host);
+ connection.setContext(service);
+ connector.setConnection(connection);
+ Sasl sasl = connector.sasl();
+ if (sasl != null)
+ {
+ sasl.client();
+ sasl.setMechanisms(new String[]{"ANONYMOUS"});
+ }
+ connection.open();
+ }
+
+ for (Link link : new Links(connection, ACTIVE, ANY))
+ {
+ C result = finder.test(link);
+ if (result != null) return result;
+ }
+ Session session = connection.session();
+ session.open();
+ C link = finder.create(session);
+ link.open();
+ return link;
+ }
+
+ private static class Links implements Iterable<Link>
+ {
+ private final Connection _connection;
+ private final EnumSet<EndpointState> _local;
+ private final EnumSet<EndpointState> _remote;
+
+ Links(Connection connection, EnumSet<EndpointState> local,
EnumSet<EndpointState> remote)
+ {
+ _connection = connection;
+ _local = local;
+ _remote = remote;
+ }
+
+ public java.util.Iterator<Link> iterator()
+ {
+ return new LinkIterator(_connection, _local, _remote);
+ }
+ }
+
+ private static class LinkIterator implements java.util.Iterator<Link>
+ {
+ private final EnumSet<EndpointState> _local;
+ private final EnumSet<EndpointState> _remote;
+ private Link _next;
+
+ LinkIterator(Connection connection, EnumSet<EndpointState> local,
EnumSet<EndpointState> remote)
+ {
+ _local = local;
+ _remote = remote;
+ _next = connection.linkHead(_local, _remote);
+ }
+
+ public boolean hasNext()
+ {
+ return _next != null;
+ }
+
+ public Link next()
+ {
+ try
+ {
+ return _next;
+ }
+ finally
+ {
+ _next = _next.next(_local, _remote);
+ }
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static class Sessions implements Iterable<Session>
+ {
+ private final Connection _connection;
+ private final EnumSet<EndpointState> _local;
+ private final EnumSet<EndpointState> _remote;
+
+ Sessions(Connection connection, EnumSet<EndpointState> local,
EnumSet<EndpointState> remote)
+ {
+ _connection = connection;
+ _local = local;
+ _remote = remote;
+ }
+
+ public java.util.Iterator<Session> iterator()
+ {
+ return new SessionIterator(_connection, _local, _remote);
+ }
+ }
+
+ private static class SessionIterator implements java.util.Iterator<Session>
+ {
+ private final EnumSet<EndpointState> _local;
+ private final EnumSet<EndpointState> _remote;
+ private Session _next;
+
+ SessionIterator(Connection connection, EnumSet<EndpointState> local,
EnumSet<EndpointState> remote)
+ {
+ _local = local;
+ _remote = remote;
+ _next = connection.sessionHead(_local, _remote);
+ }
+
+ public boolean hasNext()
+ {
+ return _next != null;
+ }
+
+ public Session next()
+ {
+ try
+ {
+ return _next;
+ }
+ finally
+ {
+ _next = _next.next(_local, _remote);
+ }
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private void adjustReplyTo(Message m)
+ {
+ String original = m.getReplyTo();
+ if (original == null || original.length() == 0)
+ {
+ m.setReplyTo("amqp://" + _name);
+ }
+ else if (original.startsWith("~/"))
+ {
+ m.setReplyTo("amqp://" + _name + "/" + original.substring(2));
+ }
+ }
+
+ private static boolean matchTarget(Target target, String path)
+ {
+ if (target == null) return path.isEmpty();
+ else return path.equals(target.getAddress());
+ }
+
+ private static boolean matchSource(Source source, String path)
+ {
+ if (source == null) return path.isEmpty();
+ else return path.equals(source.getAddress());
+ }
+
+ private static int defaultPort(String scheme)
+ {
+ if ("amqps".equals(scheme)) return 5671;
+ else return 5672;
+ }
+}
Added:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java?rev=1420140&view=auto
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
(added)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
Tue Dec 11 13:11:00 2012
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger.impl;
+
+import org.apache.qpid.proton.messenger.Tracker;
+
+class TrackerImpl implements Tracker
+{
+ private boolean _outgoing;
+ private int _sequence;
+
+ TrackerImpl(boolean outgoing, int sequence)
+ {
+ _outgoing = outgoing;
+ _sequence = sequence;
+ }
+
+ boolean isOutgoing()
+ {
+ return _outgoing;
+ }
+
+ int getSequence()
+ {
+ return _sequence;
+ }
+
+ public String toString()
+ {
+ return (_outgoing ? "O:" : "I:") + Integer.toString(_sequence);
+ }
+}
Added:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java?rev=1420140&view=auto
==============================================================================
---
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java
(added)
+++
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java
Tue Dec 11 13:11:00 2012
@@ -0,0 +1,222 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.messenger.Messenger;
+import org.apache.qpid.proton.messenger.Status;
+import org.apache.qpid.proton.messenger.Tracker;
+import org.apache.qpid.proton.type.messaging.Accepted;
+import org.apache.qpid.proton.type.messaging.Rejected;
+import org.apache.qpid.proton.type.transport.DeliveryState;
+
+class TrackerQueue
+{
+ private static final Accepted ACCEPTED = new Accepted();
+ private static final Rejected REJECTED = new Rejected();
+ private int _window = 0;
+ private int _hwm = 0;
+ private int _lwm = 0;
+ private ArrayList<Delivery> _deliveries = new ArrayList<Delivery>();
+
+ void setWindow(int window)
+ {
+ _window = window;
+ }
+
+ int getWindow()
+ {
+ return _window;
+ }
+
+ void accept(Tracker tracker)
+ {
+ accept(tracker, 0);
+ }
+
+ void accept(Tracker tracker, int flags)
+ {
+ apply(tracker, flags, ACCEPT);
+ }
+
+ void reject(Tracker tracker, int flags)
+ {
+ apply(tracker, flags, REJECT);
+ }
+
+ void settle(Tracker tracker, int flags)
+ {
+ apply(tracker, flags, SETTLE);
+ }
+
+ void add(Delivery delivery)
+ {
+ if (delivery == null)
+ {
+ throw new NullPointerException("Cannot add null delivery!");
+ }
+ int sequence = _hwm++;
+ _deliveries.add(delivery);
+ }
+
+ Status getStatus(Tracker tracker)
+ {
+ Delivery delivery = getDelivery(tracker);
+ if (delivery != null)
+ {
+ DeliveryState state = delivery.getRemoteState();
+ if (state != null)
+ {
+ return getStatus(state);
+ }
+ else if (delivery.remotelySettled() || delivery.isSettled())
+ {
+ return getStatus(delivery.getLocalState());
+ }
+ else
+ {
+ return Status.PENDING;
+ }
+ }
+ else
+ {
+ return Status.UNKNOWN;
+ }
+
+
+ }
+
+ private Status getStatus(DeliveryState state)
+ {
+ if (state instanceof Accepted)
+ {
+ return Status.ACCEPTED;
+ }
+ else if (state instanceof Rejected)
+ {
+ return Status.REJECTED;
+ }
+ else if (state == null)
+ {
+ return Status.PENDING;
+ }
+ else
+ {
+ throw new RuntimeException("Unexpected disposition: " + state);
+ }
+ }
+
+ void slide()
+ {
+ if (_window >= 0)
+ {
+ while (_hwm - _lwm > _window)
+ {
+ if (_deliveries.isEmpty())
+ {
+ throw new RuntimeException("Inconsistent state, empty
delivery queue but lwm=" + _lwm + " and hwm=" + _hwm);
+ }
+ Delivery d = _deliveries.get(0);
+ if (d.getLocalState() == null)
+ {
+ return;
+ }
+
+ d.settle();
+ _deliveries.remove(0);
+ _lwm++;
+ }
+ }
+ }
+
+ int getHighWaterMark()
+ {
+ return _hwm;
+ }
+
+ Iterator<Delivery> deliveries()
+ {
+ return _deliveries.iterator();
+ }
+
+ private Delivery getDelivery(Tracker tracker)
+ {
+ int seq = ((TrackerImpl) tracker).getSequence();
+ if (seq < _lwm || seq > _hwm) return null;
+ int index = seq - _lwm;
+ return index < _deliveries.size() ? _deliveries.get(index) : null;
+ }
+
+ static boolean isOutgoing(Tracker tracker)
+ {
+ return ((TrackerImpl) tracker).isOutgoing();
+ }
+
+ private void apply(Tracker tracker, int flags, DeliveryOperation operation)
+ {
+ int seq = ((TrackerImpl) tracker).getSequence();
+ if (seq < _lwm || seq > _hwm) return;
+ int last = seq - _lwm;
+ int start = (flags & Messenger.CUMULATIVE) != 0 ? 0 : last;
+ for (int i = start; i <= last && i < _deliveries.size(); ++i)
+ {
+ Delivery d = _deliveries.get(i);
+ if (d != null && !d.isSettled())
+ {
+ operation.apply(d);
+ }
+ }
+ slide();
+ }
+
+ private static interface DeliveryOperation
+ {
+ void apply(Delivery d);
+ }
+
+ private static final DeliveryOperation ACCEPT = new DeliveryOperation()
+ {
+ public void apply(Delivery d)
+ {
+ if (d.getLocalState() == null) d.disposition(ACCEPTED);
+ }
+ };
+ private static final DeliveryOperation REJECT = new DeliveryOperation()
+ {
+ public void apply(Delivery d)
+ {
+ if (d.getLocalState() == null) d.disposition(REJECTED);
+ }
+ };
+ private static final DeliveryOperation SETTLE = new DeliveryOperation()
+ {
+ public void apply(Delivery d)
+ {
+ if (d.getLocalState() == null)
+ {
+ d.disposition(d.getRemoteState());
+ }
+ d.settle();
+ }
+ };
+}
Modified: qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py?rev=1420140&r1=1420139&r2=1420140&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py Tue Dec 11
13:11:00 2012
@@ -22,7 +22,9 @@ from org.apache.qpid.proton.engine.impl
SenderImpl, ReceiverImpl, TransportImpl
from org.apache.qpid.proton.message import Message as MessageImpl, \
MessageFormat
-from org.apache.qpid.proton.type.messaging import Source, Target, Accepted
+from org.apache.qpid.proton.messenger import AcceptMode, MessengerException,
Status
+from org.apache.qpid.proton.messenger.impl import MessengerImpl
+from org.apache.qpid.proton.type.messaging import Source, Target, Accepted,
AmqpValue
from org.apache.qpid.proton.type import UnsignedInteger
from jarray import zeros
from java.util import EnumSet, UUID as JUUID
@@ -32,6 +34,29 @@ class Skipped(Exception):
PN_SESSION_WINDOW = TransportImpl.SESSION_WINDOW
+PENDING = "PENDING"
+ACCEPTED = "ACCEPTED"
+REJECTED = "REJECTED"
+
+STATUSES = {
+ Status.ACCEPTED: ACCEPTED,
+ Status.REJECTED: REJECTED,
+ Status.PENDING: PENDING,
+ Status.UNKNOWN: None
+ }
+
+MANUAL = "MANUAL"
+AUTOMATIC = "AUTOMATIC"
+
+_ACCEPT_MODE2CONST = {
+ AcceptMode.AUTO: AUTOMATIC,
+ AcceptMode.MANUAL: MANUAL
+ }
+_CONST2ACCEPT_MODE = {
+ AUTOMATIC: AcceptMode.AUTO,
+ MANUAL: AcceptMode.MANUAL
+ }
+
class Endpoint(object):
LOCAL_UNINIT = 1
@@ -228,7 +253,7 @@ class Link(Endpoint):
return wrap_session(self.impl.getSession())
def delivery(self, tag):
- return wrap_delivery(self.impl.delivery(tag, 0, len(tag)))
+ return wrap_delivery(self.impl.delivery(tag))
@property
def current(self):
@@ -480,10 +505,95 @@ class Data(object):
def __init__(self, *args, **kwargs):
raise Skipped()
+class Timeout(Exception):
+ pass
+
class Messenger(object):
def __init__(self, *args, **kwargs):
- raise Skipped()
+ #raise Skipped()
+ self.impl = MessengerImpl()
+
+ def start(self):
+ self.impl.start()
+
+ def stop(self):
+ self.impl.stop()
+
+ def subscribe(self, source):
+ self.impl.subscribe(source)
+
+ def put(self, message):
+ self.impl.put(message.impl)
+ return self.impl.outgoingTracker()
+
+ def send(self):
+ self.impl.send()
+
+ def recv(self, n):
+ self.impl.recv(n)
+
+ def get(self, message=None):
+ if message is None:
+ self.impl.get()
+ else:
+ message.impl = self.impl.get()
+ return self.impl.incomingTracker()
+
+ @property
+ def outgoing(self):
+ return self.impl.outgoing()
+
+ @property
+ def incoming(self):
+ return self.impl.incoming()
+
+ def _get_accept_mode(self):
+ return _ACCEPT_MODE2CONST(self.impl.getAcceptMode())
+ def _set_accept_mode(self, mode):
+ mode = _CONST2ACCEPT_MODE[mode]
+ self.impl.setAcceptMode(mode)
+ accept_mode = property(_get_accept_mode, _set_accept_mode)
+
+ def accept(self, tracker=None):
+ if tracker is None:
+ tracker = self.impl.incomingTracker()
+ flags = self.impl.CUMULATIVE
+ else:
+ flags = 0
+ self.impl.accept(tracker, flags)
+
+ def reject(self, tracker=None):
+ if tracker is None:
+ tracker = self.impl.incomingTracker()
+ flags = self.impl.CUMULATIVE
+ else:
+ flags = 0
+ self.impl.reject(tracker, flags)
+
+ def settle(self, tracker=None):
+ if tracker is None:
+ tracker = self.impl.outgoingTracker()
+ flags = self.impl.CUMULATIVE
+ else:
+ flags = 0
+ self.impl.settle(tracker, flags)
+
+ def status(self, tracker):
+ return STATUSES[self.impl.getStatus(tracker)]
+
+ def _get_incoming_window(self):
+ return self.impl.getIncomingWindow()
+ def _set_incoming_window(self, window):
+ self.impl.setIncomingWindow(window)
+ incoming_window = property(_get_incoming_window, _set_incoming_window)
+
+ def _get_outgoing_window(self):
+ return self.impl.getOutgoingWindow()
+ def _set_outgoing_window(self, window):
+ self.impl.setOutgoingWindow(window)
+ outgoing_window = property(_get_outgoing_window, _set_outgoing_window)
+
class Message(object):
@@ -651,6 +761,17 @@ class Message(object):
self.impl.setMessageFormat(format)
format = property(_get_format, _set_format)
+ def _get_body(self):
+ body = self.impl.getBody()
+ if isinstance(body, AmqpValue):
+ return body.getValue()
+ else:
+ return body
+ def _set_body(self, body):
+ self.impl.setBody(AmqpValue(body))
+ body = property(_get_body, _set_body)
+
+
class SASL(object):
OK = Sasl.PN_SASL_OK
@@ -742,4 +863,6 @@ __all__ = ["Messenger", "Message", "Prot
"MessageException", "Timeout", "Condition", "Data", "Endpoint",
"Connection", "Session", "Link", "Terminus", "Sender", "Receiver",
"Delivery", "Transport", "TransportException", "SASL", "SSL",
- "SSLException", "SSLUnavailable", "PN_SESSION_WINDOW", "symbol"]
+ "SSLException", "SSLUnavailable", "PN_SESSION_WINDOW", "symbol",
+ "MANUAL", "PENDING", "ACCEPTED", "REJECTED"]
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]