Author: hiranya
Date: Sat Aug  3 00:37:18 2013
New Revision: 1509920

URL: http://svn.apache.org/r1509920
Log:
Adding a JMX MBean to monitor the synapse call back store - SYNAPSE-528

Added:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreView.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreViewMBean.java
Modified:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/ServerContextInformation.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java?rev=1509920&r1=1509919&r2=1509920&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java Sat Aug  3 00:37:18 2013
@@ -348,6 +348,9 @@ public class Axis2SynapseController impl
         } catch (IOException e) {
             log.error("Error while initializing SNMP", e);
         }
+
+        SynapseCallbackReceiver.getInstance().init(serverContextInformation.getSynapseConfiguration(),
+                serverContextInformation);
     }
 
     /**
@@ -459,6 +462,8 @@ public class Axis2SynapseController impl
                     }
                 }
             }
+
+            SynapseCallbackReceiver.getInstance().destroy();
         } catch (AxisFault e) {
             log.error("Error stopping the Axis2 Environment");
         }
@@ -621,7 +626,7 @@ public class Axis2SynapseController impl
             }
             int pendingTransportThreads = pendingListenerThreads + pendingSenderThreads;
 
-            int pendingCallbacks = serverContextInformation.getCallbackCount();
+            int pendingCallbacks = SynapseCallbackReceiver.getInstance().getCallbackCount();
             if (pendingCallbacks > 0) {
                 log.info("Waiting for: " + pendingCallbacks + " callbacks/replies..");
             }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/ServerContextInformation.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/ServerContextInformation.java?rev=1509920&r1=1509919&r2=1509920&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/ServerContextInformation.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/ServerContextInformation.java Sat Aug  3 00:37:18 2013
@@ -38,8 +38,6 @@ public class ServerContextInformation {
     private SynapseConfiguration synapseConfiguration;
     /* Keeps the SynapseEnvironment */
     private SynapseEnvironment synapseEnvironment;
-    /** Callback receiver */    
-    private SynapseCallbackReceiver synapseCallbackReceiver;
     /** State of the server */
     private ServerState serverState = ServerState.UNDETERMINED;
     /** Reference to the server configuration */
@@ -95,27 +93,7 @@ public class ServerContextInformation {
         this.synapseEnvironment = synapseEnvironment;
     }
 
-    public SynapseCallbackReceiver getSynapseCallbackReceiver() {
-        return synapseCallbackReceiver;
-    }
-
-    public void setSynapseCallbackReceiver(SynapseCallbackReceiver synapseCallbackReceiver) {
-        this.synapseCallbackReceiver = synapseCallbackReceiver;
-    }
-
     public ServerConfigurationInformation getServerConfigurationInformation() {
         return serverConfigurationInformation;
     }
-
-    /**
-     * Returns the number of current callbacks.
-     *
-     * @return the number of current callbacks.
-     */
-    public int getCallbackCount() {
-        if (synapseCallbackReceiver != null) {
-            return synapseCallbackReceiver.getCallbackCount();
-        }
-        return 0;
-    }
 }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java?rev=1509920&r1=1509919&r2=1509920&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java Sat Aug  3 00:37:18 2013
@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.SynapseException;
-import org.apache.synapse.ServerContextInformation;
 import org.apache.synapse.config.SynapseConfiguration;
 
 import javax.xml.namespace.QName;
