Author: hiranya
Date: Fri May 6 09:34:52 2011
New Revision: 1100149
URL: http://svn.apache.org/viewvc?rev=1100149&view=rev
Log:
Improving endpoint JMX stats
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointView.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointViewMBean.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java?rev=1100149&r1=1100148&r2=1100149&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java
Fri May 6 09:34:52 2011
@@ -184,7 +184,8 @@ public abstract class AbstractEndpoint e
isClusteringEnabled = Boolean.FALSE;
}
- context = new EndpointContext(getName(), getDefinition(),
isClusteringEnabled, cc);
+ context = new EndpointContext(getName(), getDefinition(),
isClusteringEnabled,
+ cc, metricsMBean);
}
initialized = true;
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java?rev=1100149&r1=1100148&r2=1100149&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java
Fri May 6 09:34:52 2011
@@ -25,6 +25,7 @@ import org.apache.synapse.SynapseConstan
import org.apache.synapse.SynapseException;
import org.apache.synapse.util.Replicator;
+import java.util.Calendar;
import java.util.Date;
/**
@@ -52,7 +53,7 @@ public class EndpointContext {
*/
public static final int ST_SUSPENDED = 3;
/**
- * An endpoint manually switched off into maintenence -
+ * An endpoint manually switched off into maintenance -
* it will never change state automatically
*/
public static final int ST_OFF = 4;
@@ -75,6 +76,9 @@ public class EndpointContext {
/** The endpoint definition that holds static endpoint information */
private EndpointDefinition definition = null;
+ /** Metrics bean to notify the state changes */
+ private EndpointView metricsBean = null;
+
// for clustered mode operation, keys pre-computed and used for replication
private final String STATE_KEY;
private final String NEXT_RETRY_TIME_KEY;
@@ -90,7 +94,7 @@ public class EndpointContext {
* @param cfgCtx the Axis2 configurationContext for clustering
*/
public EndpointContext(String endpointName, EndpointDefinition
endpointDefinition,
- boolean clustered, ConfigurationContext cfgCtx) {
+ boolean clustered, ConfigurationContext cfgCtx,
EndpointView metricsBean) {
if (clustered) {
if (endpointName == null) {
@@ -111,6 +115,8 @@ public class EndpointContext {
this.endpointName = endpointDefinition.toString();
}
+ this.metricsBean = metricsBean;
+
STATE_KEY = KEY_PREFIX + endpointName + STATE;
NEXT_RETRY_TIME_KEY = KEY_PREFIX + endpointName + NEXT_RETRY_TIME;
REMAINING_RETRIES_KEY = KEY_PREFIX + endpointName + REMAINING_RETRIES;
@@ -123,6 +129,39 @@ public class EndpointContext {
}
}
+ private void recordStatistics(int state) {
+ if (metricsBean == null) {
+ return;
+ }
+
+ switch (state) {
+ case ST_ACTIVE:
+ metricsBean.resetConsecutiveSuspensions();
+ metricsBean.resetConsecutiveTimeouts();
+ metricsBean.setSuspendedAt(null);
+ metricsBean.setTimedoutAt(null);
+ break;
+
+ case ST_TIMEOUT:
+ metricsBean.resetConsecutiveSuspensions();
+ metricsBean.incrementTimeouts();
+ if (localState != ST_TIMEOUT) {
+
metricsBean.setTimedoutAt(Calendar.getInstance().getTime());
+ metricsBean.setSuspendedAt(null);
+ }
+ break;
+
+ case ST_SUSPENDED:
+ metricsBean.resetConsecutiveTimeouts();
+ metricsBean.incrementSuspensions();
+ if (localState != ST_SUSPENDED) {
+
metricsBean.setSuspendedAt(Calendar.getInstance().getTime());
+ metricsBean.setTimedoutAt(null);
+ }
+ break;
+ }
+ }
+
/**
* Update the internal state of the endpoint
*
@@ -130,6 +169,8 @@ public class EndpointContext {
*/
private void setState(int state) {
+ recordStatistics(state);
+
if (isClustered) {
Replicator.setAndReplicateState(STATE_KEY, state, cfgCtx);
switch (state) {
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointView.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointView.java?rev=1100149&r1=1100148&r2=1100149&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointView.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointView.java
Fri May 6 09:34:52 2011
@@ -23,9 +23,11 @@ import org.apache.axis2.transport.base.M
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* This class is the metrics collector and JMX control point for Endpoints
@@ -58,6 +60,16 @@ public class EndpointView implements End
private long minSizeSent;
private long maxSizeSent;
private double avgSizeSent;
+
+ private int consecutiveSuspensions;
+ private int consecutiveTimeouts;
+ private int totalSuspensions;
+ private int totalTimeouts;
+ private AtomicInteger suspensions = new AtomicInteger(0);
+ private AtomicInteger timeouts = new AtomicInteger(0);
+ private Date suspendedAt;
+ private Date timedoutAt;
+
private final Map<Integer, Long> sendingFaultTable =
Collections.synchronizedMap(new HashMap<Integer, Long>());
@@ -66,6 +78,10 @@ public class EndpointView implements End
private long lastResetTime = System.currentTimeMillis();
+ private ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
+ private Queue<Integer> suspensionCounts = new LinkedList<Integer>();
+ private Queue<Integer> timeoutCounts = new LinkedList<Integer>();
+
/**
* Create a new MBean to manage the given endpoint
* @param endpointName the name of the endpoint
@@ -74,6 +90,23 @@ public class EndpointView implements End
public EndpointView(String endpointName, Endpoint endpoint) {
this.endpointName = endpointName;
this.endpoint = endpoint;
+ scheduler.scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ if (suspensionCounts.size() == 15) {
+ suspensionCounts.remove();
+ }
+ suspensionCounts.offer(suspensions.getAndSet(0));
+
+ if (timeoutCounts.size() == 15) {
+ timeoutCounts.remove();
+ }
+ timeoutCounts.offer(timeouts.getAndSet(0));
+ }
+ }, 60, 60, TimeUnit.SECONDS);
+ }
+
+ public void destroy() {
+ scheduler.shutdownNow();
}
// --- endpoint control ---
@@ -276,6 +309,98 @@ public class EndpointView implements End
}
}
+ public Date getSuspendedAt() {
+ return suspendedAt;
+ }
+
+ public void setSuspendedAt(Date suspendedAt) {
+ this.suspendedAt = suspendedAt;
+ }
+
+ public Date getTimedoutAt() {
+ return timedoutAt;
+ }
+
+ public void setTimedoutAt(Date timedoutAt) {
+ this.timedoutAt = timedoutAt;
+ }
+
+ public int getConsecutiveEndpointSuspensions() {
+ return consecutiveSuspensions;
+ }
+
+ public void incrementSuspensions() {
+ consecutiveSuspensions++;
+ totalSuspensions++;
+ suspensions.incrementAndGet();
+ }
+
+ public void resetConsecutiveSuspensions() {
+ consecutiveSuspensions = 0;
+ }
+
+ public int getConsecutiveEndpointTimeouts() {
+ return consecutiveTimeouts;
+ }
+
+ public void incrementTimeouts() {
+ consecutiveTimeouts++;
+ totalTimeouts++;
+ timeouts.incrementAndGet();
+ }
+
+ public void resetConsecutiveTimeouts() {
+ consecutiveTimeouts = 0;
+ }
+
+ public int getTotalEndpointSuspensions() {
+ return totalSuspensions;
+ }
+
+ public int getTotalEndpointTimeouts() {
+ return totalTimeouts;
+ }
+
+ public int getLastMinuteEndpointSuspensions() {
+ return getTotal(suspensionCounts, 1);
+ }
+
+ public int getLast5MinuteEndpointSuspensions() {
+ return getTotal(suspensionCounts, 5);
+ }
+
+ public int getLast15MinuteEndpointSuspensions() {
+ return getTotal(suspensionCounts, 15);
+ }
+
+ public int getLastMinuteEndpointTimeouts() {
+ return getTotal(timeoutCounts, 1);
+ }
+
+ public int getLast5MinuteEndpointTimeouts() {
+ return getTotal(timeoutCounts, 5);
+ }
+
+ public int getLast15MinuteEndpointTimeouts() {
+ return getTotal(timeoutCounts, 15);
+ }
+
+ private int getTotal(Queue<Integer> queue, int count) {
+ int sum = 0;
+ Integer[] array = queue.toArray(new Integer[queue.size()]);
+
+ if (count > array.length) {
+ for (int i = 0; i < array.length; i++) {
+ sum +=array[i];
+ }
+ } else {
+ for (int i = 0; i < count; i++) {
+ sum += array[array.length - 1 - i];
+ }
+ }
+ return sum;
+ }
+
/**
* Number of messages (ie replies) received
* @return # of messages (replies) received
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointViewMBean.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointViewMBean.java?rev=1100149&r1=1100148&r2=1100149&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointViewMBean.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointViewMBean.java
Fri May 6 09:34:52 2011
@@ -20,6 +20,7 @@
package org.apache.synapse.endpoints;
import java.util.Map;
+import java.util.Date;
public interface EndpointViewMBean {
@@ -43,6 +44,19 @@ public interface EndpointViewMBean {
public Map getSendingFaultTable();
public Map getResponseCodeTable();
+ public Date getSuspendedAt();
+ public Date getTimedoutAt();
+ public int getConsecutiveEndpointSuspensions();
+ public int getConsecutiveEndpointTimeouts();
+ public int getTotalEndpointSuspensions();
+ public int getTotalEndpointTimeouts();
+ public int getLastMinuteEndpointSuspensions();
+ public int getLast5MinuteEndpointSuspensions();
+ public int getLast15MinuteEndpointSuspensions();
+ public int getLastMinuteEndpointTimeouts();
+ public int getLast5MinuteEndpointTimeouts();
+ public int getLast15MinuteEndpointTimeouts();
+
// JMX Operations
public void switchOn() throws Exception;
public void switchOff() throws Exception;