Author: pmouawad
Date: Sat Nov 22 20:01:50 2014
New Revision: 1641118
URL: http://svn.apache.org/r1641118
Log:
Bug 55932 - Create a Async BackendListener to allow easy plug of new listener
(Graphite, JDBC, Console,...)
Fix javadocs , naming
But more importantly fix multi-threading issues on instance variables as there
is only 1 BackendListener shared by all threads
Bugzilla Id: 55932
Modified:
jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/BackendListener.java
Modified:
jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/BackendListener.java
URL:
http://svn.apache.org/viewvc/jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/BackendListener.java?rev=1641118&r1=1641117&r2=1641118&view=diff
==============================================================================
---
jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/BackendListener.java
(original)
+++
jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/BackendListener.java
Sat Nov 22 20:01:50 2014
@@ -21,11 +21,10 @@ package org.apache.jmeter.visualizers.ba
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.apache.jmeter.config.Arguments;
@@ -53,13 +52,7 @@ public class BackendListener extends Abs
*/
private static final long serialVersionUID = 8184103677832024335L;
- private static final Logger log = LoggingManager.getLoggerForClass();
-
- /**
- * Set used to register instances which implement teardownTest.
- * This is used so that the BackendListenerClient can be notified when the
test ends.
- */
- private static final Set<BackendListener> TEAR_DOWN_SET = new
HashSet<BackendListener>();
+ private static final Logger LOGGER = LoggingManager.getLoggerForClass();
/**
* Property key representing the classname of the BackendListenerClient to
user.
@@ -80,7 +73,7 @@ public class BackendListener extends Abs
* The BackendListenerClient class used by this sampler.
* Created by testStarted; copied to cloned instances.
*/
- private Class<?> javaClass;
+ private Class<?> clientClass;
/**
* If true, the BackendListenerClient class implements teardownTest.
@@ -93,20 +86,14 @@ public class BackendListener extends Abs
*/
private transient BackendListenerClient backendListenerClient = null;
- /**
- * The BackendListenerContext instance used by this sampler to hold
information
- * related to the test run, such as the parameters specified for the
sampler
- * client.
- */
- private transient BackendListenerContext context = null;
private static final int DEFAULT_QUEUE_SIZE = 5000;
private transient BlockingQueue<SampleResult> queue; // created by server
in readResolve method
- private transient long queueWaits; // how many times we had to wait to
queue a sample
+ private AtomicLong queueWaits; // how many times we had to wait to queue a
sample
- private transient long queueWaitTime; // how long we had to wait
(nanoSeconds)
+ private AtomicLong queueWaitTime; // how long we had to wait (nanoSeconds)
// Create unique object as marker for end of queue
private transient static final SampleResult FINAL_EVENT = new
SampleResult();
@@ -125,7 +112,7 @@ public class BackendListener extends Abs
@Override
public Object clone() {
BackendListener clone = (BackendListener) super.clone();
- clone.javaClass = this.javaClass;
+ clone.clientClass = this.clientClass;
clone.isToBeRegistered = this.isToBeRegistered;
return clone;
}
@@ -133,26 +120,16 @@ public class BackendListener extends Abs
private void initClass() {
String name = getClassname().trim();
try {
- javaClass = Class.forName(name, false,
Thread.currentThread().getContextClassLoader());
- Method method = javaClass.getMethod("teardownTest", new
Class[]{BackendListenerContext.class});
+ clientClass = Class.forName(name, false,
Thread.currentThread().getContextClassLoader());
+ Method method = clientClass.getMethod("teardownTest", new
Class[]{BackendListenerContext.class});
isToBeRegistered =
!method.getDeclaringClass().equals(AbstractBackendListenerClient.class);
- log.info("Created class: " + name + ". Uses teardownTest: " +
isToBeRegistered);
+ LOGGER.info("Created class: " + name + ". Uses teardownTest: " +
isToBeRegistered);
} catch (Exception e) {
- log.error(whoAmI() + "\tException initialising: " + name, e);
+ LOGGER.error(whoAmI() + "\tException initialising: " + name, e);
}
}
/**
- * Retrieves reference to BackendListenerClient.
- *
- * Convience method used to check for null reference without actually
- * creating a BackendListenerClient
- *
- * @return reference to BackendListenerClient NOTUSED private
BackendListenerClient
- * retrieveJavaClient() { return javaClient; }
- */
-
- /**
* Generate a String identifier of this instance for debugging purposes.
*
* @return a String identifier for this sampler instance
@@ -168,37 +145,40 @@ public class BackendListener extends Abs
}
// TestStateListener implementation
- /* Implements TestStateListener.testStarted() */
+ /**
+ * Implements TestStateListener.testStarted()
+ **/
@Override
public void testStarted() {
testStarted("");
}
- /* Implements TestStateListener.testStarted(String) */
+ /** Implements TestStateListener.testStarted(String)
+ **/
@Override
public void testStarted(String host) {
- log.debug(whoAmI() + "\ttestStarted(" + host + ")");
+ if(LOGGER.isDebugEnabled()){
+ LOGGER.debug(whoAmI() + "\ttestStarted(" + host + ")");
+ }
queue = new ArrayBlockingQueue<SampleResult>(getQueueSize());
initClass();
- queueWaits=0L;
- queueWaitTime=0L;
- log.info(getName()+":Starting worker with class:"+javaClass +" and
queue capacity:"+getQueueSize());
-
- backendListenerClient = createBackendListenerClientImpl(javaClass);
- context = new
BackendListenerContext((Arguments)getArguments().clone());
- if(isToBeRegistered) {
- TEAR_DOWN_SET.add(this);
- }
+ queueWaits = new AtomicLong(0L);
+ queueWaitTime = new AtomicLong(0L);
+ LOGGER.info(getName()+":Starting worker with class:"+clientClass +"
and queue capacity:"+getQueueSize());
+
+ backendListenerClient = createBackendListenerClientImpl(clientClass);
+ BackendListenerContext context = new
BackendListenerContext((Arguments)getArguments().clone());
+
try {
backendListenerClient.setupTest(context);
} catch (Exception e) {
throw new java.lang.IllegalStateException("Failed calling
setupTest", e);
}
- Worker worker = new Worker(javaClass, backendListenerClient,
(Arguments) getArguments().clone(), queue);
+ Worker worker = new Worker(backendListenerClient, (Arguments)
getArguments().clone(), queue);
worker.setDaemon(true);
worker.start();
- log.info(getName()+":Started worker with class:"+javaClass);
+ LOGGER.info(getName()+": Started worker with class:"+clientClass);
}
@@ -206,30 +186,33 @@ public class BackendListener extends Abs
* @see
org.apache.jmeter.samplers.SampleListener#sampleOccurred(org.apache.jmeter.samplers.SampleEvent)
*/
@Override
- public void sampleOccurred(SampleEvent e) {
+ public void sampleOccurred(SampleEvent event) {
Arguments args = getArguments();
- context = new BackendListenerContext(args);
+ BackendListenerContext context = new BackendListenerContext(args);
- SampleResult sr = backendListenerClient.createSampleResult(context,
e.getResult());
+ SampleResult sr = backendListenerClient.createSampleResult(context,
event.getResult());
try {
if (!queue.offer(sr)){ // we failed to add the element first time
- queueWaits++;
+ queueWaits.incrementAndGet();
long t1 = System.nanoTime();
queue.put(sr);
long t2 = System.nanoTime();
- queueWaitTime += t2-t1;
+ queueWaitTime.addAndGet(t2-t1);
}
} catch (Exception err) {
- log.error("sampleOccurred, failed to queue the sample", err);
+ LOGGER.error("sampleOccurred, failed to queue the sample", err);
}
}
+ /**
+ * Thread that dequeus data from queue to send it to {@link
BackendListenerClient}
+ */
private static final class Worker extends Thread {
private final BlockingQueue<SampleResult> queue;
private final BackendListenerContext context;
private final BackendListenerClient backendListenerClient;
- private Worker(Class<?> javaClass, BackendListenerClient
backendListenerClient, Arguments arguments, BlockingQueue<SampleResult> q){
+ private Worker(BackendListenerClient backendListenerClient, Arguments
arguments, BlockingQueue<SampleResult> q){
queue = q;
// Allow BackendListenerClient implementations to get access to
test element name
arguments.addArgument(TestElement.NAME, getName());
@@ -240,37 +223,37 @@ public class BackendListener extends Abs
@Override
public void run() {
- boolean isDebugEnabled = log.isDebugEnabled();
- List<SampleResult> l = new ArrayList<SampleResult>(queue.size());
+ boolean isDebugEnabled = LOGGER.isDebugEnabled();
+ List<SampleResult> sampleResults = new
ArrayList<SampleResult>(queue.size());
try {
boolean eof = false;
while (!eof) {
if(isDebugEnabled) {
- log.debug("Thread:"+Thread.currentThread().getName()+"
taking SampleResult from queue:"+queue.size());
+
LOGGER.debug("Thread:"+Thread.currentThread().getName()+" taking SampleResult
from queue:"+queue.size());
}
SampleResult e = queue.take();
if(isDebugEnabled) {
- log.debug("Thread:"+Thread.currentThread().getName()+"
took SampleResult:"+e+", isFinal:" + (e==FINAL_EVENT));
+
LOGGER.debug("Thread:"+Thread.currentThread().getName()+" took
SampleResult:"+e+", isFinal:" + (e==FINAL_EVENT));
}
while (!(eof = (e == FINAL_EVENT)) && e != null ) { // try
to process as many as possible
- l.add(e);
+ sampleResults.add(e);
if(isDebugEnabled) {
-
log.debug("Thread:"+Thread.currentThread().getName()+" polling from
queue:"+queue.size());
+
LOGGER.debug("Thread:"+Thread.currentThread().getName()+" polling from
queue:"+queue.size());
}
e = queue.poll(); // returns null if nothing on queue
currently
if(isDebugEnabled) {
-
log.debug("Thread:"+Thread.currentThread().getName()+" took from queue:"+e+",
isFinal:" + (e==FINAL_EVENT));
+
LOGGER.debug("Thread:"+Thread.currentThread().getName()+" took from
queue:"+e+", isFinal:" + (e==FINAL_EVENT));
}
}
if(isDebugEnabled) {
- log.debug("Thread:"+Thread.currentThread().getName()+
+
LOGGER.debug("Thread:"+Thread.currentThread().getName()+
" exiting with FINAL EVENT:"+(e == FINAL_EVENT)
+", null:" + (e==null));
}
- int size = l.size();
+ int size = sampleResults.size();
if (size > 0) {
- backendListenerClient.handleSampleResults(l, context);
- l.clear();
+
backendListenerClient.handleSampleResults(sampleResults, context);
+ sampleResults.clear();
}
if(!eof) {
LockSupport.parkNanos(100);
@@ -280,12 +263,12 @@ public class BackendListener extends Abs
// NOOP
}
// We may have been interrupted
- int size = l.size();
+ int size = sampleResults.size();
if (size > 0) {
- backendListenerClient.handleSampleResults(l, context);
- l.clear();
+ backendListenerClient.handleSampleResults(sampleResults,
context);
+ sampleResults.clear();
}
- log.info("Worker ended");
+ LOGGER.info("Worker ended");
}
}
@@ -296,15 +279,15 @@ public class BackendListener extends Abs
*
* @return BackendListenerClient reference.
*/
- static BackendListenerClient createBackendListenerClientImpl(Class<?>
javaClass) {
- if (javaClass == null) { // failed to initialise the class
+ static BackendListenerClient createBackendListenerClientImpl(Class<?>
clientClass) {
+ if (clientClass == null) { // failed to initialise the class
return new ErrorBackendListenerClient();
}
BackendListenerClient client;
try {
- client = (BackendListenerClient) javaClass.newInstance();
+ client = (BackendListenerClient) clientClass.newInstance();
} catch (Exception e) {
- log.error("Exception creating: " + javaClass, e);
+ LOGGER.error("Exception creating: " + clientClass, e);
client = new ErrorBackendListenerClient();
}
return client;
@@ -322,27 +305,22 @@ public class BackendListener extends Abs
try {
queue.put(FINAL_EVENT);
} catch (Exception ex) {
- log.warn("testEnded() with exception:"+ex.getMessage(), ex);
+ LOGGER.warn("testEnded() with exception:"+ex.getMessage(), ex);
}
- if (queueWaits > 0) {
- log.warn("QueueWaits: "+queueWaits+"; QueueWaitTime:
"+queueWaitTime+" (nanoseconds), you may need to increase queue capacity, see
property 'backend_queue_capacity'");
+ if (queueWaits.get() > 0) {
+ LOGGER.warn("QueueWaits: "+queueWaits+"; QueueWaitTime:
"+queueWaitTime+" (nanoseconds), you may need to increase queue capacity, see
property 'backend_queue_capacity'");
}
- synchronized (TEAR_DOWN_SET) {
- for (BackendListener backendListener : TEAR_DOWN_SET) {
- BackendListenerClient client =
backendListener.backendListenerClient;
- if (client != null) {
- try {
- client.teardownTest(backendListener.context);
- } catch (Exception e) {
- throw new java.lang.IllegalStateException("Failed
calling teardownTest", e);
- }
- }
- }
- TEAR_DOWN_SET.clear();
+
+ try {
+ BackendListenerContext context = new
BackendListenerContext(getArguments());
+ backendListenerClient.teardownTest(context);
+ } catch (Exception e) {
+ throw new java.lang.IllegalStateException("Failed calling
teardownTest", e);
}
}
- /* Implements TestStateListener.testEnded(String) */
+ /** Implements TestStateListener.testEnded(String)
+ **/
@Override
public void testEnded(String host) {
testEnded();
@@ -363,7 +341,7 @@ public class BackendListener extends Abs
*/
@Override
public void handleSampleResults(List<SampleResult> sampleResults,
BackendListenerContext context) {
- log.warn("ErrorBackendListenerClient#handleSampleResult called,
noop");
+ LOGGER.warn("ErrorBackendListenerClient#handleSampleResult called,
noop");
Thread.yield();
}
}