@@ -50,8 +49,6 @@ public class AnonymousServiceFactory {
     public static final String OUT_IN_OPERATION   = "anonOutInOp";
     public static final String OUT_ONLY_OPERATION = "anonOutonlyOp";
 
-    private static SynapseCallbackReceiver synapseCallbackReceiver = null;
-
     /**
      * Creates an AxisService for the requested QoS for sending out messages
      * Callers must guarantee that if wsRMon or wsSecOn is required, that wsAddrOn is also set
@@ -161,7 +158,7 @@ public class AnonymousServiceFactory {
         try {
             DynamicAxisOperation dynamicOperation =
                 new DynamicAxisOperation(new QName(OUT_IN_OPERATION));
-            dynamicOperation.setMessageReceiver(getCallbackReceiver(synCfg, axisCfg));
+            dynamicOperation.setMessageReceiver(SynapseCallbackReceiver.getInstance());
             AxisMessage inMsg = new AxisMessage();
             inMsg.setName("in-message");
             inMsg.setParent(dynamicOperation);
@@ -173,7 +170,7 @@ public class AnonymousServiceFactory {
 
             OutOnlyAxisOperation asyncOperation =
                 new OutOnlyAxisOperation(new QName(OUT_ONLY_OPERATION));
-            asyncOperation.setMessageReceiver(getCallbackReceiver(synCfg, axisCfg));
+            asyncOperation.setMessageReceiver(SynapseCallbackReceiver.getInstance());
             AxisMessage outOnlyMsg = new AxisMessage();
             outOnlyMsg.setName("out-message");
             outOnlyMsg.setParent(asyncOperation);
@@ -193,39 +190,9 @@ public class AnonymousServiceFactory {
 
         } catch (AxisFault e) {
             handleException(
-                "Error occured while creating an anonymous service for QoS : " +
+                "Error occurred while creating an anonymous service for QoS : " +
                  serviceKey, e);
         }
         return null;
     }
-
-    /**
-     * Create a single callback receiver if required, and return its reference
-     * @param synCfg the Synapse configuration
-     * @param axisCfg axis configuration
-     * @return the callback receiver thats created or now exists
-     */
-    private static synchronized SynapseCallbackReceiver getCallbackReceiver(
-            SynapseConfiguration synCfg, AxisConfiguration axisCfg) {
-
-        if (synapseCallbackReceiver == null) {
-            Parameter serverCtxParam =
-                    axisCfg.getParameter(
-                            SynapseConstants.SYNAPSE_SERVER_CTX_INFO);
-            if (serverCtxParam == null ||
-                    !(serverCtxParam.getValue() instanceof ServerContextInformation)) {
-                String msg = "ServerContextInformation not found";
-                log.error(msg);
-                throw new SynapseException(msg);
-            }
-
-            ServerContextInformation contextInformation =
-                    (ServerContextInformation) serverCtxParam.getValue();
-
-            synapseCallbackReceiver = new SynapseCallbackReceiver(synCfg, contextInformation);
-
-            contextInformation.setSynapseCallbackReceiver(synapseCallbackReceiver);
-        }
-        return synapseCallbackReceiver;
-    }
 }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java?rev=1509920&r1=1509919&r2=1509920&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java Sat Aug  3 00:37:18 2013
@@ -40,6 +40,7 @@ import org.apache.synapse.SynapseExcepti
 import org.apache.synapse.ServerContextInformation;
 import org.apache.synapse.aspects.statistics.ErrorLogFactory;
 import org.apache.synapse.aspects.statistics.StatisticsReporter;
+import org.apache.synapse.commons.jmx.MBeanRegistrar;
 import org.apache.synapse.config.SynapseConfigUtils;
 import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.endpoints.Endpoint;
@@ -61,19 +62,47 @@ public class SynapseCallbackReceiver imp
 
     private static final Log log = LogFactory.getLog(SynapseCallbackReceiver.class);
 
+    private static final String CALLBACK_STORE_CATEGORY = "SynapseCallbackStore";
+    private static final String CALLBACK_STORE_NAME = "SynapseCallbackStore";
+
+    private static final SynapseCallbackReceiver instance = new SynapseCallbackReceiver();
+
     /** This is the synchronized callbackStore that maps outgoing messageID's to callback objects */
     private final Map<String, AxisCallback> callbackStore;  // will be made thread safe in the constructor
 
