Author: rajdavies
Date: Wed May 21 04:47:13 2008
New Revision: 658637
URL: http://svn.apache.org/viewvc?rev=658637&view=rev
Log:
Make socket close async optional - for
https://issues.apache.org/activemq/browse/AMQ-1739
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=658637&r1=658636&r2=658637&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
Wed May 21 04:47:13 2008
@@ -65,6 +65,7 @@
protected int soTimeout;
protected int socketBufferSize = 64 * 1024;
protected int ioBufferSize = 8 * 1024;
+ protected boolean closeAsync=true;
protected Socket socket;
protected DataOutputStream dataOut;
protected DataInputStream dataIn;
@@ -335,6 +336,20 @@
public void setIoBufferSize(int ioBufferSize) {
this.ioBufferSize = ioBufferSize;
}
+
+ /**
+ * @return the closeAsync
+ */
+ public boolean isCloseAsync() {
+ return closeAsync;
+ }
+
+ /**
+ * @param closeAsync the closeAsync to set
+ */
+ public void setCloseAsync(boolean closeAsync) {
+ this.closeAsync = closeAsync;
+ }
// Implementation methods
//
-------------------------------------------------------------------------
@@ -441,22 +456,33 @@
// is hung.. then this hangs the close.
// closeStreams();
if (socket != null) {
- //closing the socket can hang also
- final CountDownLatch latch = new CountDownLatch(1);
- SOCKET_CLOSE.execute(new Runnable() {
-
- public void run() {
- try {
- socket.close();
- } catch (IOException e) {
- LOG.debug("Caught exception closing socket",e);
- }finally {
- latch.countDown();
+ if (closeAsync) {
+ //closing the socket can hang also
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ SOCKET_CLOSE.execute(new Runnable() {
+
+ public void run() {
+ try {
+ socket.shutdownInput();
+ socket.shutdownOutput();
+ socket.close();
+ } catch (IOException e) {
+ LOG.debug("Caught exception closing socket",e);
+ }finally {
+ latch.countDown();
+ }
}
+
+ });
+ latch.await(1,TimeUnit.SECONDS);
+ }else {
+ try {
+ socket.close();
+ } catch (IOException e) {
+ LOG.debug("Caught exception closing socket",e);
}
-
- });
- latch.await(1,TimeUnit.SECONDS);
+ }
}
}
@@ -512,6 +538,7 @@
SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "TcpSocketClose:
"+runnable);
+ thread.setPriority(Thread.MAX_PRIORITY);
thread.setDaemon(true);
return thread;
}
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java?rev=658637&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java
Wed May 21 04:47:13 2008
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.perf;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision: 1.3 $
+ */
+public class ConnectionChurnTest extends TestCase {
+ protected static final int CONNECTION_COUNT = 200;
+ private static final Log LOG =
LogFactory.getLog(ConnectionChurnTest.class);
+ protected BrokerService broker;
+ protected String bindAddress =
ActiveMQConnection.DEFAULT_BROKER_BIND_URL+"?transport.closeAsync=false";
+ protected int topicCount;
+
+ public void testPerformance() throws Exception {
+ ConnectionFactory factory = createConnectionFactory();
+ List<Connection> list = new ArrayList();
+ for (int i = 0; i < CONNECTION_COUNT; i++) {
+ Connection connection = factory.createConnection();
+ connection.start();
+ list.add(connection);
+ LOG.info("Created " + i);
+ if (i % 100 == 0) {
+ closeConnections(list);
+ }
+ }
+ closeConnections(list);
+ }
+
+ protected void closeConnections(List<Connection> list) throws JMSException
{
+ for (Connection c : list) {
+ c.close();
+ }
+ for (TransportConnector tc : broker.getTransportConnectors()) {
+ System.out.println(tc.getConnections().size());
+ }
+ list.clear();
+ }
+
+ protected void setUp() throws Exception {
+ if (broker == null) {
+ broker = createBroker();
+ }
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ if (broker != null) {
+ broker.stop();
+ }
+ }
+
+ protected ActiveMQConnectionFactory createConnectionFactory()
+ throws Exception {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
+ ActiveMQConnection.DEFAULT_BROKER_URL);
+ return cf;
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService answer = new BrokerService();
+ configureBroker(answer);
+ answer.start();
+ return answer;
+ }
+
+ protected void configureBroker(BrokerService answer) throws Exception {
+ answer.setPersistent(false);
+ answer.addConnector(bindAddress);
+ answer.setDeleteAllMessagesOnStartup(true);
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java
------------------------------------------------------------------------------
svn:eol-style = native