http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/messenger/Tracker.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Tracker.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/Tracker.java deleted file mode 100644 index 974b1b6..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Tracker.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.qpid.proton.messenger; - -/** - * @deprecated Messenger will be removed from upcoming proton-j releases. - */ -@Deprecated -public interface Tracker { } -
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java deleted file mode 100644 index 27b0d39..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.proton.messenger.impl; - - -/** - * Address - * - * @deprecated Messenger will be removed from upcoming proton-j releases. - */ -public class Address -{ - - private String _address; - private boolean _passive; - private String _scheme; - private String _user; - private String _pass; - private String _host; - private String _port; - private String _name; - - public void clear() - { - _passive = false; - _scheme = null; - _user = null; - _pass = null; - _host = null; - _port = null; - _name = null; - } - - /** - * @deprecated Messenger will be removed from upcoming proton-j releases. - */ - public Address() - { - clear(); - } - - /** - * @deprecated Messenger will be removed from upcoming proton-j releases. - */ - public Address(String address) - { - clear(); - int start = 0; - int schemeEnd = address.indexOf("://", start); - if (schemeEnd >= 0) { - _scheme = address.substring(start, schemeEnd); - start = schemeEnd + 3; - } - - String uphp; - int slash = address.indexOf("/", start); - if (slash >= 0) { - uphp = address.substring(start, slash); - _name = address.substring(slash + 1); - } else { - uphp = address.substring(start); - } - - String hp; - int at = uphp.indexOf('@'); - if (at >= 0) { - String up = uphp.substring(0, at); - hp = uphp.substring(at + 1); - - int colon = up.indexOf(':'); - if (colon >= 0) { - _user = up.substring(0, colon); - _pass = up.substring(colon + 1); - } else { - _user = up; - } - } else { - hp = uphp; - } - - if (hp.startsWith("[")) { - int close = hp.indexOf(']'); - if (close >= 0) { - _host = hp.substring(1, close); - if (hp.substring(close + 1).startsWith(":")) { - _port = hp.substring(close + 2); - } - } - } - - if (_host == null) { - int colon = hp.indexOf(':'); - if (colon >= 0) { - _host = hp.substring(0, colon); - _port = hp.substring(colon + 1); - } else { - _host = hp; - } - } - - if (_host.startsWith("~")) { - _host = _host.substring(1); - _passive = true; - } - } - - public String toString() - { - String str = new String(); - if (_scheme != null) str += _scheme + "://"; - if (_user != null) str += _user; - if (_pass != null) str += ":" + _pass; - if (_user != null || _pass != null) str += "@"; - if (_host != null) { - if (_host.contains(":")) str += "[" + _host + "]"; - else str += _host; - } - if (_port != null) str += ":" + _port; - if (_name != null) str += "/" + _name; - return str; - } - - public boolean isPassive() - { - return _passive; - } - - public String getScheme() - { - return _scheme; - } - - public String getUser() - { - return _user; - } - - public String getPass() - { - return _pass; - } - - public String getHost() - { - return _host; - } - - public String getPort() - { - return _port; - } - - public String getImpliedPort() - { - if (_port == null) { - return getDefaultPort(); - } else { - return getPort(); - } - } - - public String getDefaultPort() - { - if ("amqps".equals(_scheme)) return "5671"; - else return "5672"; - } - - public String getName() - { - return _name; - } - - public void setScheme(String scheme) - { - _scheme= scheme; - } - - public void setUser(String user) - { - _user= user; - } - - public void setPass(String pass) - { - _pass= pass; - } - - public void setHost(String host) - { - _host= host; - } - - public void setPort(String port) - { - _port= port; - } - - public void setName(String name) - { - _name= name; - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java deleted file mode 100644 index e7c9d9e..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java +++ /dev/null @@ -1,1555 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.qpid.proton.messenger.impl; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.Iterator; -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.qpid.proton.Proton; -import org.apache.qpid.proton.InterruptException; -import org.apache.qpid.proton.TimeoutException; -import org.apache.qpid.proton.driver.Connector; -import org.apache.qpid.proton.driver.Driver; -import org.apache.qpid.proton.driver.Listener; -import org.apache.qpid.proton.engine.Connection; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.Link; -import org.apache.qpid.proton.engine.Receiver; -import org.apache.qpid.proton.engine.Sasl; -import org.apache.qpid.proton.engine.Sender; -import org.apache.qpid.proton.engine.Session; -import org.apache.qpid.proton.engine.SslDomain; -import org.apache.qpid.proton.engine.Ssl; -import org.apache.qpid.proton.engine.Transport; -import org.apache.qpid.proton.message.Message; -import org.apache.qpid.proton.messenger.Messenger; -import org.apache.qpid.proton.messenger.MessengerException; -import org.apache.qpid.proton.messenger.Status; -import org.apache.qpid.proton.messenger.Tracker; -import org.apache.qpid.proton.amqp.messaging.Source; -import org.apache.qpid.proton.amqp.messaging.Target; -import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; -import org.apache.qpid.proton.amqp.transport.SenderSettleMode; -import org.apache.qpid.proton.amqp.Binary; - -/** - * @deprecated Messenger will be removed from upcoming proton-j releases. - */ -@Deprecated -public class MessengerImpl implements Messenger -{ - private enum LinkCreditMode - { - // method for replenishing credit - LINK_CREDIT_EXPLICIT, // recv(N) - LINK_CREDIT_AUTO; // recv() - } - - private static final EnumSet<EndpointState> UNINIT = EnumSet.of(EndpointState.UNINITIALIZED); - private static final EnumSet<EndpointState> ACTIVE = EnumSet.of(EndpointState.ACTIVE); - private static final EnumSet<EndpointState> CLOSED = EnumSet.of(EndpointState.CLOSED); - private static final EnumSet<EndpointState> ANY = EnumSet.allOf(EndpointState.class); - - private final Logger _logger = Logger.getLogger("proton.messenger"); - private final String _name; - private long _timeout = -1; - private boolean _blocking = true; - private long _nextTag = 1; - private Driver _driver; - private LinkCreditMode _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT; - private final int _credit_batch = 1024; // credit_mode == LINK_CREDIT_AUTO - private int _credit; // available - private int _distributed; // outstanding credit - private int _receivers; // total # receiver Links - private int _draining; // # Links in drain state - private List<Receiver> _credited = new ArrayList<Receiver>(); - private List<Receiver> _blocked = new ArrayList<Receiver>(); - private long _next_drain; - private TrackerImpl _incomingTracker; - private TrackerImpl _outgoingTracker; - private Store _incomingStore = new Store(); - private Store _outgoingStore = new Store(); - private List<Connector> _awaitingDestruction = new ArrayList<Connector>(); - private int _sendThreshold; - - private Transform _routes = new Transform(); - private Transform _rewrites = new Transform(); - - private String _certificate; - private String _privateKey; - private String _password; - private String _trustedDb; - - - /** - * @deprecated Messenger will be removed from upcoming proton-j releases. - */ - @Deprecated public MessengerImpl() - { - this(java.util.UUID.randomUUID().toString()); - } - - /** - * @deprecated Messenger will be removed from upcoming proton-j releases. - */ - @Deprecated public MessengerImpl(String name) - { - _name = name; - } - - public void setTimeout(long timeInMillis) - { - _timeout = timeInMillis; - } - - public long getTimeout() - { - return _timeout; - } - - public boolean isBlocking() - { - return _blocking; - } - - public void setBlocking(boolean b) - { - _blocking = b; - } - - public void setCertificate(String certificate) - { - _certificate = certificate; - } - - public String getCertificate() - { - return _certificate; - } - - public void setPrivateKey(String privateKey) - { - _privateKey = privateKey; - } - - public String getPrivateKey() - { - return _privateKey; - } - - public void setPassword(String password) - { - _password = password; - } - - public String getPassword() - { - return _password; - } - - public void setTrustedCertificates(String trusted) - { - _trustedDb = trusted; - } - - public String getTrustedCertificates() - { - return _trustedDb; - } - - public void start() throws IOException - { - _driver = Proton.driver(); - } - - public void stop() - { - if (_driver != null) { - if(_logger.isLoggable(Level.FINE)) - { - _logger.fine(this + " about to stop"); - } - //close all connections - for (Connector<?> c : _driver.connectors()) - { - Connection connection = c.getConnection(); - connection.close(); - } - //stop listeners - for (Listener<?> l : _driver.listeners()) - { - try - { - l.close(); - } - catch (IOException e) - { - _logger.log(Level.WARNING, "Error while closing listener", e); - } - } - waitUntil(_allClosed); - } - } - - public boolean stopped() - { - return _allClosed.test(); - } - - public boolean work(long timeout) throws TimeoutException - { - if (_driver == null) { return false; } - _worked = false; - return waitUntil(_workPred, timeout); - } - - public void interrupt() - { - if (_driver != null) { - _driver.wakeup(); - } - } - - private String defaultRewrite(String address) { - if (address != null && address.contains("@")) { - Address addr = new Address(address); - String scheme = addr.getScheme(); - String host = addr.getHost(); - String port = addr.getPort(); - String name = addr.getName(); - - StringBuilder sb = new StringBuilder(); - if (scheme != null) { - sb.append(scheme).append("://"); - } - if (host != null) { - sb.append(host); - } - if (port != null) { - sb.append(":").append(port); - } - if (name != null) { - sb.append("/").append(name); - } - return sb.toString(); - } else { - return address; - } - } - - - private String _original; - - private void rewriteMessage(Message m) - { - _original = m.getAddress(); - if (_rewrites.apply(_original)) { - m.setAddress(_rewrites.result()); - } else { - m.setAddress(defaultRewrite(_original)); - } - } - - private void restoreMessage(Message m) - { - m.setAddress(_original); - } - - private String routeAddress(String addr) - { - if (_routes.apply(addr)) { - return _routes.result(); - } else { - return addr; - } - } - - public void put(Message m) throws MessengerException - { - if (_driver == null) { - throw new IllegalStateException("cannot put while messenger is stopped"); - } - - if(_logger.isLoggable(Level.FINE)) - { - _logger.fine(this + " about to put message: " + m); - } - - StoreEntry entry = _outgoingStore.put( m.getAddress() ); - _outgoingTracker = new TrackerImpl(TrackerImpl.Type.OUTGOING, - _outgoingStore.trackEntry(entry)); - - String routedAddress = routeAddress(m.getAddress()); - Address address = new Address(routedAddress); - if (address.getHost() == null) - { - throw new MessengerException("unable to send to address: " + routedAddress); - } - - rewriteMessage(m); - - try { - adjustReplyTo(m); - - int encoded; - byte[] buffer = new byte[5*1024]; - while (true) - { - try - { - encoded = m.encode(buffer, 0, buffer.length); - break; - } catch (java.nio.BufferOverflowException e) { - buffer = new byte[buffer.length*2]; - } - } - entry.setEncodedMsg( buffer, encoded ); - } - finally - { - restoreMessage(m); - } - - Sender sender = getLink(address, new SenderFinder(address.getName())); - pumpOut(m.getAddress(), sender); - } - - private void reclaimLink(Link link) - { - if (link instanceof Receiver) - { - int credit = link.getCredit(); - if (credit > 0) - { - _credit += credit; - _distributed -= credit; - } - } - - Delivery delivery = link.head(); - while (delivery != null) - { - StoreEntry entry = (StoreEntry) delivery.getContext(); - if (entry != null) - { - entry.setDelivery(null); - if (delivery.isBuffered()) { - entry.setStatus(Status.ABORTED); - } - } - delivery = delivery.next(); - } - linkRemoved(link); - } - - private int pumpOut( String address, Sender sender ) - { - StoreEntry entry = _outgoingStore.get( address ); - if (entry == null) { - sender.drained(); - return 0; - } - - byte[] tag = String.valueOf(_nextTag++).getBytes(); - Delivery delivery = sender.delivery(tag); - entry.setDelivery( delivery ); - _logger.log(Level.FINE, "Sending on delivery: " + delivery); - int n = sender.send( entry.getEncodedMsg(), 0, entry.getEncodedLength()); - if (n < 0) { - _outgoingStore.freeEntry( entry ); - _logger.log(Level.WARNING, "Send error: " + n); - return n; - } else { - sender.advance(); - _outgoingStore.freeEntry( entry ); - return 0; - } - } - - public void send() throws TimeoutException - { - send(-1); - } - - public void send(int n) throws TimeoutException - { - if (_driver == null) { - throw new IllegalStateException("cannot send while messenger is stopped"); - } - - if(_logger.isLoggable(Level.FINE)) - { - _logger.fine(this + " about to send"); - } - - if (n == -1) - _sendThreshold = 0; - else - { - _sendThreshold = outgoing() - n; - if (_sendThreshold < 0) - _sendThreshold = 0; - } - - waitUntil(_sentSettled); - } - - public void recv(int n) throws TimeoutException - { - if (_driver == null) { - throw new IllegalStateException("cannot recv while messenger is stopped"); - } - - if (_logger.isLoggable(Level.FINE) && n != -1) - { - _logger.fine(this + " about to wait for up to " + n + " messages to be received"); - } - - if (n == -1) - { - _credit_mode = LinkCreditMode.LINK_CREDIT_AUTO; - } - else - { - _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT; - if (n > _distributed) - _credit = n - _distributed; - else // cancel unallocated - _credit = 0; - } - - distributeCredit(); - - waitUntil(_messageAvailable); - } - - public void recv() throws TimeoutException - { - recv(-1); - } - - public int receiving() - { - return _credit + _distributed; - } - - public Message get() - { - StoreEntry entry = _incomingStore.get( null ); - if (entry != null) - { - Message message = Proton.message(); - message.decode( entry.getEncodedMsg(), 0, entry.getEncodedLength() ); - - _incomingTracker = new TrackerImpl(TrackerImpl.Type.INCOMING, - _incomingStore.trackEntry(entry)); - - _incomingStore.freeEntry( entry ); - return message; - } - return null; - } - - private int pumpIn(String address, Receiver receiver) - { - Delivery delivery = receiver.current(); - if (delivery.isReadable() && !delivery.isPartial()) - { - StoreEntry entry = _incomingStore.put( address ); - entry.setDelivery( delivery ); - - _logger.log(Level.FINE, "Readable delivery found: " + delivery); - - int size = delivery.pending(); - byte[] buffer = new byte[size]; - int read = receiver.recv( buffer, 0, buffer.length ); - if (read != size) { - throw new IllegalStateException(); - } - entry.setEncodedMsg( buffer, size ); - receiver.advance(); - - // account for the used credit, replenish if - // low (< 20% maximum per-link batch) and - // extra credit available - assert(_distributed > 0); - _distributed--; - if (!receiver.getDrain() && _blocked.isEmpty() && _credit > 0) - { - final int max = perLinkCredit(); - final int lo_thresh = (int)(max * 0.2 + 0.5); - if (receiver.getRemoteCredit() < lo_thresh) - { - final int more = Math.min(_credit, max - receiver.getRemoteCredit()); - _credit -= more; - _distributed += more; - receiver.flow(more); - } - } - // check if blocked - if (receiver.getRemoteCredit() == 0 && _credited.contains(receiver)) - { - _credited.remove(receiver); - if (receiver.getDrain()) - { - receiver.setDrain(false); - assert( _draining > 0 ); - _draining--; - } - _blocked.add(receiver); - } - } - return 0; - } - - public void subscribe(String source) throws MessengerException - { - if (_driver == null) { - throw new IllegalStateException("messenger is stopped"); - } - - String routed = routeAddress(source); - Address address = new Address(routed); - - String hostName = address.getHost(); - if (hostName == null) throw new MessengerException("Invalid address (hostname cannot be null): " + routed); - int port = Integer.valueOf(address.getImpliedPort()); - if (address.isPassive()) - { - if(_logger.isLoggable(Level.FINE)) - { - _logger.fine(this + " about to subscribe to source " + source + " using address " + hostName + ":" + port); - } - ListenerContext ctx = new ListenerContext(address); - _driver.createListener(hostName, port, ctx); - } - else - { - if(_logger.isLoggable(Level.FINE)) - { - _logger.fine(this + " about to subscribe to source " + source); - } - getLink(address, new ReceiverFinder(address.getName())); - } - } - - public int outgoing() - { - return _outgoingStore.size() + queued(true); - } - - public int incoming() - { - return _incomingStore.size() + queued(false); - } - - public int getIncomingWindow() - { - return _incomingStore.getWindow(); - } - - public void setIncomingWindow(int window) - { - _incomingStore.setWindow(window); - } - - public int getOutgoingWindow() - { - return _outgoingStore.getWindow(); - } - - public void setOutgoingWindow(int window) - { - _outgoingStore.setWindow(window); - } - - public Tracker incomingTracker() - { - return _incomingTracker; - } - public Tracker outgoingTracker() - { - return _outgoingTracker; - } - - private Store getTrackerStore(Tracker tracker) - { - return ((TrackerImpl)tracker).isOutgoing() ? _outgoingStore : _incomingStore; - } - - @Override - public void reject(Tracker tracker, int flags) - { - int id = ((TrackerImpl)tracker).getSequence(); - getTrackerStore(tracker).update(id, Status.REJECTED, flags, false, false); - } - - @Override - public void accept(Tracker tracker, int flags) - { - int id = ((TrackerImpl)tracker).getSequence(); - getTrackerStore(tracker).update(id, Status.ACCEPTED, flags, false, false); - } - - @Override - public void settle(Tracker tracker, int flags) - { - int id = ((TrackerImpl)tracker).getSequence(); - getTrackerStore(tracker).update(id, Status.UNKNOWN, flags, true, true); - } - - public Status getStatus(Tracker tracker) - { - int id = ((TrackerImpl)tracker).getSequence(); - StoreEntry e = getTrackerStore(tracker).getEntry(id); - if (e != null) - { - return e.getStatus(); - } - return Status.UNKNOWN; - } - - @Override - public void route(String pattern, String address) - { - _routes.rule(pattern, address); - } - - @Override - public void rewrite(String pattern, String address) - { - _rewrites.rule(pattern, address); - } - - private int queued(boolean outgoing) - { - int count = 0; - if (_driver != null) { - for (Connector<?> c : _driver.connectors()) - { - Connection connection = c.getConnection(); - for (Link link : new Links(connection, ACTIVE, ANY)) - { - if (outgoing) - { - if (link instanceof Sender) count += link.getQueued(); - } - else - { - if (link instanceof Receiver) count += link.getQueued(); - } - } - } - } - return count; - } - - private void bringDestruction() - { - for (Connector<?> c : _awaitingDestruction) - { - c.destroy(); - } - _awaitingDestruction.clear(); - } - - private void processAllConnectors() - { - distributeCredit(); - for (Connector<?> c : _driver.connectors()) - { - processEndpoints(c); - try - { - if (c.process()) { - _worked = true; - } - } - catch (IOException e) - { - _logger.log(Level.SEVERE, "Error processing connection", e); - } - } - bringDestruction(); - distributeCredit(); - } - - private void processActive() - { - //process active listeners - for (Listener<?> l = _driver.listener(); l != null; l = _driver.listener()) - { - _worked = true; - Connector<?> c = l.accept(); - Connection connection = Proton.connection(); - connection.setContainer(_name); - ListenerContext ctx = (ListenerContext) l.getContext(); - connection.setContext(new ConnectionContext(ctx.getAddress(), c)); - c.setConnection(connection); - Transport transport = c.getTransport(); - //TODO: full SASL - Sasl sasl = c.sasl(); - if (sasl != null) - { - sasl.server(); - sasl.setMechanisms(new String[]{"ANONYMOUS"}); - sasl.done(Sasl.SaslOutcome.PN_SASL_OK); - } - transport.ssl(ctx.getDomain()); - connection.open(); - } - // process connectors, reclaiming credit on closed connectors - for (Connector<?> c = _driver.connector(); c != null; c = _driver.connector()) - { - _worked = true; - if (c.isClosed()) - { - _awaitingDestruction.add(c); - reclaimCredit(c.getConnection()); - } - else - { - _logger.log(Level.FINE, "Processing active connector " + c); - try - { - c.process(); - processEndpoints(c); - c.process(); - } - catch (IOException e) - { - _logger.log(Level.SEVERE, "Error processing connection", e); - } - } - } - bringDestruction(); - distributeCredit(); - } - - private void processEndpoints(Connector c) - { - Connection connection = c.getConnection(); - - if (connection.getLocalState() == EndpointState.UNINITIALIZED) - { - connection.open(); - } - - Delivery delivery = connection.getWorkHead(); - while (delivery != null) - { - Link link = delivery.getLink(); - if (delivery.isUpdated()) - { - if (link instanceof Sender) - { - delivery.disposition(delivery.getRemoteState()); - } - StoreEntry e = (StoreEntry) delivery.getContext(); - if (e != null) e.updated(); - } - - if (delivery.isReadable()) - { - pumpIn( link.getSource().getAddress(), (Receiver)link ); - } - - Delivery next = delivery.getWorkNext(); - delivery.clear(); - delivery = next; - } - - for (Session session : new Sessions(connection, UNINIT, ANY)) - { - session.open(); - _logger.log(Level.FINE, "Opened session " + session); - } - for (Link link : new Links(connection, UNINIT, ANY)) - { - //TODO: the following is not correct; should only copy those properties that we understand - //TODO: is this any better: - if (link.getRemoteSource() != null) { - link.setSource(link.getRemoteSource().copy()); - } - if (link.getRemoteTarget() != null) { - link.setTarget(link.getRemoteTarget().copy()); - } - linkAdded(link); - link.open(); - _logger.log(Level.FINE, "Opened link " + link); - } - - distributeCredit(); - - for (Link link : new Links(connection, ACTIVE, ACTIVE)) - { - if (link instanceof Sender) - { - pumpOut(link.getTarget().getAddress(), (Sender)link); - } - } - - for (Session session : new Sessions(connection, ACTIVE, CLOSED)) - { - session.close(); - } - - for (Link link : new Links(connection, ANY, CLOSED)) - { - if (link.getLocalState() == EndpointState.ACTIVE) - { - link.close(); - } - else - { - reclaimLink(link); - } - } - - if (connection.getRemoteState() == EndpointState.CLOSED) - { - if (connection.getLocalState() == EndpointState.ACTIVE) - { - connection.close(); - } - } - } - - private boolean waitUntil(Predicate condition) throws TimeoutException - { - if (_blocking) { - boolean done = waitUntil(condition, _timeout); - if (!done) { - _logger.log(Level.SEVERE, String.format - ("Timeout when waiting for condition %s after %s ms", - condition, _timeout)); - throw new TimeoutException(); - } - return done; - } else { - return waitUntil(condition, 0); - } - } - - private boolean waitUntil(Predicate condition, long timeout) - { - if (_driver == null) { - throw new IllegalStateException("cannot wait while messenger is stopped"); - } - - processAllConnectors(); - - // wait until timeout expires or until test is true - long now = System.currentTimeMillis(); - final long deadline = timeout < 0 ? Long.MAX_VALUE : now + timeout; - boolean done = false; - - while (true) - { - done = condition.test(); - if (done) break; - - long remaining; - if (timeout < 0) - remaining = -1; - else { - remaining = deadline - now; - if (remaining < 0) break; - } - - // Update the credit scheduler. If the scheduler detects - // credit imbalance on the links, wake up in time to - // service credit drain - distributeCredit(); - if (_next_drain != 0) - { - long wakeup = (_next_drain > now) ? _next_drain - now : 0; - remaining = (remaining == -1) ? wakeup : Math.min(remaining, wakeup); - } - - boolean woken; - woken = _driver.doWait(remaining); - processActive(); - if (woken) { - throw new InterruptException(); - } - now = System.currentTimeMillis(); - } - - return done; - } - - private Connection lookup(Address address) - { - for (Connector<?> c : _driver.connectors()) - { - Connection connection = c.getConnection(); - ConnectionContext ctx = (ConnectionContext) connection.getContext(); - if (ctx.matches(address)) - { - return connection; - } - } - return null; - } - - private void reclaimCredit(Connection connection) - { - for (Link link : new Links(connection, ANY, ANY)) - { - reclaimLink(link); - } - } - - private void distributeCredit() - { - if (_receivers == 0) return; - - if (_credit_mode == LinkCreditMode.LINK_CREDIT_AUTO) - { - // replenish, but limit the max total messages buffered - final int max = _receivers * _credit_batch; - final int used = _distributed + incoming(); - if (max > used) - _credit = max - used; - } - - // reclaim any credit left over after draining links has completed - if (_draining > 0) - { - Iterator<Receiver> itr = _credited.iterator(); - while (itr.hasNext()) - { - Receiver link = (Receiver) itr.next(); - if (link.getDrain()) - { - if (!link.draining()) - { - // drain completed for this link - int drained = link.drained(); - assert(_distributed >= drained); - _distributed -= drained; - _credit += drained; - link.setDrain(false); - _draining--; - itr.remove(); - _blocked.add(link); - } - } - } - } - - // distribute available credit to blocked links - final int batch = perLinkCredit(); - while (_credit > 0 && !_blocked.isEmpty()) - { - Receiver link = _blocked.get(0); - _blocked.remove(0); - - final int more = Math.min(_credit, batch); - _distributed += more; - _credit -= more; - - link.flow(more); - _credited.add(link); - - // flow changed, must process it - ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext(); - try - { - ctx.getConnector().process(); - } catch (IOException e) { - _logger.log(Level.SEVERE, "Error processing connection", e); - } - } - - if (_blocked.isEmpty()) - { - _next_drain = 0; - } - else - { - // not enough credit for all links - start draining granted credit - if (_draining == 0) - { - // don't do it too often - pace ourselves (it's expensive) - if (_next_drain == 0) - { - _next_drain = System.currentTimeMillis() + 250; - } - else if (_next_drain <= System.currentTimeMillis()) - { - // initiate drain, free up at most enough to satisfy blocked - _next_drain = 0; - int needed = _blocked.size() * batch; - - for (Receiver link : _credited) - { - if (!link.getDrain()) { - link.setDrain(true); - needed -= link.getRemoteCredit(); - _draining++; - // drain requested on link, must process it - ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext(); - try - { - ctx.getConnector().process(); - } catch (IOException e) { - _logger.log(Level.SEVERE, "Error processing connection", e); - } - if (needed <= 0) break; - } - } - } - } - } - } - - private interface Predicate - { - boolean test(); - } - - private class SentSettled implements Predicate - { - public boolean test() - { - //are all sent messages settled? - int total = _outgoingStore.size(); - - for (Connector<?> c : _driver.connectors()) - { - // TBD - // check if transport is done generating output - // pn_transport_t *transport = pn_connector_transport(ctor); - // if (transport) { - // if (!pn_transport_quiesced(transport)) { - // pn_connector_process(ctor); - // return false; - // } - // } - - Connection connection = c.getConnection(); - for (Link link : new Links(connection, ACTIVE, ANY)) - { - if (link instanceof Sender) - { - total += link.getQueued(); - } - } - - // TBD: there is no per-link unsettled - // deliveries iterator, so for now get the - // deliveries by walking the outgoing trackers - Iterator<StoreEntry> entries = _outgoingStore.trackedEntries(); - while (entries.hasNext() && total <= _sendThreshold) - { - StoreEntry e = (StoreEntry) entries.next(); - if (e != null ) - { - Delivery d = e.getDelivery(); - if (d != null) - { - if (d.getRemoteState() == null && !d.remotelySettled()) - { - total++; - } - } - } - } - } - return total <= _sendThreshold; - } - } - - private class MessageAvailable implements Predicate - { - public boolean test() - { - //do we have at least one pending message? - if (_incomingStore.size() > 0) return true; - for (Connector<?> c : _driver.connectors()) - { - Connection connection = c.getConnection(); - Delivery delivery = connection.getWorkHead(); - while (delivery != null) - { - if (delivery.isReadable() && !delivery.isPartial()) - { - return true; - } - else - { - delivery = delivery.getWorkNext(); - } - } - } - // if no connections, or not listening, exit as there won't ever be a message - if (!_driver.listeners().iterator().hasNext() && !_driver.connectors().iterator().hasNext()) - return true; - - return false; - } - } - - private class AllClosed implements Predicate - { - public boolean test() - { - if (_driver == null) { - return true; - } - - for (Connector<?> c : _driver.connectors()) { - if (!c.isClosed()) { - return false; - } - } - - _driver.destroy(); - _driver = null; - - return true; - } - } - - private boolean _worked = false; - - private class WorkPred implements Predicate - { - public boolean test() - { - return _worked; - } - } - - private final SentSettled _sentSettled = new SentSettled(); - private final MessageAvailable _messageAvailable = new MessageAvailable(); - private final AllClosed _allClosed = new AllClosed(); - private final WorkPred _workPred = new WorkPred(); - - private interface LinkFinder<C extends Link> - { - C test(Link link); - C create(Session session); - } - - private class SenderFinder implements LinkFinder<Sender> - { - private final String _path; - - SenderFinder(String path) - { - _path = path == null ? "" : path; - } - - public Sender test(Link link) - { - if (link instanceof Sender && matchTarget((Target) link.getTarget(), _path)) - { - return (Sender) link; - } - else - { - return null; - } - } - - public Sender create(Session session) - { - Sender sender = session.sender(_path); - Target target = new Target(); - target.setAddress(_path); - sender.setTarget(target); - // the C implemenation does this: - Source source = new Source(); - source.setAddress(_path); - sender.setSource(source); - if (getOutgoingWindow() > 0) - { - // use explicit settlement via dispositions (not pre-settled) - sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); - sender.setReceiverSettleMode(ReceiverSettleMode.SECOND); // desired - } - return sender; - } - } - - private class ReceiverFinder implements LinkFinder<Receiver> - { - private final String _path; - - ReceiverFinder(String path) - { - _path = path == null ? "" : path; - } - - public Receiver test(Link link) - { - if (link instanceof Receiver && matchSource((Source) link.getSource(), _path)) - { - return (Receiver) link; - } - else - { - return null; - } - } - - public Receiver create(Session session) - { - Receiver receiver = session.receiver(_path); - Source source = new Source(); - source.setAddress(_path); - receiver.setSource(source); - // the C implemenation does this: - Target target = new Target(); - target.setAddress(_path); - receiver.setTarget(target); - if (getIncomingWindow() > 0) - { - // use explicit settlement via dispositions (not pre-settled) - receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); // desired - receiver.setReceiverSettleMode(ReceiverSettleMode.SECOND); - } - return receiver; - } - } - - private <C extends Link> C getLink(Address address, LinkFinder<C> finder) - { - Connection connection = lookup(address); - if (connection == null) - { - String host = address.getHost(); - int port = Integer.valueOf(address.getImpliedPort()); - Connector<?> connector = _driver.createConnector(host, port, null); - _logger.log(Level.FINE, "Connecting to " + host + ":" + port); - connection = Proton.connection(); - connection.setContainer(_name); - connection.setHostname(host); - connection.setContext(new ConnectionContext(address, connector)); - connector.setConnection(connection); - Sasl sasl = connector.sasl(); - if (sasl != null) - { - sasl.client(); - sasl.setMechanisms(new String[]{"ANONYMOUS"}); - } - if ("amqps".equalsIgnoreCase(address.getScheme())) { - Transport transport = connector.getTransport(); - SslDomain domain = makeDomain(address, SslDomain.Mode.CLIENT); - if (_trustedDb != null) { - domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER); - //domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER_NAME); - } else { - domain.setPeerAuthentication(SslDomain.VerifyMode.ANONYMOUS_PEER); - } - Ssl ssl = transport.ssl(domain); - //ssl.setPeerHostname(host); - } - connection.open(); - } - - for (Link link : new Links(connection, ACTIVE, ANY)) - { - C result = finder.test(link); - if (result != null) return result; - } - Session session = connection.session(); - session.open(); - C link = finder.create(session); - linkAdded(link); - link.open(); - return link; - } - - private static class Links implements Iterable<Link> - { - private final Connection _connection; - private final EnumSet<EndpointState> _local; - private final EnumSet<EndpointState> _remote; - - Links(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote) - { - _connection = connection; - _local = local; - _remote = remote; - } - - public java.util.Iterator<Link> iterator() - { - return new LinkIterator(_connection, _local, _remote); - } - } - - private static class LinkIterator implements java.util.Iterator<Link> - { - private final EnumSet<EndpointState> _local; - private final EnumSet<EndpointState> _remote; - private Link _next; - - LinkIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote) - { - _local = local; - _remote = remote; - _next = connection.linkHead(_local, _remote); - } - - public boolean hasNext() - { - return _next != null; - } - - public Link next() - { - try - { - return _next; - } - finally - { - _next = _next.next(_local, _remote); - } - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - } - - private static class Sessions implements Iterable<Session> - { - private final Connection _connection; - private final EnumSet<EndpointState> _local; - private final EnumSet<EndpointState> _remote; - - Sessions(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote) - { - _connection = connection; - _local = local; - _remote = remote; - } - - public java.util.Iterator<Session> iterator() - { - return new SessionIterator(_connection, _local, _remote); - } - } - - private static class SessionIterator implements java.util.Iterator<Session> - { - private final EnumSet<EndpointState> _local; - private final EnumSet<EndpointState> _remote; - private Session _next; - - SessionIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote) - { - _local = local; - _remote = remote; - _next = connection.sessionHead(_local, _remote); - } - - public boolean hasNext() - { - return _next != null; - } - - public Session next() - { - try - { - return _next; - } - finally - { - _next = _next.next(_local, _remote); - } - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - } - - private void adjustReplyTo(Message m) - { - String original = m.getReplyTo(); - if (original != null) { - if (original.startsWith("~/")) - { - m.setReplyTo("amqp://" + _name + "/" + original.substring(2)); - } - else if (original.equals("~")) - { - m.setReplyTo("amqp://" + _name); - } - } - } - - private static boolean matchTarget(Target target, String path) - { - if (target == null) return path.isEmpty(); - else return path.equals(target.getAddress()); - } - - private static boolean matchSource(Source source, String path) - { - if (source == null) return path.isEmpty(); - else return path.equals(source.getAddress()); - } - - @Override - public String toString() - { - StringBuilder builder = new StringBuilder(); - builder.append("MessengerImpl [_name=").append(_name).append("]"); - return builder.toString(); - } - - // compute the maximum amount of credit each receiving link is - // entitled to. The actual credit given to the link depends on - // what amount of credit is actually available. - private int perLinkCredit() - { - if (_receivers == 0) return 0; - int total = _credit + _distributed; - return Math.max(total/_receivers, 1); - } - - // a new link has been created, account for it. - private void linkAdded(Link link) - { - if (link instanceof Receiver) - { - _receivers++; - _blocked.add((Receiver)link); - link.setContext(Boolean.TRUE); - } - } - - // a link is being removed, account for it. - private void linkRemoved(Link _link) - { - if (_link instanceof Receiver && (Boolean) _link.getContext()) - { - _link.setContext(Boolean.FALSE); - Receiver link = (Receiver)_link; - assert _receivers > 0; - _receivers--; - if (link.getDrain()) - { - link.setDrain(false); - assert _draining > 0; - _draining--; - } - if (_blocked.contains(link)) - _blocked.remove(link); - else if (_credited.contains(link)) - _credited.remove(link); - else - assert(false); - } - } - - private static class ConnectionContext - { - private Address _address; - private Connector _connector; - - public ConnectionContext(Address address, Connector connector) - { - _address = address; - _connector = connector; - } - - public Address getAddress() - { - return _address; - } - - public boolean matches(Address address) - { - String host = address.getHost(); - String port = address.getImpliedPort(); - Connection conn = _connector.getConnection(); - return host.equals(conn.getRemoteContainer()) || - (_address.getHost().equals(host) && _address.getImpliedPort().equals(port)); - } - - public Connector getConnector() - { - return _connector; - } - } - - private SslDomain makeDomain(Address address, SslDomain.Mode mode) - { - SslDomain domain = Proton.sslDomain(); - domain.init(mode); - if (_certificate != null) { - domain.setCredentials(_certificate, _privateKey, _password); - } - if (_trustedDb != null) { - domain.setTrustedCaDb(_trustedDb); - } - - if ("amqps".equalsIgnoreCase(address.getScheme())) { - domain.allowUnsecuredClient(false); - } else { - domain.allowUnsecuredClient(true); - } - - return domain; - } - - - private class ListenerContext - { - private Address _address; - private SslDomain _domain; - - public ListenerContext(Address address) - { - _address = address; - _domain = makeDomain(address, SslDomain.Mode.SERVER); - } - - public SslDomain getDomain() - { - return _domain; - } - - public Address getAddress() - { - return _address; - } - - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java deleted file mode 100644 index b60e8ed..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.qpid.proton.messenger.impl; - -import java.util.List; -import java.util.LinkedList; -import java.util.Map; -import java.util.HashMap; -import java.util.Collection; -import java.util.Iterator; - -import org.apache.qpid.proton.messenger.Status; -import org.apache.qpid.proton.messenger.Messenger; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.amqp.messaging.Accepted; -import org.apache.qpid.proton.amqp.messaging.Rejected; - -/** - * @deprecated Messenger will be removed from upcoming proton-j releases. - */ -class Store -{ - private static final Accepted ACCEPTED = Accepted.getInstance(); - private static final Rejected REJECTED = new Rejected(); - - private LinkedList<StoreEntry> _store = new LinkedList<StoreEntry>(); - private HashMap<String, LinkedList<StoreEntry>> _stream = new HashMap<String, LinkedList<StoreEntry>>(); - - // for incoming/outgoing window tracking - int _window; - int _lwm; - int _hwm; - private HashMap<Integer, StoreEntry> _tracked = new HashMap<Integer, StoreEntry>(); - - Store() - { - } - - private boolean isTracking( Integer id ) - { - return id != null && (id.intValue() - _lwm >= 0) && (_hwm - id.intValue() > 0); - } - - int size() - { - return _store.size(); - } - - int getWindow() - { - return _window; - } - - void setWindow(int window) - { - _window = window; - } - - StoreEntry put(String address) - { - if (address == null) address = ""; - StoreEntry entry = new StoreEntry(this, address); - _store.add( entry ); - LinkedList<StoreEntry> list = _stream.get( address ); - if (list != null) { - list.add( entry ); - } else { - list = new LinkedList<StoreEntry>(); - list.add( entry ); - _stream.put( address, list ); - } - entry.stored(); - return entry; - } - - StoreEntry get(String address) - { - if (address != null) { - LinkedList<StoreEntry> list = _stream.get( address ); - if (list != null) return list.peekFirst(); - } else { - return _store.peekFirst(); - } - return null; - } - - StoreEntry getEntry(int id) - { - return _tracked.get(id); - } - - Iterator<StoreEntry> trackedEntries() - { - return _tracked.values().iterator(); - } - - void freeEntry(StoreEntry entry) - { - if (entry.isStored()) { - _store.remove( entry ); - LinkedList<StoreEntry> list = _stream.get( entry.getAddress() ); - if (list != null) list.remove( entry ); - entry.notStored(); - } - // note well: may still be in _tracked map if still in window! - } - - public int trackEntry(StoreEntry entry) - { - assert( entry.getStore() == this ); - entry.setId(_hwm++); - _tracked.put(entry.getId(), entry); - slideWindow(); - return entry.getId(); - } - - private void slideWindow() - { - if (_window >= 0) - { - while (_hwm - _lwm > _window) - { - StoreEntry old = getEntry(_lwm); - if (old != null) - { - _tracked.remove( old.getId() ); - Delivery d = old.getDelivery(); - if (d != null) { - if (d.getLocalState() == null) - d.disposition(ACCEPTED); - d.settle(); - } - } - _lwm++; - } - } - } - - int update(int id, Status status, int flags, boolean settle, boolean match ) - { - if (!isTracking(id)) return 0; - - int start = (Messenger.CUMULATIVE & flags) != 0 ? _lwm : id; - for (int i = start; (id - i) >= 0; i++) - { - StoreEntry e = getEntry(i); - if (e != null) - { - Delivery d = e.getDelivery(); - if (d != null) - { - if (d.getLocalState() == null) - { - if (match) - { - d.disposition(d.getRemoteState()); - } - else - { - switch (status) - { - case ACCEPTED: - d.disposition(ACCEPTED); - break; - case REJECTED: - d.disposition(REJECTED); - break; - default: - break; - } - } - e.updated(); - } - } - if (settle) - { - if (d != null) - { - d.settle(); - } - _tracked.remove(e.getId()); - } - } - } - - while (_hwm - _lwm > 0 && !_tracked.containsKey(_lwm)) - { - _lwm++; - } - - return 0; - } -} - - http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java deleted file mode 100644 index 1687b94..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.qpid.proton.messenger.impl; - -import org.apache.qpid.proton.messenger.Tracker; -import org.apache.qpid.proton.messenger.Status; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.amqp.messaging.Accepted; -import org.apache.qpid.proton.amqp.messaging.Modified; -import org.apache.qpid.proton.amqp.messaging.Rejected; -import org.apache.qpid.proton.amqp.messaging.Released; -import org.apache.qpid.proton.amqp.messaging.Received; -import org.apache.qpid.proton.amqp.transport.DeliveryState; - -/** - * @deprecated Messenger will be removed from upcoming proton-j releases. - */ -class StoreEntry -{ - private Store _store; - private Integer _id; - private String _address; - private byte[] _encodedMsg; - private int _encodedLength; - private Delivery _delivery; - private Status _status = Status.UNKNOWN; - private Object _context; - private boolean _inStore = false; - - public StoreEntry(Store store, String address) - { - _store = store; - _address = address; - } - - public Store getStore() - { - return _store; - } - - public boolean isStored() - { - return _inStore; - } - - public void stored() - { - _inStore = true; - } - - public void notStored() - { - _inStore = false; - } - - public String getAddress() - { - return _address; - } - - public byte[] getEncodedMsg() - { - return _encodedMsg; - } - - public int getEncodedLength() - { - return _encodedLength; - } - - public void setEncodedMsg( byte[] encodedMsg, int length ) - { - _encodedMsg = encodedMsg; - _encodedLength = length; - } - - public void setId(int id) - { - _id = new Integer(id); - } - - public Integer getId() - { - return _id; - } - - public void setDelivery( Delivery d ) - { - if (_delivery != null) - { - _delivery.setContext(null); - } - _delivery = d; - if (_delivery != null) - { - _delivery.setContext(this); - } - updated(); - } - - public Delivery getDelivery() - { - return _delivery; - } - - public Status getStatus() - { - return _status; - } - - public void setStatus(Status status) - { - _status = status; - } - - private static Status _disp2status(DeliveryState disp) - { - if (disp == null) return Status.PENDING; - - if (disp instanceof Received) - return Status.PENDING; - if (disp instanceof Accepted) - return Status.ACCEPTED; - if (disp instanceof Rejected) - return Status.REJECTED; - if (disp instanceof Released) - return Status.RELEASED; - if (disp instanceof Modified) - return Status.MODIFIED; - assert(false); - return null; - } - - public void updated() - { - if (_delivery != null) - { - if (_delivery.getRemoteState() != null) - { - _status = _disp2status(_delivery.getRemoteState()); - } - else if (_delivery.remotelySettled()) - { - DeliveryState disp = _delivery.getLocalState(); - if (disp == null) { - _status = Status.SETTLED; - } else { - _status = _disp2status(_delivery.getLocalState()); - } - } - else - { - _status = Status.PENDING; - } - } - } - - public void setContext(Object context) - { - _context = context; - } - - public Object getContext() - { - return _context; - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java deleted file mode 100644 index 2d8b584..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.qpid.proton.messenger.impl; - -import org.apache.qpid.proton.messenger.Tracker; - -/** - * @deprecated Messenger will be removed from upcoming proton-j releases. - */ -class TrackerImpl implements Tracker -{ - public enum Type { - OUTGOING, - INCOMING - } - - private Type _type; - private int _sequence; - - TrackerImpl(Type type, int sequence) - { - _type = type; - _sequence = sequence; - } - - boolean isOutgoing() - { - return _type == Type.OUTGOING; - } - - int getSequence() - { - return _sequence; - } - - public String toString() - { - return (isOutgoing() ? "O:" : "I:") + Integer.toString(_sequence); - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java deleted file mode 100644 index c3a08ea..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.proton.messenger.impl; - -import java.util.ArrayList; -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - - -/** - * Transform - * - * @deprecated Messenger will be removed from upcoming proton-j releases. - */ -class Transform -{ - - private static class Rule { - - String _pattern; - String _substitution; - - Pattern _compiled; - StringBuilder _sb = new StringBuilder(); - boolean _matched = false; - String _result = null; - - Rule(String pattern, String substitution) - { - _pattern = pattern; - _substitution = substitution; - _compiled = Pattern.compile(_pattern.replace("*", "(.*)").replace("%", "([^/]*)")); - } - - boolean apply(String src) { - _matched = false; - _result = null; - Matcher m = _compiled.matcher(src); - if (m.matches()) { - _matched = true; - if (_substitution != null) { - _sb.setLength(0); - int limit = _substitution.length(); - int idx = 0; - while (idx < limit) { - char c = _substitution.charAt(idx); - switch (c) { - case '$': - idx++; - if (idx < limit) { - c = _substitution.charAt(idx); - } else { - throw new IllegalStateException("substition index truncated"); - } - - if (c == '$') { - _sb.append(c); - idx++; - } else { - int num = 0; - while (Character.isDigit(c)) { - num *= 10; - num += c - '0'; - idx++; - c = idx < limit ? _substitution.charAt(idx) : '\0'; - } - if (num > 0) { - _sb.append(m.group(num)); - } else { - throw new IllegalStateException - ("bad substitution index at character[" + - idx + "]: " + _substitution); - } - } - break; - default: - _sb.append(c); - idx++; - break; - } - } - _result = _sb.toString(); - } - } - - return _matched; - } - - boolean matched() { - return _matched; - } - - String result() { - return _result; - } - - } - - private List<Rule> _rules = new ArrayList<Rule>(); - private Rule _matched = null; - - public void rule(String pattern, String substitution) - { - _rules.add(new Rule(pattern, substitution)); - } - - public boolean apply(String src) - { - _matched = null; - - for (Rule rule: _rules) { - if (rule.apply(src)) { - _matched = rule; - break; - } - } - - return _matched != null; - } - - public boolean matched() - { - return _matched != null; - } - - public String result() - { - return _matched != null ? _matched.result() : null; - } - -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java index c5abbd8..b9fd1de 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java @@ -40,7 +40,6 @@ import org.apache.qpid.proton.reactor.Reactor; import org.apache.qpid.proton.reactor.impl.ReactorImpl; import org.apache.qpid.proton.reactor.Selectable; import org.apache.qpid.proton.reactor.Selectable.Callback; -import org.apache.qpid.proton.messenger.impl.Address; @SuppressWarnings("deprecation") public class AcceptorImpl implements Acceptor { http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Address.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Address.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Address.java new file mode 100644 index 0000000..619912f --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Address.java @@ -0,0 +1,219 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.proton.reactor.impl; + + +/** + * Address + * + */ +public class Address +{ + + private String _address; + private boolean _passive; + private String _scheme; + private String _user; + private String _pass; + private String _host; + private String _port; + private String _name; + + public void clear() + { + _passive = false; + _scheme = null; + _user = null; + _pass = null; + _host = null; + _port = null; + _name = null; + } + + /** + * @deprecated Messenger will be removed from upcoming proton-j releases. + */ + public Address() + { + clear(); + } + + /** + * @deprecated Messenger will be removed from upcoming proton-j releases. + */ + public Address(String address) + { + clear(); + int start = 0; + int schemeEnd = address.indexOf("://", start); + if (schemeEnd >= 0) { + _scheme = address.substring(start, schemeEnd); + start = schemeEnd + 3; + } + + String uphp; + int slash = address.indexOf("/", start); + if (slash >= 0) { + uphp = address.substring(start, slash); + _name = address.substring(slash + 1); + } else { + uphp = address.substring(start); + } + + String hp; + int at = uphp.indexOf('@'); + if (at >= 0) { + String up = uphp.substring(0, at); + hp = uphp.substring(at + 1); + + int colon = up.indexOf(':'); + if (colon >= 0) { + _user = up.substring(0, colon); + _pass = up.substring(colon + 1); + } else { + _user = up; + } + } else { + hp = uphp; + } + + if (hp.startsWith("[")) { + int close = hp.indexOf(']'); + if (close >= 0) { + _host = hp.substring(1, close); + if (hp.substring(close + 1).startsWith(":")) { + _port = hp.substring(close + 2); + } + } + } + + if (_host == null) { + int colon = hp.indexOf(':'); + if (colon >= 0) { + _host = hp.substring(0, colon); + _port = hp.substring(colon + 1); + } else { + _host = hp; + } + } + + if (_host.startsWith("~")) { + _host = _host.substring(1); + _passive = true; + } + } + + public String toString() + { + String str = new String(); + if (_scheme != null) str += _scheme + "://"; + if (_user != null) str += _user; + if (_pass != null) str += ":" + _pass; + if (_user != null || _pass != null) str += "@"; + if (_host != null) { + if (_host.contains(":")) str += "[" + _host + "]"; + else str += _host; + } + if (_port != null) str += ":" + _port; + if (_name != null) str += "/" + _name; + return str; + } + + public boolean isPassive() + { + return _passive; + } + + public String getScheme() + { + return _scheme; + } + + public String getUser() + { + return _user; + } + + public String getPass() + { + return _pass; + } + + public String getHost() + { + return _host; + } + + public String getPort() + { + return _port; + } + + public String getImpliedPort() + { + if (_port == null) { + return getDefaultPort(); + } else { + return getPort(); + } + } + + public String getDefaultPort() + { + if ("amqps".equals(_scheme)) return "5671"; + else return "5672"; + } + + public String getName() + { + return _name; + } + + public void setScheme(String scheme) + { + _scheme= scheme; + } + + public void setUser(String user) + { + _user= user; + } + + public void setPass(String pass) + { + _pass= pass; + } + + public void setHost(String host) + { + _host= host; + } + + public void setPort(String port) + { + _port= port; + } + + public void setName(String name) + { + _name= name; + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java index 2dd7e1a..1282083 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java @@ -45,7 +45,6 @@ import org.apache.qpid.proton.reactor.Selectable.Callback; import org.apache.qpid.proton.reactor.Selector; import org.apache.qpid.proton.reactor.Acceptor; import org.apache.qpid.proton.reactor.impl.AcceptorImpl; -import org.apache.qpid.proton.messenger.impl.Address; @SuppressWarnings("deprecation") public class IOHandler extends BaseHandler { http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java index 30c8df9..9d38b85 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java @@ -51,7 +51,6 @@ import org.apache.qpid.proton.reactor.Selectable; import org.apache.qpid.proton.reactor.Selectable.Callback; import org.apache.qpid.proton.reactor.Selector; import org.apache.qpid.proton.reactor.Task; -import org.apache.qpid.proton.messenger.impl.Address; @SuppressWarnings("deprecation") public class ReactorImpl implements Reactor, Extendable { http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/test/java/org/apache/qpid/proton/messenger/impl/AddressTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/messenger/impl/AddressTest.java b/proton-j/src/test/java/org/apache/qpid/proton/messenger/impl/AddressTest.java deleted file mode 100644 index 77154b6..0000000 --- a/proton-j/src/test/java/org/apache/qpid/proton/messenger/impl/AddressTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.qpid.proton.messenger.impl; - -import static org.junit.Assert.*; - -import org.junit.Test; - -public class AddressTest { - - @SuppressWarnings("deprecation") - private void testParse(String url, String scheme, String user, String pass, String host, String port, String name) - { - Address address = new Address(url); - assertEquals(scheme, address.getScheme()); - assertEquals(user, address.getUser()); - assertEquals(pass, address.getPass()); - assertEquals(host, address.getHost()); - assertEquals(port, address.getPort()); - assertEquals(url, address.toString()); - } - - @Test - public void addressTests() - { - testParse("host", null, null, null, "host", null, null); - testParse("host:423", null, null, null, "host", "423", null); - testParse("user@host", null, "user", null, "host", null, null); - testParse("user:1243^&^:pw@host:423", null, "user", "1243^&^:pw", "host", "423", null); - testParse("user:1243^&^:pw@host:423/Foo.bar:90087", null, "user", "1243^&^:pw", "host", "423", "Foo.bar:90087"); - testParse("user:1243^&^:pw@host:423/Foo.bar:90087@somewhere", null, "user", "1243^&^:pw", "host", "423", "Foo.bar:90087@somewhere"); - testParse("[::1]", null, null, null, "::1", null, null); - testParse("[::1]:amqp", null, null, null, "::1", "amqp", null); - testParse("user@[::1]", null, "user", null, "::1", null, null); - testParse("user@[::1]:amqp", null, "user", null, "::1", "amqp", null); - testParse("user:1243^&^:pw@[::1]:amqp", null, "user", "1243^&^:pw", "::1", "amqp", null); - testParse("user:1243^&^:pw@[::1]:amqp/Foo.bar:90087", null, "user", "1243^&^:pw", "::1", "amqp", "Foo.bar:90087"); - testParse("user:1243^&^:pw@[::1:amqp/Foo.bar:90087", null, "user", "1243^&^:pw", "[", ":1:amqp", "Foo.bar:90087"); - testParse("user:1243^&^:pw@::1]:amqp/Foo.bar:90087", null, "user", "1243^&^:pw", "", ":1]:amqp", "Foo.bar:90087"); - testParse("amqp://user@[::1]", "amqp", "user", null, "::1", null, null); - testParse("amqp://user@[::1]:amqp", "amqp", "user", null, "::1", "amqp", null); - testParse("amqp://user@[1234:52:0:1260:f2de:f1ff:fe59:8f87]:amqp", "amqp", "user", null, "1234:52:0:1260:f2de:f1ff:fe59:8f87", "amqp", null); - testParse("amqp://user:1243^&^:pw@[::1]:amqp", "amqp", "user", "1243^&^:pw", "::1", "amqp", null); - testParse("amqp://user:1243^&^:pw@[::1]:amqp/Foo.bar:90087", "amqp", "user", "1243^&^:pw", "::1", "amqp", "Foo.bar:90087"); - testParse("amqp://host", "amqp", null, null, "host", null, null); - testParse("amqp://user@host", "amqp", "user", null, "host", null, null); - testParse("amqp://user@host/path:%", "amqp", "user", null, "host", null, "path:%"); - testParse("amqp://user@host:5674/path:%", "amqp", "user", null, "host", "5674", "path:%"); - testParse("amqp://user@host/path:%", "amqp", "user", null, "host", null, "path:%"); - testParse("amqp://bigbird@host/queue@host", "amqp", "bigbird", null, "host", null, "queue@host"); - testParse("amqp://host/queue@host", "amqp", null, null, "host", null, "queue@host"); - testParse("amqp://host:9765/queue@host", "amqp", null, null, "host", "9765", "queue@host"); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
