Author: hiranya
Date: Fri Jul 16 08:58:18 2010
New Revision: 964719

URL: http://svn.apache.org/viewvc?rev=964719&view=rev
Log:
Adding the latency view JMX MBean to the Synapse NHTTP transport. This new 
MBean approximates and displays the latency incurred by the NHTTP transport 
when mediating messages back and forth (SYNAPSE-669)


Added:
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/LatencyView.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/LatencyViewMBean.java
Modified:
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java?rev=964719&r1=964718&r2=964719&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
 Fri Jul 16 08:58:18 2010
@@ -24,6 +24,7 @@ import org.apache.axiom.soap.SOAPEnvelop
 import org.apache.axiom.soap.impl.llom.soap11.SOAP11Factory;
 import org.apache.axiom.soap.impl.llom.soap12.SOAP12Factory;
 import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
@@ -237,6 +238,8 @@ public class ClientHandler implements NH
             context.setAttribute(NhttpConstants.ENDPOINT_PREFIX, 
axis2Req.getEndpointURLPrefix());
             context.setAttribute(NhttpConstants.HTTP_REQ_METHOD, 
request.getRequestLine().getMethod());
             context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
+            setServerContextAttribute(NhttpConstants.REQ_DEPARTURE_TIME,
+                        System.currentTimeMillis(), conn);
             conn.submitRequest(request);
         } catch (ConnectionClosedException e) {
             throw e;
@@ -506,6 +509,8 @@ public class ClientHandler implements NH
             }
 
             if (decoder.isCompleted()) {
+                setServerContextAttribute(NhttpConstants.RES_ARRIVAL_TIME,
+                        System.currentTimeMillis(), conn);
                 ClientConnectionDebug ccd = (ClientConnectionDebug)
                         
conn.getContext().getAttribute(CLIENT_CONNECTION_DEBUG);
                 if (ccd != null) {
@@ -1066,4 +1071,15 @@ public class ClientHandler implements NH
         }
         return null;
     }
+
+    private void setServerContextAttribute(String key, Object value, 
NHttpClientConnection conn) {
+        MessageContext msgCtx = getMessageContext(conn);
+        if (msgCtx != null) {
+            Object outTransport = 
msgCtx.getProperty(Constants.OUT_TRANSPORT_INFO);
+            if (outTransport != null && outTransport instanceof ServerWorker) {
+                HttpContext context = ((ServerWorker) 
outTransport).getConn().getContext();
+                context.setAttribute(key, value);
+            }
+        }
+    }
 }

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java?rev=964719&r1=964718&r2=964719&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
 Fri Jul 16 08:58:18 2010
@@ -109,4 +109,9 @@ public class NhttpConstants {
     public static final String OPEN_CONNNECTIONS_MAP = "OPEN_CONNNECTIONS_MAP";
     /** Configuration in nhttp.properties file for enable connection counting 
*/
     public static final String COUNT_CONNECTIONS = "http.count.connections";
+
+    public static final String REQ_ARRIVAL_TIME = "REQ_ARRIVAL_TIME";
+    public static final String REQ_DEPARTURE_TIME = "REQ_DEPARTURE_TIME";
+    public static final String RES_ARRIVAL_TIME = "RES_ARRIVAL_TIME";
+    public static final String RES_DEPARTURE_TIME = "RES_DEPARTURE_TIME";
 }

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java?rev=964719&r1=964718&r2=964719&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
 Fri Jul 16 08:58:18 2010
@@ -49,6 +49,7 @@ import org.apache.synapse.commons.evalua
 import org.apache.synapse.commons.evaluators.EvaluatorContext;
 import org.apache.synapse.commons.executors.PriorityExecutor;
 import org.apache.synapse.transport.nhttp.debug.ServerConnectionDebug;
+import org.apache.synapse.transport.nhttp.util.LatencyView;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -97,6 +98,8 @@ public class ServerHandler implements NH
 
     private PriorityExecutor executor = null;
 
+    private LatencyView latencyView = null;
+
     public static final String REQUEST_SINK_BUFFER = 
"synapse.request-sink-buffer";
     public static final String RESPONSE_SOURCE_BUFFER = 
"synapse.response-source-buffer";
     public static final String CONNECTION_CREATION_TIME = 
"synapse.connectionCreationTime";
@@ -115,6 +118,7 @@ public class ServerHandler implements NH
         this.connStrategy = new DefaultConnectionReuseStrategy();
         this.allocator = new HeapByteBufferAllocator();
         this.activeConnections = new ArrayList<NHttpServerConnection>();