+    private boolean initialized = false;
+
+    private SynapseCallbackReceiver() {
+        callbackStore = Collections.synchronizedMap(new HashMap<String, AxisCallback>());
+    }
+
+    /**
+     * Get the singleton SynapseCallbackReceiver instance
+     *
+     * @return A SynapseCallbackReceiver
+     */
+    public static SynapseCallbackReceiver getInstance() {
+        return instance;
+    }
+
     /**
-     * Create the *single* instance of this class that would be used by all anonymous services
+     * Initialize the singleton instance of this class that would be used by all anonymous services
      * used for outgoing messaging.
+     *
      * @param synCfg the Synapse configuration
      * @param contextInformation server runtime information
      */
-    public SynapseCallbackReceiver(SynapseConfiguration synCfg,
+    public void init(SynapseConfiguration synCfg,
                                    ServerContextInformation contextInformation) {
 
-        callbackStore = Collections.synchronizedMap(new HashMap<String, AxisCallback>());
+        if (initialized) {
+            log.warn("Attempted to re-initialize SynapseCallbackReceiver");
+            return;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Initializing SynapseCallbackReceiver");
+        }
 
         // create the Timer object and a TimeoutHandler task
         TimeoutHandler timeoutHandler = new TimeoutHandler(callbackStore, contextInformation);
@@ -83,12 +112,45 @@ public class SynapseCallbackReceiver imp
 
         // schedule timeout handler to run every n seconds (n : specified or defaults to 15s)
         timeOutTimer.schedule(timeoutHandler, 0, timeoutHandlerInterval);
+
+        MBeanRegistrar.getInstance().registerMBean(new SynapseCallbackStoreView(this),
+                CALLBACK_STORE_CATEGORY, CALLBACK_STORE_NAME);
+        initialized = true;
+    }
+
+    /**
+     * Destroy and cleanup this callback receiver instance
+     */
+    public void destroy() {
+        if (!initialized) {
+            log.warn("Attempted to destroy uninitialized SynapseCallbackReceiver");
+            return;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Destroying SynapseCallbackReceiver");
+        }
+        MBeanRegistrar.getInstance().unRegisterMBean(CALLBACK_STORE_CATEGORY,
+                CALLBACK_STORE_NAME);
+        initialized = false;
     }
 
     public int getCallbackCount() {
         return callbackStore.size();
     }
 
+    public String[] getPendingCallbacks() {
+        Set<String> keys = callbackStore.keySet();
+        List<String> list = new ArrayList<String>();
+        synchronized (callbackStore) {
+            Iterator<String> iterator = keys.iterator();
+            while (iterator.hasNext()) {
+                list.add(iterator.next());
+            }
+        }
+        return list.toArray(new String[list.size()]);
+    }
+
     public void addCallback(String MsgID, AxisCallback callback) {
         callbackStore.put(MsgID, callback);
         if (log.isDebugEnabled()) {

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreView.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreView.java?rev=1509920&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreView.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreView.java Sat Aug  3 00:37:18 2013
@@ -0,0 +1,18 @@
+package org.apache.synapse.core.axis2;
+
+public class SynapseCallbackStoreView implements SynapseCallbackStoreViewMBean {
+
+    private SynapseCallbackReceiver receiver;
+
+    public SynapseCallbackStoreView(SynapseCallbackReceiver receiver) {
+        this.receiver = receiver;
+    }
+
+    public int getCallbackCount() {
+        return receiver.getCallbackCount();
+    }
+
+    public String[] getPendingCallbacks() {
+        return receiver.getPendingCallbacks();
+    }
+}

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreViewMBean.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreViewMBean.java?rev=1509920&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreViewMBean.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreViewMBean.java Sat Aug  3 00:37:18 2013
@@ -0,0 +1,22 @@
+package org.apache.synapse.core.axis2;
+
+/**
+ * JMX MBean interface for monitoring the Synapse callback store.
+ */
+public interface SynapseCallbackStoreViewMBean {
+
+    /**
+     * Get the number of pending callbacks in Synapse callback store
+     *
+     * @return An integer
+     */
+    public int getCallbackCount();
+
+    /**
+     * Get the IDs (message IDs) of the pending callbacks in Synapse callback store
+     *
+     * @return An array of strings
+     */
+    public String[] getPendingCallbacks();
+
+}


Reply via email to