Author: kwall
Date: Wed Feb 24 16:30:32 2016
New Revision: 1732184
URL: http://svn.apache.org/viewvc?rev=1732184&view=rev
Log:
QPID-7033: [Java Broker] Change IO tickers use scheduled time when considering
if it is time to 'tick'
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
Wed Feb 24 16:30:32 2016
@@ -20,15 +20,15 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.transport.network.Ticker;
public class ConnectionClosingTicker implements Ticker
{
private final long _timeoutTime;
- private final NetworkConnection _network;
+ private final ServerNetworkConnection _network;
- public ConnectionClosingTicker(final long timeoutTime, final
NetworkConnection network)
+ public ConnectionClosingTicker(final long timeoutTime, final
ServerNetworkConnection network)
{
_timeoutTime = timeoutTime;
_network = network;
@@ -37,6 +37,11 @@ public class ConnectionClosingTicker imp
@Override
public int getTimeToNextTick(final long currentTime)
{
+ if (_network.getScheduledTime() > 0)
+ {
+ return (int) (_timeoutTime - _network.getScheduledTime());
+ }
+
return (int) (_timeoutTime - currentTime);
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
Wed Feb 24 16:30:32 2016
@@ -24,8 +24,8 @@ import org.apache.qpid.transport.network
abstract public class SuspendedConsumerLoggingTicker implements Ticker
{
- private long _nextTick;
- private long _startTime;
+ private volatile long _nextTick;
+ private volatile long _startTime;
private final long _repeatPeriod;
public SuspendedConsumerLoggingTicker(final long repeatPeriod)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
Wed Feb 24 16:30:32 2016
@@ -43,7 +43,6 @@ import com.google.common.util.concurrent
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.connection.ConnectionPrincipal;
@@ -107,6 +106,7 @@ public abstract class AbstractAMQPConnec
private volatile Thread _ioThread;
private volatile boolean _messageAuthorizationRequired;
+ private volatile SlowConnectionOpenTicker _slowConnectionOpenTicker;
private final AtomicLong _maxMessageSize = new AtomicLong(Long.MAX_VALUE);
@@ -173,7 +173,8 @@ public abstract class AbstractAMQPConnec
{
super.onOpen();
long maxAuthDelay = _port.getContextValue(Long.class,
Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY);
- _aggregateTicker.addTicker(new SlowConnectionOpenTicker(maxAuthDelay));
+ _slowConnectionOpenTicker = new SlowConnectionOpenTicker(maxAuthDelay);
+ _aggregateTicker.addTicker(_slowConnectionOpenTicker);
_lastReadTime = _lastWriteTime = getCreatedTime();
}
@@ -183,7 +184,7 @@ public abstract class AbstractAMQPConnec
return _broker;
}
- public final NetworkConnection getNetwork()
+ public final ServerNetworkConnection getNetwork()
{
return _network;
}
@@ -722,9 +723,21 @@ public abstract class AbstractAMQPConnec
return !_messageAuthorizationRequired ||
getAuthorizedPrincipal().getName().equals(userId == null? "" : userId);
}
+ @Override
+ public void processingStarted(final long currentTime)
+ {
+ SlowConnectionOpenTicker ticker = _slowConnectionOpenTicker;
+ long scheduledTime = _network.getScheduledTime();
+ if (ticker != null && scheduledTime > 0)
+ {
+ ticker.addSchedulingDelay(currentTime - scheduledTime);
+ }
+ }
+
private class SlowConnectionOpenTicker implements Ticker
{
private final long _allowedTime;
+ private volatile long _accumulatedDelay;
public SlowConnectionOpenTicker(long timeoutTime)
{
@@ -734,7 +747,7 @@ public abstract class AbstractAMQPConnec
@Override
public int getTimeToNextTick(final long currentTime)
{
- final int timeToNextTick = (int) (getCreatedTime() + _allowedTime
- currentTime);
+ final int timeToNextTick = (int) (getCreatedTime() + _allowedTime
+ _accumulatedDelay - currentTime);
return timeToNextTick;
}
@@ -754,10 +767,15 @@ public abstract class AbstractAMQPConnec
else
{
_aggregateTicker.removeTicker(this);
+ _slowConnectionOpenTicker = null;
}
}
return nextTick;
}
- }
+ public void addSchedulingDelay(final long delay)
+ {
+ _accumulatedDelay += delay;
+ }
+ }
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
Wed Feb 24 16:30:32 2016
@@ -148,6 +148,12 @@ public class MultiVersionProtocolEngine
_delegate.setIOThread(ioThread);
}
+ @Override
+ public void processingStarted(final long currentTime)
+ {
+ _delegate.processingStarted(currentTime);
+ }
+
public long getConnectionId()
{
return _id;
@@ -294,6 +300,12 @@ public class MultiVersionProtocolEngine
}
@Override
+ public void processingStarted(final long currentTime)
+ {
+
+ }
+
+ @Override
public void closed()
{
@@ -539,6 +551,11 @@ public class MultiVersionProtocolEngine
}
+ @Override
+ public void processingStarted(final long currentTime)
+ {
+
+ }
@Override
public Subject getSubject()
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
Wed Feb 24 16:30:32 2016
@@ -66,6 +66,7 @@ public class NonBlockingConnection imple
private final AmqpPort _port;
private final AtomicBoolean _scheduled = new AtomicBoolean();
+ private volatile long _scheduledTime;
private volatile boolean _unexpectedByteBufferSizeReported;
private final String _threadName;
private volatile SelectorThread.SelectionTask _selectionTask;
@@ -241,11 +242,13 @@ public class NonBlockingConnection imple
try
{
long currentTime = System.currentTimeMillis();
+ _protocolEngine.processingStarted(currentTime);
int tick = getTicker().getTimeToNextTick(currentTime);
if (tick <= 0)
{
getTicker().tick(currentTime);
}
+ _scheduledTime = 0;
_protocolEngine.setIOThread(Thread.currentThread());
_protocolEngine.setMessageAssignmentSuspended(true, true);
@@ -597,7 +600,12 @@ public class NonBlockingConnection imple
public boolean setScheduled()
{
- return _scheduled.compareAndSet(false,true);
+ final boolean scheduled = _scheduled.compareAndSet(false, true);
+ if (scheduled)
+ {
+ _scheduledTime = System.currentTimeMillis();
+ }
+ return scheduled;
}
public void clearScheduled()
@@ -605,6 +613,12 @@ public class NonBlockingConnection imple
_scheduled.set(false);
}
+ @Override
+ public long getScheduledTime()
+ {
+ return _scheduledTime;
+ }
+
void reportUnexpectedByteBufferSizeUsage()
{
if (!_unexpectedByteBufferSizeReported)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
Wed Feb 24 16:30:32 2016
@@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.TransportEncryption;
-import org.apache.qpid.transport.network.io.IdleTimeoutTicker;
import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
@@ -149,10 +149,7 @@ public class NonBlockingNetworkTransport
socketChannel.configureBlocking(false);
- AggregateTicker aggregateTicker =
engine.getAggregateTicker();
- final IdleTimeoutTicker idleTimeoutTicker = new
IdleTimeoutTicker(engine, _timeout);
- aggregateTicker.addTicker(idleTimeoutTicker);
NonBlockingConnection connection =
new NonBlockingConnection(socketChannel,
@@ -170,11 +167,16 @@ public class NonBlockingNetworkTransport
_scheduler,
_port);
+ AggregateTicker aggregateTicker =
engine.getAggregateTicker();
+
+ Ticker writeIdleTimeoutTicker = new
ServerIdleWriteTimeoutTicker(connection, engine, _timeout);
+ Ticker readIdleTimeoutTicker = new
ServerIdleReadTimeoutTicker(connection, engine, _timeout);
+ aggregateTicker.addTicker(writeIdleTimeoutTicker);
+ aggregateTicker.addTicker(readIdleTimeoutTicker);
+
engine.setNetworkConnection(connection);
connection.setMaxReadIdleMillis(1000L * HANDSHAKE_TIMEOUT);
- idleTimeoutTicker.setConnection(connection);
-
connection.start();
_scheduler.addConnection(connection);
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
Wed Feb 24 16:30:32 2016
@@ -75,4 +75,6 @@ public interface ProtocolEngine extends
void received(QpidByteBuffer msg);
void setIOThread(Thread ioThread);
+
+ void processingStarted(long currentTime);
}
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java?rev=1732184&view=auto
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
(added)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
Wed Feb 24 16:30:32 2016
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.network.TransportActivity;
+
+public class ServerIdleReadTimeoutTicker implements Ticker
+{
+ private final TransportActivity _transport;
+ private final int _defaultTimeout;
+ private final ServerNetworkConnection _connection;
+
+ public ServerIdleReadTimeoutTicker(ServerNetworkConnection connection,
TransportActivity transport,
+ int defaultTimeout)
+ {
+ _connection = connection;
+ _transport = transport;
+ _defaultTimeout = defaultTimeout;
+ }
+
+ @Override
+ public int getTimeToNextTick(long currentTime)
+ {
+ final long maxReadIdle = _connection.getMaxReadIdleMillis();
+ if (maxReadIdle > 0)
+ {
+ long nextTime = _transport.getLastReadTime() + maxReadIdle;
+ return (int) (nextTime - (_connection.getScheduledTime() > 0 ?
_connection.getScheduledTime() : currentTime) );
+ }
+
+ return _defaultTimeout;
+ }
+
+ @Override
+ public int tick(long currentTime)
+ {
+ int timeToNextTick = getTimeToNextTick(currentTime);;
+ if (_connection.getMaxReadIdleMillis() > 0 && timeToNextTick <= 0)
+ {
+ _transport.readerIdle();
+ }
+
+ return timeToNextTick;
+ }
+}
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java?rev=1732184&view=auto
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
(added)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
Wed Feb 24 16:30:32 2016
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.network.TransportActivity;
+
+public class ServerIdleWriteTimeoutTicker implements Ticker
+{
+ private final TransportActivity _transport;
+ private final int _defaultTimeout;
+ private final ServerNetworkConnection _connection;
+
+ public ServerIdleWriteTimeoutTicker(ServerNetworkConnection connection,
TransportActivity transport,
+ int defaultTimeout)
+ {
+ _connection = connection;
+ _transport = transport;
+ _defaultTimeout = defaultTimeout;
+ }
+
+ @Override
+ public int getTimeToNextTick(long currentTime)
+ {
+ long maxWriteIdle = _connection.getMaxWriteIdleMillis();
+ if (maxWriteIdle > 0)
+ {
+ long writeTime = _transport.getLastWriteTime() + maxWriteIdle;
+ return (int) (writeTime - currentTime);
+ }
+
+ return _defaultTimeout;
+ }
+
+ @Override
+ public int tick(long currentTime)
+ {
+ int timeToNextTick = getTimeToNextTick(currentTime);
+ if (_connection.getMaxWriteIdleMillis() > 0 && timeToNextTick <= 0)
+ {
+ _transport.writerIdle();
+ }
+
+ return timeToNextTick;
+ }
+}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
Wed Feb 24 16:30:32 2016
@@ -26,4 +26,6 @@ public interface ServerNetworkConnection
void reserveOutboundMessageSpace(long size);
String getTransportInfo();
+
+ long getScheduledTime();
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
Wed Feb 24 16:30:32 2016
@@ -50,6 +50,7 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.Connection;
@@ -139,7 +140,7 @@ public class ServerConnection extends Co
if(state == State.CLOSING)
{
- getAmqpConnection().getAggregateTicker().addTicker(new
ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT,
getNetworkConnection()));
+ getAmqpConnection().getAggregateTicker().addTicker(new
ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT,
(ServerNetworkConnection) getNetworkConnection()));
}
}
Modified:
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
(original)
+++
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
Wed Feb 24 16:30:32 2016
@@ -451,6 +451,12 @@ class WebSocketProvider implements Accep
return _connection.getProtocol();
}
+ @Override
+ public long getScheduledTime()
+ {
+ return 0;
+ }
+
void setPeerCertificate(final Certificate certificate)
{
_certificate = certificate;
Modified:
qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
---
qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
(original)
+++
qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
Wed Feb 24 16:30:32 2016
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
* A basic implementation of TCP traffic forwarder between ports.
* It is intended to use in tests.
*/
-public class TCPTunneler
+public class TCPTunneler implements AutoCloseable
{
private static final Logger LOGGER =
LoggerFactory.getLogger(TCPTunneler.class);
@@ -60,6 +60,11 @@ public class TCPTunneler
_tcpWorker.start();
}
+ public void stopClientToServerForwarding(final InetSocketAddress
clientAddress)
+ {
+ _tcpWorker.stopClientToServerForwarding(clientAddress);
+ }
+
public void stop()
{
try
@@ -96,6 +101,12 @@ public class TCPTunneler
return _tcpWorker.getLocalPort();
}
+ @Override
+ public void close() throws Exception
+ {
+ stop();
+ }
+
public interface TunnelListener
{
void clientConnected(InetSocketAddress clientAddress);
@@ -115,6 +126,7 @@ public class TCPTunneler
private final TunnelListener _notifyingListener;
private volatile ServerSocket _serverSocket;
private volatile ExecutorService _executor;
+ private int _actualLocalPort;
public TCPWorker(final int localPort,
final String remoteHost,
@@ -184,20 +196,22 @@ public class TCPTunneler
public void start()
{
- LOGGER.info("Starting TCPTunneler forwarding from port {} to {}",
_localPort, _remoteHostPort);
+ _actualLocalPort = _localPort;
try
{
_serverSocket = new ServerSocket(_localPort);
+ _actualLocalPort = _serverSocket.getLocalPort();
+ LOGGER.info ("Starting
TCPTunneler forwarding from port {} to {}",
+ _actualLocalPort, _remoteHostPort);
_serverSocket.setReuseAddress(true);
}
catch (IOException e)
{
- throw new RuntimeException("Cannot start TCPTunneler on port "
+ _localPort, e);
+ throw new RuntimeException("Cannot start TCPTunneler on port "
+ _actualLocalPort, e);
}
if (_serverSocket != null)
{
- LOGGER.info("Listening on port {}", _localPort);
try
{
_executor.execute(this);
@@ -210,7 +224,7 @@ public class TCPTunneler
}
finally
{
- throw new RuntimeException("Cannot start acceptor
thread for TCPTunneler on port " + _localPort,
+ throw new RuntimeException("Cannot start acceptor
thread for TCPTunneler on port " + _actualLocalPort,
e);
}
}
@@ -222,7 +236,7 @@ public class TCPTunneler
if (_closed.compareAndSet(false, true))
{
LOGGER.info("Stopping TCPTunneler forwarding from port {} to
{}",
- _localPort,
+ _actualLocalPort,
_remoteHostPort);
try
{
@@ -237,7 +251,7 @@ public class TCPTunneler
}
LOGGER.info("TCPTunneler forwarding from port {} to {} is
stopped",
- _localPort,
+ _actualLocalPort,
_remoteHostPort);
}
}
@@ -330,6 +344,28 @@ public class TCPTunneler
}
}
+ public void stopClientToServerForwarding(final InetSocketAddress
clientAddress)
+ {
+ SocketTunnel target = null;
+ for (SocketTunnel tunnel : _tunnels)
+ {
+ if (tunnel.getClientAddress().equals(clientAddress))
+ {
+ target = tunnel;
+ break;
+ }
+ }
+ if (target != null)
+ {
+ LOGGER.debug("Stopping forwarding from client {} to server",
clientAddress);
+ target.stopClientToServerForwarding();
+ }
+ else
+ {
+ throw new IllegalArgumentException("Could not find tunnel for
address " + clientAddress);
+ }
+ }
+
private void closeServerSocket()
{
if (_serverSocket != null)
@@ -349,7 +385,6 @@ public class TCPTunneler
}
}
-
private SocketTunnel removeTunnel(final InetSocketAddress
clientAddress)
{
SocketTunnel client = null;
@@ -384,8 +419,8 @@ public class TCPTunneler
private final Socket _serverSocket;
private final TunnelListener _tunnelListener;
private final AtomicBoolean _closed;
- private final AutoClosingStreamForwarder _upStreamForwarder;
- private final AutoClosingStreamForwarder _downStreamForwarder;
+ private final AutoClosingStreamForwarder _clientToServer;
+ private final AutoClosingStreamForwarder _serverToClient;
private final InetSocketAddress _clientSocketAddress;
public SocketTunnel(final Socket clientSocket,
@@ -400,8 +435,8 @@ public class TCPTunneler
_tunnelListener = tunnelListener;
_clientSocket.setKeepAlive(true);
_serverSocket.setKeepAlive(true);
- _upStreamForwarder = new AutoClosingStreamForwarder(new
StreamForwarder(_clientSocket, _serverSocket));
- _downStreamForwarder = new AutoClosingStreamForwarder(new
StreamForwarder(_serverSocket, _clientSocket));
+ _clientToServer = new AutoClosingStreamForwarder(new
StreamForwarder(_clientSocket, _serverSocket));
+ _serverToClient = new AutoClosingStreamForwarder(new
StreamForwarder(_serverSocket, _clientSocket));
}
public void close()
@@ -422,11 +457,16 @@ public class TCPTunneler
public void start(Executor executor) throws IOException
{
- executor.execute(_upStreamForwarder);
- executor.execute(_downStreamForwarder);
+ executor.execute(_clientToServer);
+ executor.execute(_serverToClient);
_tunnelListener.clientConnected(getClientAddress());
}
+ public void stopClientToServerForwarding()
+ {
+ _clientToServer.stopForwarding();
+ }
+
public boolean isClosed()
{
return _closed.get();
@@ -442,7 +482,6 @@ public class TCPTunneler
return _clientSocketAddress;
}
-
private static void closeSocket(Socket socket)
{
if (socket != null)
@@ -484,6 +523,11 @@ public class TCPTunneler
currentThread.setName(originalThreadName);
}
}
+
+ public void stopForwarding()
+ {
+ _streamForwarder.stopForwarding();
+ }
}
}
@@ -494,13 +538,13 @@ public class TCPTunneler
private final InputStream _inputStream;
private final OutputStream _outputStream;
private final String _name;
+ private AtomicBoolean _stopForwarding = new AtomicBoolean();
public StreamForwarder(Socket input, Socket output) throws IOException
{
_inputStream = input.getInputStream();
_outputStream = output.getOutputStream();
- _name = "Forwarder-" + input.getInetAddress().getHostName() + ":"
+ input.getPort() + "->"
- + output.getInetAddress().getHostName() + ":" +
output.getPort();
+ _name = "Forwarder-" + input.getLocalSocketAddress() + "->" +
output.getRemoteSocketAddress();
}
@Override
@@ -512,8 +556,16 @@ public class TCPTunneler
{
while ((bytesRead = _inputStream.read(buffer)) != -1)
{
- _outputStream.write(buffer, 0, bytesRead);
- _outputStream.flush();
+ if (!_stopForwarding.get())
+ {
+ _outputStream.write(buffer, 0, bytesRead);
+ _outputStream.flush();
+ LOGGER.debug("Forwarded {} byte(s)", bytesRead);
+ }
+ else
+ {
+ LOGGER.debug("Discarded {} byte(s)", bytesRead);
+ }
}
}
catch (IOException e)
@@ -547,5 +599,11 @@ public class TCPTunneler
{
return _name;
}
+
+ public void stopForwarding()
+ {
+ _stopForwarding.set(true);
+ }
+
}
}
Modified:
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
(original)
+++
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
Wed Feb 24 16:30:32 2016
@@ -529,6 +529,7 @@ public class QpidBrokerTestCase extends
@Override
protected void tearDown() throws java.lang.Exception
{
+ _logger.debug("tearDown started");
try
{
for (Connection c : _connections)
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
Wed Feb 24 16:30:32 2016
@@ -20,11 +20,15 @@ package org.apache.qpid.client;
import static
org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL;
+import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -34,6 +38,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TCPTunneler;
public class HeartbeatTest extends QpidBrokerTestCase
{
@@ -192,6 +197,56 @@ public class HeartbeatTest extends QpidB
conn.close();
}
+ public void testClientStopsSendingHeartbeats_BrokerClosesConnection()
throws Exception
+ {
+ try(TCPTunneler tcpTunneler = new TCPTunneler(getFailingPort(),
"localhost", getDefaultBroker().getAmqpPort(), 1))
+ {
+ tcpTunneler.start();
+
+ final AtomicReference<InetSocketAddress> clientAddressRef = new
AtomicReference<>();
+ tcpTunneler.addClientListener(new TCPTunneler.TunnelListener()
+ {
+ @Override
+ public void clientConnected(final InetSocketAddress
clientAddress)
+ {
+ clientAddressRef.set(clientAddress);
+ }
+
+ @Override
+ public void clientDisconnected(final InetSocketAddress
clientAddress)
+ {
+ }
+ });
+
+ final CountDownLatch exceptionLatch = new CountDownLatch(1);
+ final String url = String.format(CONNECTION_URL_WITH_HEARTBEAT,
tcpTunneler.getLocalPort(), 1);
+ AMQConnection conn = (AMQConnection) getConnection(new
AMQConnectionURL(url));
+ conn.setHeartbeatListener(_listener);
+ conn.setExceptionListener(new ExceptionListener()
+ {
+ @Override
+ public void onException(final JMSException exception)
+ {
+ LOGGER.debug("Exception listener got exception",
exception);
+ exceptionLatch.countDown();
+ }
+ });
+ conn.start();
+
+ assertNotNull(clientAddressRef.get());
+
+ _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
+
+ assertTrue("Too few heartbeats received:
"+_listener.getHeartbeatsReceived() +" (expected at least 2)",
_listener.getHeartbeatsReceived() >=2);
+ assertTrue("Too few heartbeats sent
"+_listener.getHeartbeatsSent() +" (expected at least 2)",
_listener.getHeartbeatsSent() >=2);
+
+ tcpTunneler.stopClientToServerForwarding(clientAddressRef.get());
+
+ exceptionLatch.await(5, TimeUnit.SECONDS);
+ assertTrue("Connection should be disconnected within timeout",
conn.isConnected());
+ }
+ }
+
private class TestListener implements HeartbeatListener
{
private final String _name;
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
Wed Feb 24 16:30:32 2016
@@ -143,11 +143,12 @@ public class ProtocolNegotiationTest ext
boolean brokenPipe = false;
while (timeout > System.currentTimeMillis())
{
- if (!writeHeartbeat(sender)) ;
+ if (!writeHeartbeat(sender))
{
brokenPipe = true;
break;
}
+ Thread.sleep(100);
}
assertTrue("Expected pipe to become broken within "
+ Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY + "
timeout", brokenPipe);
@@ -257,7 +258,6 @@ public class ProtocolNegotiationTest ext
catch (IOException e)
{
_success = false;
- throw new RuntimeException(e);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]