+        this.latencyView = new LatencyView(isHttps);
 
         this.cfg = NHttpConfiguration.getInstance();
        if (executor == null)  {
@@ -137,6 +141,7 @@ public class ServerHandler implements NH
     public void requestReceived(final NHttpServerConnection conn) {
 
         HttpContext context = conn.getContext();
+        context.setAttribute(NhttpConstants.REQ_ARRIVAL_TIME, 
System.currentTimeMillis());
         HttpRequest request = conn.getHttpRequest();
         context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
         context.setAttribute(NhttpConstants.MESSAGE_IN_FLIGHT, "true");
@@ -269,6 +274,21 @@ public class ServerHandler implements NH
             }
 
             if (encoder.isCompleted()) {
+
+                if (context.getAttribute(NhttpConstants.REQ_ARRIVAL_TIME) != 
null &&
+                    context.getAttribute(NhttpConstants.REQ_DEPARTURE_TIME) != 
null &&
+                    context.getAttribute(NhttpConstants.RES_ARRIVAL_TIME) != 
null) {
+
+                    latencyView.notifyTimes(
+                        (Long) 
context.getAttribute(NhttpConstants.REQ_ARRIVAL_TIME),
+                        (Long) 
context.getAttribute(NhttpConstants.REQ_DEPARTURE_TIME),
+                        (Long) 
context.getAttribute(NhttpConstants.RES_ARRIVAL_TIME),
+                        System.currentTimeMillis());
+                }
+
+                context.removeAttribute(NhttpConstants.REQ_ARRIVAL_TIME);
+                context.removeAttribute(NhttpConstants.REQ_DEPARTURE_TIME);
+                context.removeAttribute(NhttpConstants.RES_ARRIVAL_TIME);
                 
                 ((ServerConnectionDebug) conn.getContext().getAttribute(
                         
SERVER_CONNECTION_DEBUG)).recordResponseCompletionTime();
@@ -551,6 +571,8 @@ public class ServerHandler implements NH
     }
 
     public void stop() {
+        latencyView.destroy();
+
         try {
             if (workerPool != null) {
                 workerPool.shutdown(1000);

Added: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/LatencyView.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/LatencyView.java?rev=964719&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/LatencyView.java
 (added)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/LatencyView.java
 Fri Jul 16 08:58:18 2010
@@ -0,0 +1,261 @@
+/*
+*  Licensed to the Apache Software Foundation (ASF) under one
+*  or more contributor license agreements.  See the NOTICE file
+*  distributed with this work for additional information
+*  regarding copyright ownership.  The ASF licenses this file
+*  to you under the Apache License, Version 2.0 (the
+*  "License"); you may not use this file except in compliance
+*  with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing,
+*  software distributed under the License is distributed on an
+*   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+*  KIND, either express or implied.  See the License for the
+*  specific language governing permissions and limitations
+*  under the License.
+*/
+
+package org.apache.synapse.transport.nhttp.util;
+
+import org.apache.synapse.commons.jmx.MBeanRegistrar;
+
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.*;
+import java.util.Queue;
+import java.util.Date;
+import java.util.Calendar;
+
+/**
+ * <p>LatencyView provides statistical information related to the latency 
(overhead) incurred by
+ * the Synapse NHTTP transport, when mediating messages back and forth. 
Statistics are available
+ * under two main categories, namely short term data and long term data. Short 
term data is
+ * statistical information related to the last 15 minutes of execution and 
these metrics are
+ * updated every 5 seconds. Long term data is related to the last 24 hours of 
execution and
+ * they are updated every 5 minutes. Two timer tasks and a single threaded 
scheduled executor
+ * is used to perform these periodic calculations.</p>
+ *
+ * <p>Latency calculation for a single invocation is carried out by taking 
timestamps on
+ * following events:</p>
+ *
+ * <ul>
+ *  <li>t1 - Receiving a new request (ServerHandler#requestReceived)</li>
+ *  <li>t2 - Obtaining a connection to forward the request 
(Clienthandler#processConnection)</li>
+ *  <li>t3 - Reading the complete response from the backend server 
(ClientHandler#inputReady)</li>
+ *  <li>t4 - Writing the complete response to the client 
(ServerHandler#outputReady)</li>
+ * <ul>
+ *
+ * <p>Having taken these timestamps, the latency for the invocation is 
calculated as follows:<br/>
+ *    Latency = (t4 - t1) - (t3 - t2)
+ * </p>
+ *
+ */
+public class LatencyView implements LatencyViewMBean {
+
+    private static final String NHTTP_LATENCY_VIEW = "NHTTPLatencyView";
+
+    private static final int SMALL_DATA_COLLECTION_PERIOD = 5;
+    private static final int LARGE_DATA_COLLECTION_PERIOD = 5 * 60;
+    private static final int SAMPLES_PER_MINUTE = 60/ 
SMALL_DATA_COLLECTION_PERIOD;
+    private static final int SAMPLES_PER_HOUR = (60 * 
60)/LARGE_DATA_COLLECTION_PERIOD;
+
+    /** Keeps track of th last reported latency value */
+    private AtomicLong lastLatency = new AtomicLong(0);
+
+    /**
+     * Queue of all latency values reported. The short term data collector 
clears this queue up
+     * time to time thus ensuring it doesn't grow indefinitely.
+     */
+    private Queue<Long> latencyDataQueue = new ConcurrentLinkedQueue<Long>();
+
+    /**
+     * Queue of samples collected by the short term data collector. This is 
maintained
+     * as a fixed length queue
+     */
+    private Queue<Long> shortTermLatencyDataQueue = new LinkedList<Long>();
+
+    /**
+     * Queue of samples collected by the long term data collector. This is 
maintained
+     * as a fixed length queue
+     */
+    private Queue<Long> longTermLatencyDataQueue = new LinkedList<Long>();
+
+    /** Scheduled executor on which data collectors are executed */
+    private ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
+
+    private double allTimeAvgLatency = 0.0;
+    private int count = 0;
+    private Date resetTime = Calendar.getInstance().getTime();
+
+    private String name;
+
+    public LatencyView(boolean isHttps) {
+        name = "nio-http" + (isHttps ? "s" : "");
+        Runnable shortTermDataCollector = getShortTermDataCollector();
+        Runnable longTermDataCollector = getLongTermDataCollector();
+        scheduler.scheduleAtFixedRate(shortTermDataCollector, 
SMALL_DATA_COLLECTION_PERIOD,
+                SMALL_DATA_COLLECTION_PERIOD, TimeUnit.SECONDS);
+        scheduler.scheduleAtFixedRate(longTermDataCollector, 
LARGE_DATA_COLLECTION_PERIOD,
+                LARGE_DATA_COLLECTION_PERIOD, TimeUnit.SECONDS);
+        MBeanRegistrar.getInstance().registerMBean(this, NHTTP_LATENCY_VIEW, 
name);
+    }
+
+    public void destroy() {
+        MBeanRegistrar.getInstance().unRegisterMBean(NHTTP_LATENCY_VIEW, name);
+        scheduler.shutdownNow();
+    }
+
+    /**
+     * Report the timestamp values captured during mediating messages back and 
forth
+     *
+     * @param reqArrival The request arrival time
+     * @param reqDeparture The request departure time (backend connection 
establishment)
+     * @param resArrival The resoponse arrival time
+     * @param resDeparture The response departure time
+     */
+    public void notifyTimes(long reqArrival, long reqDeparture,
+                            long resArrival, long resDeparture) {
+
+        long latency = (resDeparture - reqArrival) - (resArrival - 
reqDeparture);
+        lastLatency.set(latency);
+        latencyDataQueue.offer(latency);
+    }
+
+    public double getAllTimeAvgLatency() {
+        return allTimeAvgLatency;
+    }
+
+    public double getLastMinuteAvgLatency() {
+        return getAverageLatencyByMinute(1);
+    }
+
+    public double getLast5MinuteAvgLatency() {
+        return getAverageLatencyByMinute(5);
+    }
+
+    public double getLast15MinuteAvgLatency() {
+        return getAverageLatencyByMinute(15);
+    }
+
+    public double getLastHourAvgLatency() {
+        return getAverageLatencyByHour(1);
+    }
+
+    public double getLast8HourAvgLatency() {
+        return getAverageLatencyByHour(8);
+    }
+
+    public double getLast24HourAvgLatency() {
+        return getAverageLatencyByHour(24);
+    }
+
+    public void reset() {
+        lastLatency.set(0);
+        allTimeAvgLatency = 0.0;
+        latencyDataQueue.clear();
+        shortTermLatencyDataQueue.clear();
+        longTermLatencyDataQueue.clear();
+        count = 0;
+        resetTime = Calendar.getInstance().getTime();
+    }
+
+    public Date getLastResetTime() {
+        return resetTime;
+    }
+
+    private double getAverageLatencyByMinute(int n) {
+        int samples = n * SAMPLES_PER_MINUTE;
+        double sum = 0.0;
+        Long[] array = shortTermLatencyDataQueue.toArray(new 
Long[shortTermLatencyDataQueue.size()]);
+
+        if (samples > array.length) {
+            // If we don't have enough samples collected yet
+            // add up everything we have
+            samples = array.length;
+            for (int i = 0; i < array.length; i++) {
+                sum += array[i];
+            }
+        } else {
+            // We have enough samples to make the right calculation
+            // Add up starting from the end of the queue (to give the most 
recent values)
+            for (int i = 0; i < samples; i++) {
+                sum += array[array.length - 1 - i];
+            }
+        }
+
+        if (samples == 0) {
+            return 0.0;
+        }
+        return sum/samples;
+    }
+
+    private double getAverageLatencyByHour(int n) {
+        int samples = n * SAMPLES_PER_HOUR;
+        double sum = 0.0;
+        Long[] array = longTermLatencyDataQueue.toArray(new 
Long[longTermLatencyDataQueue.size()]);
+
+        if (samples > array.length) {
+            samples = array.length;
+            for (int i = 0; i < array.length; i++) {
+                sum += array[i];
+            }
+        } else {
+            for (int i = 0; i < samples; i++) {
+                sum += array[array.length - 1 - i];
+            }
+        }
+
+        if (samples == 0) {
+            return 0.0;
+        }
+        return sum/samples;
+    }
+
+    private Runnable getShortTermDataCollector() {
+        return new Runnable() {
+            public void run() {
+                long latency = lastLatency.get();
+
+                // calculate all time average latency
+                int size = latencyDataQueue.size();
+                if (size > 0) {
+                    long sum = 0;
+                    for (int i = 0; i < size; i++) {
+                        sum += latencyDataQueue.poll();
+                    }
+                    allTimeAvgLatency = (allTimeAvgLatency * count + 
sum)/(count + size);
+                    count = count + size;
+                }
+
+                if (shortTermLatencyDataQueue.size() == 0 && latency == 0) {
+                    // we haven't started collecting data yet - skip ahead...
+                    return;
+                }
+
+                // take a sample for the short term latency calculation
+                if (shortTermLatencyDataQueue.size() == SAMPLES_PER_MINUTE * 
15) {
+                    shortTermLatencyDataQueue.remove();
+                }
+                shortTermLatencyDataQueue.offer(latency);
+            }
+        };
+    }
+
+    private Runnable getLongTermDataCollector() {
+        return new Runnable() {
+            public void run() {
+                long latency = lastLatency.get();
+                if (longTermLatencyDataQueue.size() == 0 && latency == 0) {
+                    return;
+                }
+
+                if (longTermLatencyDataQueue.size() == SAMPLES_PER_HOUR * 24) {
+                    longTermLatencyDataQueue.remove();
+                }
+                longTermLatencyDataQueue.offer(latency);
+            }
+        };
+    }
+}

Added: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/LatencyViewMBean.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/LatencyViewMBean.java?rev=964719&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/LatencyViewMBean.java
 (added)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/LatencyViewMBean.java
 Fri Jul 16 08:58:18 2010
@@ -0,0 +1,36 @@
+/*
+*  Licensed to the Apache Software Foundation (ASF) under one
+*  or more contributor license agreements.  See the NOTICE file
+*  distributed with this work for additional information
+*  regarding copyright ownership.  The ASF licenses this file
+*  to you under the Apache License, Version 2.0 (the
+*  "License"); you may not use this file except in compliance
+*  with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing,
+*  software distributed under the License is distributed on an
+*   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+*  KIND, either express or implied.  See the License for the
+*  specific language governing permissions and limitations
+*  under the License.
+*/
+
+package org.apache.synapse.transport.nhttp.util;
+
+import java.util.Date;
+
+public interface LatencyViewMBean {
+
+    public double getAllTimeAvgLatency();
+    public double getLastMinuteAvgLatency();
+    public double getLast5MinuteAvgLatency();
+    public double getLast15MinuteAvgLatency();
+    public double getLastHourAvgLatency();
+    public double getLast8HourAvgLatency();
+    public double getLast24HourAvgLatency();
+    public void reset();
+    public Date getLastResetTime();
+
+}


Reply via email to