Jason, thank you for your patch. Please open an incident and attach it there:

https://issues.apache.org/bugzilla/index.cgi

Filip

Jason wrote:
Adds a thread-safe latch to the DeltaManager which is used to block processing of cluster messages until local applications have completed initialization.

Includes changes to the DeltaManager to create the latch based on configuration, changes to Catalina to automatically open the latch when initialization is complete, a constant used as the attribute key in the ServletContext, and a modification to the mbean-descriptor for the new DeltaManager attribute.

Also includes a handful of style cleanups interspersed throughout (spelling, removing compiler warnings about type checking, removing spaces before semicolons and blank lines before and after braces).
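
The gate idea can be reduced to a small standalone sketch. MessageGate and its method names are hypothetical and are not part of Tomcat or of the patch below. The sketch keeps the latch in a final field instead of resetting it to null after the first await(), on the assumption that several receiver threads may reach the check at the same time; once the count reaches zero, await() returns immediately, so clearing the field is not needed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not Tomcat code: a one-shot gate built on CountDownLatch. */
final class MessageGate {
    // Final and never nulled, so concurrent receiver threads cannot race on the field.
    private final CountDownLatch latch = new CountDownLatch(1);

    /** Opens the gate. Calling it more than once is harmless. */
    void open() {
        latch.countDown();
    }

    /** Reports whether the gate has been opened. */
    boolean isOpen() {
        return latch.getCount() == 0;
    }

    /**
     * Waits until the gate opens or the timeout elapses.
     * Returns true if the gate is open, false on timeout or interruption.
     */
    boolean awaitOpen(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

final class MessageGateDemo {
    public static void main(String[] args) throws Exception {
        MessageGate gate = new MessageGate();
        Thread receiver = new Thread(() -> {
            if (gate.awaitOpen(5, TimeUnit.SECONDS)) {
                System.out.println("processing cluster message");
            }
        });
        receiver.start();
        gate.open(); // stands in for the container signalling that startup finished
        receiver.join();
    }
}
```

The timeout in awaitOpen is a design choice of this sketch only; the patch itself waits without a limit.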

  - Jason

"That's the problem. He's a brilliant lunatic and you can't tell
which way he'll jump --
like his game he's impossible to analyse --
you can't dissect him, predict him --
which of course means he's not a lunatic at all."


Index: catalina/Globals.java
===================================================================
--- catalina/Globals.java    (revision 719433)
+++ catalina/Globals.java    (working copy)
@@ -36,6 +36,14 @@
         "org.apache.catalina.deploy.alt_dd";
 
     /**
+     * The servlet context attribute under which we store the concurrent latch
+     * used to block processing cluster messages until after local application
+     * initialization
+     */
+    public static final String CLUSTER_DELAY =
+        "org.apache.catalina.ha.delay";
+
+    /**
      * The request attribute under which we store the array of X509Certificate
      * objects representing the certificate chain presented by our client,
      * if any.
Index: catalina/ha/session/DeltaManager.java
===================================================================
--- catalina/ha/session/DeltaManager.java    (revision 719433)
+++ catalina/ha/session/DeltaManager.java    (working copy)
@@ -26,11 +26,15 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+
+import javax.servlet.ServletContext;
 
 import org.apache.catalina.Cluster;
 import org.apache.catalina.Container;
 import org.apache.catalina.Context;
 import org.apache.catalina.Engine;
+import org.apache.catalina.Globals;
 import org.apache.catalina.Host;
 import org.apache.catalina.LifecycleException;
 import org.apache.catalina.LifecycleListener;
@@ -38,13 +42,13 @@
 import org.apache.catalina.Valve;
 import org.apache.catalina.core.StandardContext;
 import org.apache.catalina.ha.CatalinaCluster;
+import org.apache.catalina.ha.ClusterManager;
 import org.apache.catalina.ha.ClusterMessage;
 import org.apache.catalina.ha.tcp.ReplicationValve;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.io.ReplicationStream;
 import org.apache.catalina.util.LifecycleSupport;
 import org.apache.catalina.util.StringManager;
-import org.apache.catalina.ha.ClusterManager;
 
 /**
  * The DeltaManager manages replicated sessions by only replicating the deltas
@@ -62,10 +66,11 @@
  * @author Craig R. McClanahan
  * @author Jean-Francois Arcand
  * @author Peter Rossbach
+ * @author Jason Lunn
  * @version $Revision$ $Date$
  */
-public class DeltaManager extends ClusterManagerBase{
+public class DeltaManager extends ClusterManagerBase {
 
     // ---------------------------------------------------- Security Classes
 
     public static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(DeltaManager.class);
@@ -106,6 +111,18 @@
     protected LifecycleSupport lifecycle = new LifecycleSupport(this);
 
     /**
+     * Flag indicating that messageReceived(ClusterMessage) should block until
+     * all local applications have completed initialization
+     */
+    protected boolean delay = false;
+
+    /**
+     * Barrier used to receive notification from other threads that it is okay
+     * to process incoming messages from the cluster
+     */
+    private CountDownLatch gate = null;
+
+    /**
      * The maximum number of active Sessions allowed, or -1 for no limit.
      */
     private int maxActiveSessions = -1;
@@ -121,8 +138,8 @@
     /**
      * wait time between send session block (default 2 sec)
      */
-    private int sendAllSessionsWaitTime = 2 * 1000 ;
-    private ArrayList receivedMessageQueue = new ArrayList() ;
+    private int sendAllSessionsWaitTime = 2 * 1000;
+    private ArrayList<SessionMessage> receivedMessageQueue = new ArrayList<SessionMessage>() ;
     private boolean receiverQueue = false ;
     private boolean stateTimestampDrop = true ;
     private long stateTransferCreateSendTime;
@@ -175,6 +192,27 @@
     public String getName() {
         return name;
     }
+
+    /**
+     * Set the member that indicates processing messages should wait for local
+     * initialization of applications to complete
+     *
+     * @param delay If true, processing messageReceived(ClusterMessage) will
+     * block until local web applications have completed initialization.
+     */
+    public void setDelay ( boolean delay ) {
+        this.delay = delay;
+    }
+
+    /**
+     * Gets the delay flag
+     * @return delay - boolean flag indicating that
+     * messageReceived(ClusterMessage) should block until local initialization
+     * of applications has completed
+     */
+    public boolean getDelay () {
+        return delay;
+    }
 
     /**
      * @return Returns the counterSend_EVT_GET_ALL_SESSIONS.
@@ -781,6 +819,24 @@
         lifecycle.removeLifecycleListener(listener);
     }
 
+    @Override
+    /**
+     * If this.delay is true, create a gate that will be opened when
+     * local application initialization is complete
+     */
+    public void init () {
+        if (delay && container != null) {
+            if (container instanceof StandardContext) {
+                ServletContext servletContext = ((StandardContext) container).getServletContext();
+                if (servletContext != null) {
+                    gate = new CountDownLatch(1);
+                    servletContext.setAttribute(Globals.CLUSTER_DELAY, gate);
+                }
+            }
+        }
+        super.init();
+    }
+
     /**
      * Prepare for the beginning of active use of the public methods of this
      * component. This method should be called after <code>configure()</code>,
@@ -885,8 +941,8 @@
                 waitForSendAllSessions(beforeSendTime);
             } finally {
                 synchronized(receivedMessageQueue) {
-                    for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) {
-                        SessionMessage smsg = (SessionMessage) iter.next();
+                    for (Iterator<SessionMessage> iter = receivedMessageQueue.iterator(); iter.hasNext();) {
+                        SessionMessage smsg = iter.next();
                         if (!stateTimestampDrop) {
                             messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
                         } else {
@@ -1068,6 +1124,18 @@
      *            the message received.
      */
     public void messageDataReceived(ClusterMessage cmsg) {
+        // Block processing until local application initialization has
+        // completed, if a gate has been erected
+        if(gate != null) {
+            try {
+                gate.await();
+                gate = null;
+            }
+            catch(InterruptedException e) {
+                log.error(e, e);
+            }
+        }
+
         if (cmsg != null && cmsg instanceof SessionMessage) {
             SessionMessage msg = (SessionMessage) cmsg;
             switch (msg.getEventType()) {
@@ -1535,7 +1603,8 @@
         result.sendAllSessionsWaitTime = sendAllSessionsWaitTime ;
         result.receiverQueue = receiverQueue ;
         result.stateTimestampDrop = stateTimestampDrop ;
-        result.stateTransferCreateSendTime = stateTransferCreateSendTime;
+        result.stateTransferCreateSendTime = stateTransferCreateSendTime;
+        result.delay = delay;
         return result;
     }
 }
Index: catalina/ha/session/mbeans-descriptors.xml
===================================================================
--- catalina/ha/session/mbeans-descriptors.xml    (revision 719433)
+++ catalina/ha/session/mbeans-descriptors.xml    (working copy)
@@ -268,7 +268,7 @@
     <attribute
       name="expireSessionsOnShutdown"
       is="true"
-      description="exipre all sessions cluster wide as one node goes down"
+      description="expire all sessions cluster wide as one node goes down"
       type="boolean"/>
     <attribute
       name="notifyListenersOnReplication"
@@ -293,6 +293,10 @@
       name="sendAllSessionsWaitTime"
       description="wait time between send session block (default 2 sec)"
       type="int"/>
+    <attribute
+      name="delay"
+      description="wait until local applications have initialized before processing cluster messages"
+      type="boolean"/>
     <operation
       name="listSessionIds"
       description="Return the list of active session ids"
Index: catalina/startup/Catalina.java
===================================================================
--- catalina/startup/Catalina.java    (revision 719433)
+++ catalina/startup/Catalina.java    (working copy)
@@ -28,11 +28,18 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import javax.servlet.ServletContext;
 
 import org.apache.catalina.Container;
+import org.apache.catalina.Globals;
 import org.apache.catalina.Lifecycle;
 import org.apache.catalina.LifecycleException;
 import org.apache.catalina.Server;
+import org.apache.catalina.Service;
+import org.apache.catalina.core.ContainerBase;
+import org.apache.catalina.core.StandardContext;
 import org.apache.catalina.core.StandardServer;
 import org.apache.tomcat.util.digester.Digester;
 import org.apache.tomcat.util.digester.Rule;
@@ -56,6 +63,7 @@
  *
  * @author Craig R. McClanahan
  * @author Remy Maucherat
+ * @author Jason Lunn
  * @version $Revision$ $Date$
  */
@@ -579,7 +587,18 @@
                 log.error("Catalina.start: ", e);
             }
         }
-
+
+        // Open any gates stored in ServletContexts to allow the processing of
+        // cluster messages once local applications have been initialized
+        Service [] services = server.findServices();
+        if ( services != null ) {
+            for ( Service service : services ) {
+                if ( service != null ) {
+                    openContainerGates( service.getContainer() );
+                }
+            }
+        }
+
         long t2 = System.nanoTime();
         if(log.isInfoEnabled())
             log.info("Server startup in " + ((t2 - t1) / 1000000) + " ms");
@@ -601,7 +620,6 @@
             await();
             stop();
         }
-
     }
@@ -609,7 +627,6 @@
      * Stop an existing server instance.
      */
     public void stop() {
-
         try {
             // Remove the ShutdownHook first so that server.stop()
             // doesn't get invoked twice
@@ -629,7 +646,6 @@
                 log.error("Catalina.stop", e);
             }
         }
-
     }
@@ -637,9 +653,7 @@
      * Await and shutdown.
      */
     public void await() {
-
         server.await();
-
     }
@@ -647,15 +661,50 @@
      * Print usage information for this application.
      */
     protected void usage() {
-
         System.out.println
             ("usage: java org.apache.catalina.startup.Catalina"
              + " [ -config {pathname} ]"
              + " [ -nonaming ] { start | stop }");
+    }
+
+    /**
+     * Traverses the argument container and decrements the CountDownLatch
+     * found in the servlet context with attribute key Globals.CLUSTER_DELAY
+     * if found
+     * @param container Possibly null Container instance
+     */
+    protected void openContainerGates ( Container container ) {
+        if (container == null) {
+            return;
+        }
+
+        if ( container instanceof StandardContext ) {
+            StandardContext context =
+                (StandardContext)container;
+            ServletContext servletContext =
+                context.getServletContext();
+            if (servletContext != null) {
+                Object contextAttribute = servletContext
+                        .getAttribute(Globals.CLUSTER_DELAY);
+                if (contextAttribute != null &&
+                        contextAttribute instanceof CountDownLatch) {
+                    CountDownLatch gate =
+                        (CountDownLatch) contextAttribute;
+                    gate.countDown();
+                }
+            }
+        } else if ( container instanceof ContainerBase ) {
+            ContainerBase base = (ContainerBase)container;
+            Container [] containers = base.findChildren();
+            if ( containers != null ) {
+                for ( Container childContainer : containers ) {
+                    openContainerGates( childContainer );
+                }
+            }
+        }
     }
-
     // --------------------------------------- CatalinaShutdownHook Inner Class
     // XXX Should be moved to embedded !
@@ -709,6 +758,4 @@
         top.setParentClassLoader(parentClassLoader);
     }
-
-
 }



On Fri, Nov 21, 2008 at 10:42 PM, Peter Rossbach <[EMAIL PROTECTED]> wrote:

    Hi Jason,

    Send us your implementation and let us review your stuff :-)

    You can also register a ContextListener at
    DeltaManager.setContainer() to control your latch.
    Are you sure that the session sync message (GET ALL Session) is
    received before the first request at the second node
    is processed?

    I think your feature is an extension of the current receivedQueue
    usage!
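
One way to read the remark about the received queue is to buffer incoming messages until the application is ready, instead of blocking the receiver threads on a latch. The following is only a sketch of that reading: DeferredReceiver, messageReceived and applicationReady are hypothetical names and do not correspond to existing DeltaManager code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch, not Tomcat code: queue messages until the application is ready. */
final class DeferredReceiver<M> {
    private final Consumer<M> handler;
    private final List<M> pending = new ArrayList<>(); // guarded by "this"
    private boolean ready = false;                     // guarded by "this"

    DeferredReceiver(Consumer<M> handler) {
        this.handler = handler;
    }

    /** Called by receiver threads. Queues the message while the application is starting. */
    synchronized void messageReceived(M message) {
        if (!ready) {
            pending.add(message);
            return;
        }
        handler.accept(message);
    }

    /** Called once after local initialization. Replays queued messages in arrival order. */
    synchronized void applicationReady() {
        ready = true;
        for (M message : pending) {
            handler.accept(message);
        }
        pending.clear();
    }

    /** Number of messages still waiting to be replayed. */
    synchronized int pendingCount() {
        return pending.size();
    }
}
```

The handler runs while the lock is held, which keeps the replay ordered but serializes processing. A real implementation would need to weigh that against throughput and against unbounded queue growth during a slow startup.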

    Regards
    Peter



    On 20.11.2008 at 22:54, Jason wrote:


        This message is targeted at Filip Hanik, Craig R. McClanahan,
        Jean-Francois Arcand, Peter Rossbach or anyone with a direct
        interest in the DeltaManager implementation in Tomcat 6.

        A vendor (who will remain nameless) whose product I support for a
        client recently gave me an idea for a patch to DeltaManager to
        address what the vendor claims is a Tomcat-specific issue related
        to session replication. I'm wondering if it would be of value to
        the community or if the "problem" it is trying to remedy is an
        intentional "feature".

        The primary issue is that, according to vendor engineering support,
        the other application containers the vendor supports deploying
        their product on, including WebSphere, WebLogic, et al., wait until
        after local applications have been initialized before processing
        incoming messages from the cluster that could include deserializing
        remote sessions and the objects therein. I have not confirmed this
        by examining the other containers, mind you, but I am pretty
        confident that this is an accurate statement insofar as the
        vendor's product works in those environments but does not work in a
        clustered Tomcat environment.

        The reason it fails in Tomcat is that some of the objects in the
        serialized session make calls at construction time to the static
        methods of the vendor's (archaic) preferences API, which are not
        initialized properly until the web application itself is started.
        The result is that the first node in the cluster starts up fine,
        but the second through Nth nodes die a horrible death trying to
        deserialize remote sessions populated by the first node.
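
The failure mode described above can be reproduced in miniature. Everything in this sketch is hypothetical: VendorPrefs stands in for the vendor's static preferences API, SessionItem for an object stored in the session, and the readObject hook is an assumption about where the static call happens during deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for the vendor's static preferences API. */
final class VendorPrefs {
    private static volatile String locale; // null until the web application starts

    static void init(String value) {
        locale = value;
    }

    static String locale() {
        String current = locale;
        if (current == null) {
            throw new IllegalStateException("preferences not initialized");
        }
        return current;
    }
}

/** Hypothetical session attribute that consults VendorPrefs while being restored. */
final class SessionItem implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String id;
    private transient String locale;

    SessionItem(String id) {
        this.id = id;
        this.locale = VendorPrefs.locale();
    }

    String id() {
        return id;
    }

    // Assumption: the real objects reach the static API from a hook like this one.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        this.locale = VendorPrefs.locale(); // throws if the web application has not started
    }
}

/** Minimal serialization helpers standing in for the replication transport. */
final class ReplicationCodec {
    static byte[] toBytes(Object value) {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Serializing a SessionItem after VendorPrefs.init(...) and then calling ReplicationCodec.fromBytes(...) in a state where the preferences are unset fails with the IllegalStateException, which mirrors what a freshly started node sees when it receives sessions before its own web application is up.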

        The workaround we've implemented locally is a simple one: we extend
        the DeltaManager with a custom class. Therein, we create a latch
        (java.util.concurrent.CountDownLatch, to be specific) and save it
        in the ServletContext. The only overridden method is
        messageDataReceived(), which uses the latch.await() method to block
        before calling the original implementation of the parent
        messageDataReceived() method.

        The vendor's application (or, more properly, the custom extensions
        we've built on their platform) looks at the ServletContext for a
        latch after the preferences have been initialized locally, and
        calls latch.countDown(), allowing any blocked calls to
        messageDataReceived() to start executing normally.
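
The two halves of the workaround, the manager subclass and the application-side release, might look roughly like the sketch below. To keep it self-contained, BaseManager stands in for DeltaManager and a plain Map stands in for the ServletContext attributes; those stand-ins, the class names and the attribute key are all assumptions made for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

/** Stand-in for the parent manager. The real parent would be DeltaManager. */
class BaseManager {
    final List<String> processed = new CopyOnWriteArrayList<>();

    void messageDataReceived(String message) {
        processed.add(message);
    }
}

/** Hypothetical subclass: publishes a latch and waits on it before delegating. */
final class LatchedManager extends BaseManager {
    static final String LATCH_KEY = "example.replication.latch"; // made-up attribute name

    private final CountDownLatch latch = new CountDownLatch(1);

    LatchedManager(Map<String, Object> contextAttributes) {
        // Stand-in for ServletContext.setAttribute(...)
        contextAttributes.put(LATCH_KEY, latch);
    }

    @Override
    void messageDataReceived(String message) {
        try {
            latch.await(); // blocks until the application releases the latch
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return; // this sketch simply drops the message when interrupted
        }
        super.messageDataReceived(message);
    }
}

/** Application side: called once the preferences have been initialized locally. */
final class AppStartup {
    static void releaseReplication(Map<String, Object> contextAttributes) {
        Object attribute = contextAttributes.get(LatchedManager.LATCH_KEY);
        if (attribute instanceof CountDownLatch) {
            ((CountDownLatch) attribute).countDown();
        }
    }
}
```

Because the latch is looked up by key and checked with instanceof, an application deployed on a manager without the latch simply does nothing at that point.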

        Without breaking the current sequence of initializing the session
        replication code before local applications, which Tomcat developers
        may have come to expect, it seems like there is a potential
        solution here that might enable applications like the one I've got
        to support to choose to configure the session replication to wait
        to process incoming messages until after the application has
        started.

        I think it would be pretty trivial for me to offer a patch to
        DeltaManager that creates a latch based on a configuration element.
        One could imagine an automatic mechanism for toggling the latch by
        the container after the application initialization, or deferring to
        the application to deactivate it. The question is, does anybody
        want such functionality besides me? The corollary is, if being able
        to choose when session replication begins is a desirable feature,
        is this the right tactic to implement it?

        Sincerely,

         - Jason Lunn




    ---------------------------------------------------------------------
    To unsubscribe, e-mail: [EMAIL PROTECTED]
    <mailto:[EMAIL PROTECTED]>
    For additional commands, e-mail: [EMAIL PROTECTED]
    <mailto:[EMAIL PROTECTED]>

