Author: rgodfrey
Date: Fri Jun 5 13:54:57 2015
New Revision: 1683757
URL: http://svn.apache.org/r1683757
Log:
QPID-6573 : Add broker connection close guard to protect against a client that
does not respond to the connection close command in a timely manner
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
(with props)
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/AggregateTicker.java
(with props)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
Fri Jun 5 13:54:57 2015
@@ -24,6 +24,7 @@ import org.apache.qpid.server.model.Brok
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
public interface ProtocolEngineCreator extends Pluggable
@@ -34,6 +35,6 @@ public interface ProtocolEngineCreator e
NetworkConnection network,
AmqpPort<?> port,
Transport transport,
- long id);
+ long id, final AggregateTicker aggregateTicker);
}
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java?rev=1683757&view=auto
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
(added)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
Fri Jun 5 13:54:57 2015
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.Ticker;
+
+public class ConnectionClosingTicker implements Ticker
+{
+ private final long _timeoutTime;
+ private final NetworkConnection _network;
+
+ public ConnectionClosingTicker(final long timeoutTime, final NetworkConnection network)
+ {
+ _timeoutTime = timeoutTime;
+ _network = network;
+ }
+
+ @Override
+ public int getTimeToNextTick(final long currentTime)
+ {
+ return (int) (_timeoutTime - currentTime);
+ }
+
+ @Override
+ public int tick(final long currentTime)
+ {
+ int nextTick = getTimeToNextTick(currentTime);
+ if(nextTick <= 0)
+ {
+ _network.close();
+ }
+ return nextTick;
+ }
+}
Propchange:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
Fri Jun 5 13:54:57 2015
@@ -24,7 +24,6 @@ package org.apache.qpid.server.protocol;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.security.Principal;
import java.security.cert.Certificate;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -44,6 +43,7 @@ import org.apache.qpid.server.plugin.Pro
import org.apache.qpid.server.security.ManagedPeerCertificateTrustStore;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
public class MultiVersionProtocolEngine implements ServerProtocolEngine
@@ -65,6 +65,7 @@ public class MultiVersionProtocolEngine
private volatile ServerProtocolEngine _delegate = new
SelfDelegateProtocolEngine();
private final AtomicReference<Action<ServerProtocolEngine>> _workListener
= new AtomicReference<>();
+ private final AggregateTicker _aggregateTicker = new AggregateTicker();
public MultiVersionProtocolEngine(final Broker<?> broker,
final Set<Protocol> supported,
@@ -250,6 +251,12 @@ public class MultiVersionProtocolEngine
_delegate.clearWork();
}
+ @Override
+ public AggregateTicker getAggregateTicker()
+ {
+ return _aggregateTicker;
+ }
+
private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
{
@@ -384,6 +391,12 @@ public class MultiVersionProtocolEngine
public void setTransportBlockedForWriting(final boolean blocked)
{
}
+
+ @Override
+ public AggregateTicker getAggregateTicker()
+ {
+ return _aggregateTicker;
+ }
}
private class SelfDelegateProtocolEngine implements ServerProtocolEngine
@@ -448,6 +461,12 @@ public class MultiVersionProtocolEngine
}
@Override
+ public AggregateTicker getAggregateTicker()
+ {
+ return _aggregateTicker;
+ }
+
+ @Override
public void clearWork()
{
_hasWork.set(false);
@@ -499,8 +518,8 @@ public class MultiVersionProtocolEngine
if(equal)
{
newDelegate =
_creators[i].newProtocolEngine(_broker,
-
_network, _port, _transport, _id
- );
+
_network, _port, _transport, _id,
+
_aggregateTicker);
}
}
@@ -566,6 +585,8 @@ public class MultiVersionProtocolEngine
}
+
+
}
public long getConnectionId()
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
Fri Jun 5 13:54:57 2015
@@ -24,6 +24,7 @@ import javax.security.auth.Subject;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.network.AggregateTicker;
public interface ServerProtocolEngine extends ProtocolEngine
{
@@ -51,4 +52,6 @@ public interface ServerProtocolEngine ex
void notifyWork();
void setWorkListener(Action<ServerProtocolEngine> listener);
+
+ AggregateTicker getAggregateTicker();
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
Fri Jun 5 13:54:57 2015
@@ -83,8 +83,7 @@ class QueueConsumerImpl
}
private final ConsumerTarget _target;
- private final StateChangeListener<ConsumerTarget, ConsumerTarget.State>
- _listener;
+ private final StateChangeListener<ConsumerTarget, ConsumerTarget.State>
_listener;
private volatile QueueContext _queueContext;
private StateChangeListener<? super QueueConsumerImpl, State>
_stateListener = new StateChangeListener<QueueConsumerImpl, State>()
{
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
Fri Jun 5 13:54:57 2015
@@ -59,7 +59,6 @@ public class NonBlockingConnection imple
private static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
private final SocketChannel _socketChannel;
- private final Ticker _ticker;
private final Object _peerPrincipalLock = new Object();
private final SelectorThread _selector;
private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new
ConcurrentLinkedQueue<>();
@@ -103,7 +102,6 @@ public class NonBlockingConnection imple
final SelectorThread selectorThread)
{
_socketChannel = socketChannel;
- _ticker = ticker;
_selector = selectorThread;
_protocolEngine = delegate;
@@ -152,7 +150,7 @@ public class NonBlockingConnection imple
Ticker getTicker()
{
- return _ticker;
+ return _protocolEngine.getAggregateTicker();
}
SocketChannel getSocketChannel()
@@ -282,10 +280,10 @@ public class NonBlockingConnection imple
_workDone = false;
long currentTime = System.currentTimeMillis();
- int tick = _ticker.getTimeToNextTick(currentTime);
+ int tick = getTicker().getTimeToNextTick(currentTime);
if (tick <= 0)
{
- _ticker.tick(currentTime);
+ getTicker().tick(currentTime);
}
_protocolEngine.setMessageAssignmentSuspended(true);
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
Fri Jun 5 13:54:57 2015
@@ -39,6 +39,7 @@ import org.apache.qpid.server.protocol.M
import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.TransportEncryption;
import org.apache.qpid.transport.network.io.AbstractNetworkTransport;
import org.apache.qpid.transport.network.io.IdleTimeoutTicker;
@@ -161,13 +162,16 @@ public class NonBlockingNetworkTransport
socketChannel.configureBlocking(false);
- final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine,
_timeout);
+ AggregateTicker aggregateTicker = engine.getAggregateTicker();
+
+ final IdleTimeoutTicker idleTimeoutTicker = new
IdleTimeoutTicker(engine, _timeout);
+ aggregateTicker.addTicker(idleTimeoutTicker);
NonBlockingConnection connection =
new NonBlockingConnection(socketChannel,
engine,
receiveBufferSize,
- ticker,
+ idleTimeoutTicker,
_encryptionSet,
_sslContext,
_config.wantClientAuth(),
@@ -188,7 +192,7 @@ public class NonBlockingNetworkTransport
engine.setNetworkConnection(connection,
connection.getSender());
connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
- ticker.setConnection(connection);
+ idleTimeoutTicker.setConnection(connection);
connection.start();
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
Fri Jun 5 13:54:57 2015
@@ -32,6 +32,7 @@ import org.apache.qpid.server.plugin.Plu
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
@PluggableService
@@ -69,7 +70,7 @@ public class ProtocolEngineCreator_0_10
NetworkConnection network,
AmqpPort<?> port,
Transport transport,
- long id)
+ long id, final
AggregateTicker aggregateTicker)
{
String fqdn = null;
SocketAddress address = network.getLocalAddress();
@@ -86,7 +87,7 @@ public class ProtocolEngineCreator_0_10
conn.setRemoteAddress(network.getRemoteAddress());
conn.setLocalAddress(network.getLocalAddress());
- ProtocolEngine_0_10 protocolEngine = new ProtocolEngine_0_10(conn,
network);
+ ProtocolEngine_0_10 protocolEngine = new ProtocolEngine_0_10(conn,
network, aggregateTicker);
conn.setProtocolEngine(protocolEngine);
return protocolEngine;
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
Fri Jun 5 13:54:57 2015
@@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.Constant;
+import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -50,6 +51,7 @@ public class ProtocolEngine_0_10 extend
{
public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
private static final Logger _logger =
LoggerFactory.getLogger(ProtocolEngine_0_10.class);
+ private final AggregateTicker _aggregateTicker;
private NetworkConnection _network;
@@ -69,10 +71,11 @@ public class ProtocolEngine_0_10 extend
public ProtocolEngine_0_10(ServerConnection conn,
- NetworkConnection network)
+ NetworkConnection network, final
AggregateTicker aggregateTicker)
{
super(new ServerAssembler(conn));
_connection = conn;
+ _aggregateTicker = aggregateTicker;
if(network != null)
{
@@ -81,6 +84,12 @@ public class ProtocolEngine_0_10 extend
}
@Override
+ public AggregateTicker getAggregateTicker()
+ {
+ return _aggregateTicker;
+ }
+
+ @Override
public boolean isMessageAssignmentSuspended()
{
Thread lock = _messageAssignmentSuspended.get();
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
Fri Jun 5 13:54:57 2015
@@ -40,6 +40,7 @@ import java.util.concurrent.atomic.Atomi
import javax.security.auth.Subject;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.EventLogger;
@@ -64,6 +65,7 @@ import org.apache.qpid.transport.Connect
import org.apache.qpid.transport.ExecutionErrorCode;
import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.ProtocolEvent;
import org.apache.qpid.transport.Session;
@@ -71,6 +73,7 @@ public class ServerConnection extends Co
LogSubject,
AuthorizationHolder
{
+ public static final long CLOSE_OK_TIMEOUT = 10000l;
private final Broker<?> _broker;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
@@ -175,7 +178,10 @@ public class ServerConnection extends Co
_virtualHost.getConnectionRegistry().deregisterConnection(this);
}
}
-
+ if(state == State.CLOSING)
+ {
+ getProtocolEngine().getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT, getNetworkConnection()));
+ }
if (state == State.CLOSED)
{
logClosed();
@@ -416,6 +422,14 @@ public class ServerConnection extends Co
});
}
+ @Override
+ protected void sendConnectionClose(final ConnectionCloseCode replyCode,
+ final String replyText,
+ final Option... _options)
+ {
+ super.sendConnectionClose(replyCode, replyText, _options);
+ }
+
protected void performDeleteTasks()
{
for(Action<? super ServerConnection> task : _connectionCloseTaskList)
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
Fri Jun 5 13:54:57 2015
@@ -198,8 +198,8 @@ public class ServerConnectionDelegate ex
final String redirectHost = vhost.getRedirectHost(port);
if(redirectHost == null)
{
- sconn.invoke(new
ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED,
- "Virtual host '" +
vhostName + "' is not active"));
+
sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED,
+ "Virtual host '" +
vhostName + "' is not active");
}
else
{
@@ -214,14 +214,14 @@ public class ServerConnectionDelegate ex
if(!vhost.authoriseCreateConnection(sconn))
{
sconn.setState(Connection.State.CLOSING);
- sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Connection not authorized"));
+ sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Connection not authorized");
return;
}
}
catch (AccessControlException e)
{
sconn.setState(Connection.State.CLOSING);
- sconn.invoke(new
ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage()));
+
sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED,
e.getMessage());
return;
}
@@ -231,8 +231,8 @@ public class ServerConnectionDelegate ex
else
{
sconn.setState(Connection.State.CLOSING);
- sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH,
- "Unknown virtualhost '" +
vhostName + "'"));
+ sconn.sendConnectionClose(ConnectionCloseCode.INVALID_PATH,
+ "Unknown virtualhost '" +
vhostName + "'");
}
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
Fri Jun 5 13:54:57 2015
@@ -59,6 +59,7 @@ import org.apache.qpid.common.ServerProp
import org.apache.qpid.framing.*;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.ConnectionPrincipal;
@@ -91,6 +92,7 @@ import org.apache.qpid.transport.ByteBuf
import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
public class AMQProtocolEngine implements ServerProtocolEngine,
@@ -99,6 +101,7 @@ public class AMQProtocolEngine implement
{
+ private final AggregateTicker _aggregateTicker;
enum ConnectionState
{
@@ -118,6 +121,9 @@ public class AMQProtocolEngine implement
private static final String BROKER_DEBUG_BINARY_DATA_LENGTH =
"broker.debug.binaryDataLength";
private static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80;
private static final long AWAIT_CLOSED_TIMEOUT = 60000;
+
+ private static final long CLOSE_OK_TIMEOUT = 10000l;
+
private final AmqpPort<?> _port;
private final long _creationTime;
private final AtomicBoolean _stateChanged = new AtomicBoolean();
@@ -206,48 +212,16 @@ public class AMQProtocolEngine implement
private final AtomicReference<Thread> _messageAssignmentSuspended = new
AtomicReference<>();
-
- @Override
- public boolean isMessageAssignmentSuspended()
- {
- Thread lock = _messageAssignmentSuspended.get();
- return lock != null && _messageAssignmentSuspended.get() !=
Thread.currentThread();
- }
-
- @Override
- public void setMessageAssignmentSuspended(final boolean
messageAssignmentSuspended)
- {
- _messageAssignmentSuspended.set(messageAssignmentSuspended ?
Thread.currentThread() : null);
- for(AMQSessionModel<?,?> session : getSessionModels())
- {
- for (Consumer<?> consumer : session.getConsumers())
- {
- ConsumerImpl consumerImpl = (ConsumerImpl) consumer;
- if (!messageAssignmentSuspended)
- {
- consumerImpl.getTarget().notifyCurrentState();
- }
- else
- {
- // ensure that by the time the method returns, no consumer can be in the process of
- // delivering a message.
- consumerImpl.getSendLock();
- consumerImpl.releaseSendLock();
- }
- }
- }
- }
-
-
public AMQProtocolEngine(Broker<?> broker,
final NetworkConnection network,
final long connectionId,
AmqpPort<?> port,
- Transport transport)
+ Transport transport, final AggregateTicker
aggregateTicker)
{
_broker = broker;
_port = port;
_transport = transport;
+ _aggregateTicker = aggregateTicker;
_maxNoOfChannels = broker.getConnection_sessionCountLimit();
_decoder = new BrokerDecoder(this);
_connectionId = connectionId;
@@ -283,6 +257,44 @@ public class AMQProtocolEngine implement
_creationTime = System.currentTimeMillis();
}
+
+ @Override
+ public AggregateTicker getAggregateTicker()
+ {
+ return _aggregateTicker;
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ Thread lock = _messageAssignmentSuspended.get();
+ return lock != null && _messageAssignmentSuspended.get() !=
Thread.currentThread();
+ }
+
+ @Override
+ public void setMessageAssignmentSuspended(final boolean
messageAssignmentSuspended)
+ {
+ _messageAssignmentSuspended.set(messageAssignmentSuspended ?
Thread.currentThread() : null);
+ for(AMQSessionModel<?,?> session : getSessionModels())
+ {
+ for (Consumer<?> consumer : session.getConsumers())
+ {
+ ConsumerImpl consumerImpl = (ConsumerImpl) consumer;
+ if (!messageAssignmentSuspended)
+ {
+ consumerImpl.getTarget().notifyCurrentState();
+ }
+ else
+ {
+ // ensure that by the time the method returns, no consumer can be in the process of
+ // delivering a message.
+ consumerImpl.getSendLock();
+ consumerImpl.releaseSendLock();
+ }
+ }
+ }
+ }
+
private <T> T runAsSubject(PrivilegedAction<T> action)
{
return Subject.doAs(getAuthorizedSubject(), action);
@@ -913,7 +925,10 @@ public class AMQProtocolEngine implement
}
finally
{
- closeNetworkConnection();
+ final long timeoutTime = System.currentTimeMillis() + CLOSE_OK_TIMEOUT;
+ final NetworkConnection network = _network;
+ _aggregateTicker.addTicker(new ConnectionClosingTicker(timeoutTime, network));
+
}
}
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
Fri Jun 5 13:54:57 2015
@@ -27,6 +27,7 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
@PluggableService
@@ -62,9 +63,9 @@ public class ProtocolEngineCreator_0_8 i
NetworkConnection network,
AmqpPort<?> port,
Transport transport,
- long id)
+ long id, final
AggregateTicker aggregateTicker)
{
- return new AMQProtocolEngine(broker, network, id, port, transport);
+ return new AMQProtocolEngine(broker, network, id, port, transport,
aggregateTicker);
}
private static ProtocolEngineCreator INSTANCE = new
ProtocolEngineCreator_0_8();
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
Fri Jun 5 13:54:57 2015
@@ -27,6 +27,7 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
@PluggableService
@@ -62,9 +63,9 @@ public class ProtocolEngineCreator_0_9 i
NetworkConnection network,
AmqpPort<?> port,
Transport transport,
- long id)
+ long id, final
AggregateTicker aggregateTicker)
{
- return new AMQProtocolEngine(broker, network, id, port, transport);
+ return new AMQProtocolEngine(broker, network, id, port, transport,
aggregateTicker);
}
private static ProtocolEngineCreator INSTANCE = new
ProtocolEngineCreator_0_9();
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
Fri Jun 5 13:54:57 2015
@@ -27,6 +27,7 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
@PluggableService
@@ -63,9 +64,9 @@ public class ProtocolEngineCreator_0_9_1
NetworkConnection network,
AmqpPort<?> port,
Transport transport,
- long id)
+ long id, final AggregateTicker aggregateTicker)
{
- return new AMQProtocolEngine(broker, network, id, port, transport);
+ return new AMQProtocolEngine(broker, network, id, port, transport, aggregateTicker);
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
Fri Jun 5 13:54:57 2015
@@ -36,6 +36,7 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
public class AMQProtocolEngineTest extends QpidTestCase
@@ -73,7 +74,7 @@ public class AMQProtocolEngineTest exten
public void testSetClientPropertiesForNoRouteProvidedAsString()
{
- AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport);
+ AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport, new AggregateTicker());
assertTrue("Unexpected closeWhenNoRoute before client properties set",
engine.isCloseWhenNoRoute());
Map<String, Object> clientProperties = new HashMap<String, Object>();
@@ -85,7 +86,7 @@ public class AMQProtocolEngineTest exten
public void testSetClientPropertiesForNoRouteProvidedAsBoolean()
{
- AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport);
+ AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport, new AggregateTicker());
assertTrue("Unexpected closeWhenNoRoute before client properties set",
engine.isCloseWhenNoRoute());
Map<String, Object> clientProperties = new HashMap<String, Object>();
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
Fri Jun 5 13:54:57 2015
@@ -53,6 +53,7 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
public class InternalTestProtocolSession extends AMQProtocolEngine implements
ProtocolOutputConverter
@@ -65,7 +66,7 @@ public class InternalTestProtocolSession
public InternalTestProtocolSession(VirtualHostImpl virtualHost, Broker<?>
broker, final AmqpPort<?> port) throws AMQException
{
- super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), port, null);
+ super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), port, null, new AggregateTicker());
_channelDelivers = new HashMap<Integer, Map<String,
LinkedList<DeliveryPair>>>();
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
Fri Jun 5 13:54:57 2015
@@ -27,6 +27,7 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
@PluggableService
@@ -62,9 +63,9 @@ public class ProtocolEngineCreator_1_0_0
NetworkConnection network,
AmqpPort<?> port,
Transport transport,
- long id)
+ long id, final AggregateTicker aggregateTicker)
{
- return new ProtocolEngine_1_0_0_SASL(network, broker, id, port, transport);
+ return new ProtocolEngine_1_0_0_SASL(network, broker, id, port, transport, aggregateTicker);
}
private static ProtocolEngineCreator INSTANCE = new ProtocolEngineCreator_1_0_0_SASL();
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
Fri Jun 5 13:54:57 2015
@@ -52,6 +52,7 @@ import org.apache.qpid.amqp_1_0.type.Fra
import org.apache.qpid.amqp_1_0.type.Symbol;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
+import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.model.Broker;
@@ -63,13 +64,16 @@ import org.apache.qpid.server.security.S
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine,
FrameOutputHandler
{
+ public static final long CLOSE_REPONSE_TIMEOUT = 10000l;
private final AmqpPort<?> _port;
private final Transport _transport;
+ private final AggregateTicker _aggregateTicker;
private long _readBytes;
private long _writtenBytes;
@@ -143,19 +147,29 @@ public class ProtocolEngine_1_0_0_SASL i
- public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker<?> broker,
- long id, AmqpPort<?> port, Transport transport)
+ public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver,
+ final Broker<?> broker,
+ long id,
+ AmqpPort<?> port,
+ Transport transport,
+ final AggregateTicker aggregateTicker)
{
_connectionId = id;
_broker = broker;
_port = port;
_transport = transport;
+ _aggregateTicker = aggregateTicker;
if(networkDriver != null)
{
setNetworkConnection(networkDriver, networkDriver.getSender());
}
}
+ @Override
+ public AggregateTicker getAggregateTicker()
+ {
+ return _aggregateTicker;
+ }
@Override
public boolean isMessageAssignmentSuspended()
@@ -544,7 +558,8 @@ public class ProtocolEngine_1_0_0_SASL i
public void close()
{
- _sender.close();
+ getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis()+ CLOSE_REPONSE_TIMEOUT, _network));
+
}
public void setLogOutput(final PrintWriter out)
Added:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/AggregateTicker.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/AggregateTicker.java?rev=1683757&view=auto
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/AggregateTicker.java
(added)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/AggregateTicker.java
Fri Jun 5 13:54:57 2015
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class AggregateTicker implements Ticker
+{
+
+ private final CopyOnWriteArrayList<Ticker> _tickers = new CopyOnWriteArrayList<>();
+
+ @Override
+ public int getTimeToNextTick(final long currentTime)
+ {
+ int nextTick = Integer.MAX_VALUE;
+ for(Ticker ticker : _tickers)
+ {
+ nextTick = Math.min(ticker.getTimeToNextTick(currentTime), nextTick);
+ }
+ return nextTick;
+ }
+
+ @Override
+ public int tick(final long currentTime)
+ {
+ int nextTick = Integer.MAX_VALUE;
+ for(Ticker ticker : _tickers)
+ {
+ nextTick = Math.min(ticker.tick(currentTime), nextTick);
+ }
+ return nextTick;
+ }
+
+ public CopyOnWriteArrayList<Ticker> getTickers()
+ {
+ return _tickers;
+ }
+
+ public void addTicker(Ticker ticker)
+ {
+ _tickers.add(ticker);
+ }
+
+ public void removeTicker(Ticker ticker)
+ {
+ _tickers.remove(ticker);
+ }
+}
Propchange:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/AggregateTicker.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]