Author: pmouawad
Date: Sat Nov 22 20:01:50 2014
New Revision: 1641118

URL: http://svn.apache.org/r1641118
Log:
Bug 55932 - Create an Async BackendListener to allow easy plugging in of new 
listeners (Graphite, JDBC, Console, ...)
Fix javadocs and naming.
More importantly, fix multi-threading issues on instance variables, as there 
is only one BackendListener instance shared by all threads.
Bugzilla Id: 55932

Modified:
    
jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/BackendListener.java

Modified: 
jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/BackendListener.java
URL: 
http://svn.apache.org/viewvc/jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/BackendListener.java?rev=1641118&r1=1641117&r2=1641118&view=diff
==============================================================================
--- 
jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/BackendListener.java
 (original)
+++ 
jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/BackendListener.java
 Sat Nov 22 20:01:50 2014
@@ -21,11 +21,10 @@ package org.apache.jmeter.visualizers.ba
 import java.io.Serializable;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.LockSupport;
 
 import org.apache.jmeter.config.Arguments;
@@ -53,13 +52,7 @@ public class BackendListener extends Abs
      */
     private static final long serialVersionUID = 8184103677832024335L;
 
-    private static final Logger log = LoggingManager.getLoggerForClass();
-
-    /**
-     * Set used to register instances which implement teardownTest.
-     * This is used so that the BackendListenerClient can be notified when the 
test ends.
-     */
-    private static final Set<BackendListener> TEAR_DOWN_SET = new 
HashSet<BackendListener>();
+    private static final Logger LOGGER = LoggingManager.getLoggerForClass();
 
     /**
      * Property key representing the classname of the BackendListenerClient to 
user.
@@ -80,7 +73,7 @@ public class BackendListener extends Abs
      * The BackendListenerClient class used by this sampler.
      * Created by testStarted; copied to cloned instances.
      */
-    private Class<?> javaClass;
+    private Class<?> clientClass;
 
     /**
      * If true, the BackendListenerClient class implements teardownTest.
@@ -93,20 +86,14 @@ public class BackendListener extends Abs
      */
     private transient BackendListenerClient backendListenerClient = null;
 
-    /**
-     * The BackendListenerContext instance used by this sampler to hold 
information
-     * related to the test run, such as the parameters specified for the 
sampler
-     * client.
-     */
-    private transient BackendListenerContext context = null;
 
     private static final int DEFAULT_QUEUE_SIZE = 5000;
     
     private transient BlockingQueue<SampleResult> queue; // created by server 
in readResolve method
     
-    private transient long queueWaits; // how many times we had to wait to 
queue a sample
+    private AtomicLong queueWaits; // how many times we had to wait to queue a 
sample
     
-    private transient long queueWaitTime; // how long we had to wait 
(nanoSeconds)
+    private AtomicLong queueWaitTime; // how long we had to wait (nanoSeconds)
 
     // Create unique object as marker for end of queue
     private transient static final SampleResult FINAL_EVENT = new 
SampleResult();
@@ -125,7 +112,7 @@ public class BackendListener extends Abs
     @Override
     public Object clone() {
         BackendListener clone = (BackendListener) super.clone();
-        clone.javaClass = this.javaClass;
+        clone.clientClass = this.clientClass;
         clone.isToBeRegistered = this.isToBeRegistered;
         return clone;
     }
@@ -133,26 +120,16 @@ public class BackendListener extends Abs
     private void initClass() {
         String name = getClassname().trim();
         try {
-            javaClass = Class.forName(name, false, 
Thread.currentThread().getContextClassLoader());
-            Method method = javaClass.getMethod("teardownTest", new 
Class[]{BackendListenerContext.class});
+            clientClass = Class.forName(name, false, 
Thread.currentThread().getContextClassLoader());
+            Method method = clientClass.getMethod("teardownTest", new 
Class[]{BackendListenerContext.class});
             isToBeRegistered = 
!method.getDeclaringClass().equals(AbstractBackendListenerClient.class);
-            log.info("Created class: " + name + ". Uses teardownTest: " + 
isToBeRegistered);
+            LOGGER.info("Created class: " + name + ". Uses teardownTest: " + 
isToBeRegistered);
         } catch (Exception e) {
-            log.error(whoAmI() + "\tException initialising: " + name, e);
+            LOGGER.error(whoAmI() + "\tException initialising: " + name, e);
         }   
     }
 
     /**
-     * Retrieves reference to BackendListenerClient.
-     *
-     * Convience method used to check for null reference without actually
-     * creating a BackendListenerClient
-     *
-     * @return reference to BackendListenerClient NOTUSED private 
BackendListenerClient
-     *         retrieveJavaClient() { return javaClient; }
-     */
-
-    /**
      * Generate a String identifier of this instance for debugging purposes.
      *
      * @return a String identifier for this sampler instance
@@ -168,37 +145,40 @@ public class BackendListener extends Abs
     }
 
     // TestStateListener implementation
-    /* Implements TestStateListener.testStarted() */
+    /**
+     *  Implements TestStateListener.testStarted() 
+     **/
     @Override
     public void testStarted() {
         testStarted("");
     }
 
-    /* Implements TestStateListener.testStarted(String) */
+    /** Implements TestStateListener.testStarted(String) 
+     **/
     @Override
     public void testStarted(String host) {
-        log.debug(whoAmI() + "\ttestStarted(" + host + ")");
+        if(LOGGER.isDebugEnabled()){
+            LOGGER.debug(whoAmI() + "\ttestStarted(" + host + ")");
+        }
         queue = new ArrayBlockingQueue<SampleResult>(getQueueSize()); 
         initClass();
-        queueWaits=0L;
-        queueWaitTime=0L;
-        log.info(getName()+":Starting worker with class:"+javaClass +" and 
queue capacity:"+getQueueSize());
-
-        backendListenerClient = createBackendListenerClientImpl(javaClass);
-        context = new 
BackendListenerContext((Arguments)getArguments().clone());
-        if(isToBeRegistered) {
-            TEAR_DOWN_SET.add(this);
-        }
+        queueWaits = new AtomicLong(0L);
+        queueWaitTime = new AtomicLong(0L);
+        LOGGER.info(getName()+":Starting worker with class:"+clientClass +" 
and queue capacity:"+getQueueSize());
+
+        backendListenerClient = createBackendListenerClientImpl(clientClass);
+        BackendListenerContext context = new 
BackendListenerContext((Arguments)getArguments().clone());
+        
         try {
             backendListenerClient.setupTest(context);
         } catch (Exception e) {
             throw new java.lang.IllegalStateException("Failed calling 
setupTest", e);
         }
 
-        Worker worker = new Worker(javaClass, backendListenerClient, 
(Arguments) getArguments().clone(), queue);
+        Worker worker = new Worker(backendListenerClient, (Arguments) 
getArguments().clone(), queue);
         worker.setDaemon(true);
         worker.start();
-        log.info(getName()+":Started  worker with class:"+javaClass);
+        LOGGER.info(getName()+": Started  worker with class:"+clientClass);
         
     }
 
@@ -206,30 +186,33 @@ public class BackendListener extends Abs
      * @see 
org.apache.jmeter.samplers.SampleListener#sampleOccurred(org.apache.jmeter.samplers.SampleEvent)
      */
     @Override
-    public void sampleOccurred(SampleEvent e) {
+    public void sampleOccurred(SampleEvent event) {
         Arguments args = getArguments();
-        context = new BackendListenerContext(args);
+        BackendListenerContext context = new BackendListenerContext(args);
 
-        SampleResult sr = backendListenerClient.createSampleResult(context, 
e.getResult());
+        SampleResult sr = backendListenerClient.createSampleResult(context, 
event.getResult());
         try {
             if (!queue.offer(sr)){ // we failed to add the element first time
-                queueWaits++;
+                queueWaits.incrementAndGet();
                 long t1 = System.nanoTime();
                 queue.put(sr);
                 long t2 = System.nanoTime();
-                queueWaitTime += t2-t1;
+                queueWaitTime.addAndGet(t2-t1);
             }
         } catch (Exception err) {
-            log.error("sampleOccurred, failed to queue the sample", err);
+            LOGGER.error("sampleOccurred, failed to queue the sample", err);
         }
     }
     
+    /**
+     * Thread that dequeus data from queue to send it to {@link 
BackendListenerClient}
+     */
     private static final class Worker extends Thread {
         
         private final BlockingQueue<SampleResult> queue;
         private final BackendListenerContext context;
         private final BackendListenerClient backendListenerClient;
-        private Worker(Class<?> javaClass, BackendListenerClient 
backendListenerClient, Arguments arguments, BlockingQueue<SampleResult> q){
+        private Worker(BackendListenerClient backendListenerClient, Arguments 
arguments, BlockingQueue<SampleResult> q){
             queue = q;
             // Allow BackendListenerClient implementations to get access to 
test element name
             arguments.addArgument(TestElement.NAME, getName()); 
@@ -240,37 +223,37 @@ public class BackendListener extends Abs
         
         @Override
         public void run() {
-            boolean isDebugEnabled = log.isDebugEnabled();
-            List<SampleResult> l = new ArrayList<SampleResult>(queue.size());
+            boolean isDebugEnabled = LOGGER.isDebugEnabled();
+            List<SampleResult> sampleResults = new 
ArrayList<SampleResult>(queue.size());
             try {
                 boolean eof = false;
                 while (!eof) {
                     if(isDebugEnabled) {
-                        log.debug("Thread:"+Thread.currentThread().getName()+" 
taking SampleResult from queue:"+queue.size());
+                        
LOGGER.debug("Thread:"+Thread.currentThread().getName()+" taking SampleResult 
from queue:"+queue.size());
                     }
                     SampleResult e = queue.take();
                     if(isDebugEnabled) {
-                        log.debug("Thread:"+Thread.currentThread().getName()+" 
took SampleResult:"+e+", isFinal:" + (e==FINAL_EVENT));
+                        
LOGGER.debug("Thread:"+Thread.currentThread().getName()+" took 
SampleResult:"+e+", isFinal:" + (e==FINAL_EVENT));
                     }
                     while (!(eof = (e == FINAL_EVENT)) && e != null ) { // try 
to process as many as possible
-                        l.add(e);
+                        sampleResults.add(e);
                         if(isDebugEnabled) {
-                            
log.debug("Thread:"+Thread.currentThread().getName()+" polling from 
queue:"+queue.size());
+                            
LOGGER.debug("Thread:"+Thread.currentThread().getName()+" polling from 
queue:"+queue.size());
                         }
                         e = queue.poll(); // returns null if nothing on queue 
currently
                         if(isDebugEnabled) {
-                            
log.debug("Thread:"+Thread.currentThread().getName()+" took from queue:"+e+", 
isFinal:" + (e==FINAL_EVENT));
+                            
LOGGER.debug("Thread:"+Thread.currentThread().getName()+" took from 
queue:"+e+", isFinal:" + (e==FINAL_EVENT));
                         }
                     }
                     if(isDebugEnabled) {
-                        log.debug("Thread:"+Thread.currentThread().getName()+
+                        
LOGGER.debug("Thread:"+Thread.currentThread().getName()+
                                 " exiting with FINAL EVENT:"+(e == FINAL_EVENT)
                                 +", null:" + (e==null));
                     }
-                    int size = l.size();
+                    int size = sampleResults.size();
                     if (size > 0) {
-                        backendListenerClient.handleSampleResults(l, context);
-                        l.clear();
+                        
backendListenerClient.handleSampleResults(sampleResults, context);
+                        sampleResults.clear();
                     }
                     if(!eof) {
                         LockSupport.parkNanos(100);
@@ -280,12 +263,12 @@ public class BackendListener extends Abs
                 // NOOP
             }
             // We may have been interrupted
-            int size = l.size();
+            int size = sampleResults.size();
             if (size > 0) {
-                backendListenerClient.handleSampleResults(l, context);
-                l.clear();
+                backendListenerClient.handleSampleResults(sampleResults, 
context);
+                sampleResults.clear();
             }
-            log.info("Worker ended");
+            LOGGER.info("Worker ended");
         }
     }
     
@@ -296,15 +279,15 @@ public class BackendListener extends Abs
      *
      * @return BackendListenerClient reference.
      */
-    static BackendListenerClient createBackendListenerClientImpl(Class<?> 
javaClass) {
-        if (javaClass == null) { // failed to initialise the class
+    static BackendListenerClient createBackendListenerClientImpl(Class<?> 
clientClass) {
+        if (clientClass == null) { // failed to initialise the class
             return new ErrorBackendListenerClient();
         }
         BackendListenerClient client;
         try {
-            client = (BackendListenerClient) javaClass.newInstance();
+            client = (BackendListenerClient) clientClass.newInstance();
         } catch (Exception e) {
-            log.error("Exception creating: " + javaClass, e);
+            LOGGER.error("Exception creating: " + clientClass, e);
             client = new ErrorBackendListenerClient();
         }
         return client;
@@ -322,27 +305,22 @@ public class BackendListener extends Abs
         try {
             queue.put(FINAL_EVENT);
         } catch (Exception ex) {
-            log.warn("testEnded() with exception:"+ex.getMessage(), ex);
+            LOGGER.warn("testEnded() with exception:"+ex.getMessage(), ex);
         }
-        if (queueWaits > 0) {
-            log.warn("QueueWaits: "+queueWaits+"; QueueWaitTime: 
"+queueWaitTime+" (nanoseconds), you may need to increase queue capacity, see 
property 'backend_queue_capacity'");            
+        if (queueWaits.get() > 0) {
+            LOGGER.warn("QueueWaits: "+queueWaits+"; QueueWaitTime: 
"+queueWaitTime+" (nanoseconds), you may need to increase queue capacity, see 
property 'backend_queue_capacity'");            
         }
-        synchronized (TEAR_DOWN_SET) {
-            for (BackendListener backendListener : TEAR_DOWN_SET) {
-                BackendListenerClient client = 
backendListener.backendListenerClient;
-                if (client != null) {
-                    try {
-                        client.teardownTest(backendListener.context);
-                    } catch (Exception e) {
-                        throw new java.lang.IllegalStateException("Failed 
calling teardownTest", e);
-                    }
-                }
-            }
-            TEAR_DOWN_SET.clear();
+        
+        try {
+            BackendListenerContext context = new 
BackendListenerContext(getArguments());
+            backendListenerClient.teardownTest(context);
+        } catch (Exception e) {
+            throw new java.lang.IllegalStateException("Failed calling 
teardownTest", e);
         }
     }
 
-    /* Implements TestStateListener.testEnded(String) */
+    /** Implements TestStateListener.testEnded(String) 
+     **/
     @Override
     public void testEnded(String host) {
         testEnded();
@@ -363,7 +341,7 @@ public class BackendListener extends Abs
          */
         @Override
         public void handleSampleResults(List<SampleResult> sampleResults, 
BackendListenerContext context) {
-            log.warn("ErrorBackendListenerClient#handleSampleResult called, 
noop");
+            LOGGER.warn("ErrorBackendListenerClient#handleSampleResult called, 
noop");
             Thread.yield();
         }
     }


Reply via email to