Added: tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java?rev=433703&view=auto ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java (added) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java Tue Aug 22 10:28:09 2006 @@ -0,0 +1,311 @@ +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.catalina.ha.deploy; + +import java.io.File; +import java.io.IOException; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileNotFoundException; + +/** + * This factory is used to read files and write files by splitting them up into + * smaller messages. So that entire files don't have to be read into memory. + * <BR> + * The factory can be used as a reader or writer but not both at the same time. + * When done reading or writing the factory will close the input or output + * streams and mark the factory as closed. It is not possible to use it after + * that. <BR> + * To force a cleanup, call cleanup() from the calling object. <BR> + * This class is not thread safe. 
+ * + * @author Filip Hanik + * @version 1.0 + */ +public class FileMessageFactory { + /*--Static Variables----------------------------------------*/ + public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory + .getLog(FileMessageFactory.class); + + /** + * The number of bytes that we read from file + */ + public static final int READ_SIZE = 1024 * 10; //10kb + + /** + * The file that we are reading/writing + */ + protected File file = null; + + /** + * True means that we are writing with this factory. False means that we are + * reading with this factory + */ + protected boolean openForWrite; + + /** + * Once the factory is used, it can not be reused. + */ + protected boolean closed = false; + + /** + * When openForWrite=false, the input stream is held by this variable + */ + protected FileInputStream in; + + /** + * When openForWrite=true, the output stream is held by this variable + */ + protected FileOutputStream out; + + /** + * The number of messages we have read or written + */ + protected int nrOfMessagesProcessed = 0; + + /** + * The total size of the file + */ + protected long size = 0; + + /** + * The total number of packets that we split this file into + */ + protected long totalNrOfMessages = 0; + + /** + * The bytes that we hold the data in, not thread safe. + */ + protected byte[] data = new byte[READ_SIZE]; + + /** + * Private constructor, either instantiates a factory to read or write. <BR> + * When openForWrite==true, then a the file, f, will be created and an + * output stream is opened to write to it. <BR> + * When openForWrite==false, an input stream is opened, the file has to + * exist. 
+ * + * @param f + * File - the file to be read/written + * @param openForWrite + * boolean - true means we are writing to the file, false means + * we are reading from the file + * @throws FileNotFoundException - + * if the file to be read doesn't exist + * @throws IOException - + * if the system fails to open input/output streams to the file + * or if it fails to create the file to be written to. + */ + private FileMessageFactory(File f, boolean openForWrite) + throws FileNotFoundException, IOException { + this.file = f; + this.openForWrite = openForWrite; + if (log.isDebugEnabled()) + log.debug("open file " + f + " write " + openForWrite); + if (openForWrite) { + if (!file.exists()) + file.createNewFile(); + out = new FileOutputStream(f); + } else { + size = file.length(); + totalNrOfMessages = (size / READ_SIZE) + 1; + in = new FileInputStream(f); + }//end if + + } + + /** + * Creates a factory to read or write from a file. When opening for read, + * the readMessage can be invoked, and when opening for write the + * writeMessage can be invoked. + * + * @param f + * File - the file to be read or written + * @param openForWrite + * boolean - true, means we are writing to the file, false means + * we are reading from it + * @throws FileNotFoundException - + * if the file to be read doesn't exist + * @throws IOException - + * if it fails to create the file that is to be written + * @return FileMessageFactory + */ + public static FileMessageFactory getInstance(File f, boolean openForWrite) + throws FileNotFoundException, IOException { + return new FileMessageFactory(f, openForWrite); + } + + /** + * Reads file data into the file message and sets the size, totalLength, + * totalNrOfMsgs and the message number <BR> + * If EOF is reached, the factory returns null, and closes itself, otherwise + * the same message is returned as was passed in. This makes sure that not + * more memory is ever used. To remember, neither the file message or the + * factory are thread safe. 
Don't hand off the message to one thread and read + * the same with another. + * + * @param f + * FileMessage - the message to be populated with file data + * @throws IllegalArgumentException - + * if the factory is for writing or is closed + * @throws IOException - + * if a file read exception occurs + * @return FileMessage - returns the same message passed in as a parameter, + * or null if EOF + */ + public FileMessage readMessage(FileMessage f) + throws IllegalArgumentException, IOException { + checkState(false); + int length = in.read(data); + if (length == -1) { + cleanup(); + return null; + } else { + f.setData(data, length); + f.setTotalLength(size); + f.setTotalNrOfMsgs(totalNrOfMessages); + f.setMessageNumber(++nrOfMessagesProcessed); + return f; + }//end if + } + + /** + * Writes a message to file. If (msg.getMessageNumber() == + * msg.getTotalNrOfMsgs()) the output stream will be closed after writing. + * + * @param msg + * FileMessage - message containing data to be written + * @throws IllegalArgumentException - + * if the factory is opened for read or closed + * @throws IOException - + * if a file write error occurs + * @return returns true if the file is complete and outputstream is closed, + * false otherwise. 
+ */ + public boolean writeMessage(FileMessage msg) + throws IllegalArgumentException, IOException { + if (!openForWrite) + throw new IllegalArgumentException( + "Can't write message, this factory is reading."); + if (log.isDebugEnabled()) + log.debug("Message " + msg + " data " + msg.getData() + + " data length " + msg.getDataLength() + " out " + out); + if (out != null) { + out.write(msg.getData(), 0, msg.getDataLength()); + nrOfMessagesProcessed++; + out.flush(); + if (msg.getMessageNumber() == msg.getTotalNrOfMsgs()) { + out.close(); + cleanup(); + return true; + }//end if + } else { + if (log.isWarnEnabled()) + log.warn("Receive Message again -- Sender ActTimeout to short [ path: " + + msg.getContextPath() + + " war: " + + msg.getFileName() + + " data: " + + msg.getData() + + " data length: " + msg.getDataLength() + " ]"); + } + return false; + }//writeMessage + + /** + * Closes the factory, its streams and sets all its references to null + */ + public void cleanup() { + if (in != null) + try { + in.close(); + } catch (Exception ignore) { + } + if (out != null) + try { + out.close(); + } catch (Exception ignore) { + } + in = null; + out = null; + size = 0; + closed = true; + data = null; + nrOfMessagesProcessed = 0; + totalNrOfMessages = 0; + } + + /** + * Check to make sure the factory is able to perform the function it is + * asked to do. Invoked by readMessage/writeMessage before those methods + * proceed. 
+ * + * @param openForWrite + * boolean + * @throws IllegalArgumentException + */ + protected void checkState(boolean openForWrite) + throws IllegalArgumentException { + if (this.openForWrite != openForWrite) { + cleanup(); + if (openForWrite) + throw new IllegalArgumentException( + "Can't write message, this factory is reading."); + else + throw new IllegalArgumentException( + "Can't read message, this factory is writing."); + } + if (this.closed) { + cleanup(); + throw new IllegalArgumentException("Factory has been closed."); + } + } + + /** + * Example usage. + * + * @param args + * String[], args[0] - read from filename, args[1] write to + * filename + * @throws Exception + */ + public static void main(String[] args) throws Exception { + + System.out + .println("Usage: FileMessageFactory fileToBeRead fileToBeWritten"); + System.out + .println("Usage: This will make a copy of the file on the local file system"); + FileMessageFactory read = getInstance(new File(args[0]), false); + FileMessageFactory write = getInstance(new File(args[1]), true); + FileMessage msg = new FileMessage(null, args[0], args[0]); + msg = read.readMessage(msg); + System.out.println("Expecting to write " + msg.getTotalNrOfMsgs() + + " messages."); + int cnt = 0; + while (msg != null) { + write.writeMessage(msg); + cnt++; + msg = read.readMessage(msg); + }//while + System.out.println("Actually wrote " + cnt + " messages."); + }///main + + public File getFile() { + return file; + } + +} \ No newline at end of file
Added: tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/UndeployMessage.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/UndeployMessage.java?rev=433703&view=auto ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/UndeployMessage.java (added) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/UndeployMessage.java Tue Aug 22 10:28:09 2006 @@ -0,0 +1,113 @@ +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.catalina.ha.deploy; + +import org.apache.catalina.ha.ClusterMessage; +import org.apache.catalina.tribes.Member; +import java.io.Serializable; +public class UndeployMessage implements ClusterMessage,Serializable { + private Member address; + private long timestamp; + private String uniqueId; + private String contextPath; + private boolean undeploy; + private int resend = 0; + private int compress = 0; + + public UndeployMessage() {} //for serialization + public UndeployMessage(Member address, + long timestamp, + String uniqueId, + String contextPath, + boolean undeploy) { + this.address = address; + this.timestamp= timestamp; + this.undeploy = undeploy; + this.uniqueId = uniqueId; + this.undeploy = undeploy; + this.contextPath = contextPath; + } + + public Member getAddress() { + return address; + } + + public void setAddress(Member address) { + this.address = address; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public String getUniqueId() { + return uniqueId; + } + + public void setUniqueId(String uniqueId) { + this.uniqueId = uniqueId; + } + + public String getContextPath() { + return contextPath; + } + + public void setContextPath(String contextPath) { + this.contextPath = contextPath; + } + + public boolean getUndeploy() { + return undeploy; + } + + public void setUndeploy(boolean undeploy) { + this.undeploy = undeploy; + } + /** + * @return Returns the compress. + * @since 5.5.10 + */ + public int getCompress() { + return compress; + } + /** + * @param compress The compress to set. + * @since 5.5.10 + */ + public void setCompress(int compress) { + this.compress = compress; + } + /** + * @return Returns the resend. + * @since 5.5.10 + */ + public int getResend() { + return resend; + } + /** + * @param resend The resend to set. 
+ * @since 5.5.10 + */ + public void setResend(int resend) { + this.resend = resend; + } + +} Added: tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/WarWatcher.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/WarWatcher.java?rev=433703&view=auto ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/WarWatcher.java (added) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/WarWatcher.java Tue Aug 22 10:28:09 2006 @@ -0,0 +1,238 @@ +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.catalina.ha.deploy; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.Iterator; + +/** + * <p> + * The <b>WarWatcher </b> watches the deployDir for changes made to the + * directory (adding new WAR files->deploy or remove WAR files->undeploy) And + * notifies a listener of the changes made + * </p> + * + * @author Filip Hanik + * @author Peter Rossbach + * @version 1.1 + */ + +public class WarWatcher { + + /*--Static Variables----------------------------------------*/ + public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory + .getLog(WarWatcher.class); + + /*--Instance Variables--------------------------------------*/ + /** + * Directory to watch for war files + */ + protected File watchDir = null; + + /** + * Parent to be notified of changes + */ + protected FileChangeListener listener = null; + + /** + * Currently deployed files + */ + protected Map currentStatus = new HashMap(); + + /*--Constructor---------------------------------------------*/ + + public WarWatcher() { + } + + public WarWatcher(FileChangeListener listener, File watchDir) { + this.listener = listener; + this.watchDir = watchDir; + } + + /*--Logic---------------------------------------------------*/ + + /** + * check for modification and send notifcation to listener + */ + public void check() { + if (log.isInfoEnabled()) + log.info("check cluster wars at " + watchDir); + File[] list = watchDir.listFiles(new WarFilter()); + if (list == null) + list = new File[0]; + //first make sure all the files are listed in our current status + for (int i = 0; i < list.length; i++) { + addWarInfo(list[i]); + } + + //check all the status codes and update the FarmDeployer + for (Iterator i = currentStatus.entrySet().iterator(); i.hasNext();) { + Map.Entry entry = (Map.Entry) i.next(); + WarInfo info = (WarInfo) entry.getValue(); + int check = info.check(); + if (check == 1) { + 
listener.fileModified(info.getWar()); + } else if (check == -1) { + listener.fileRemoved(info.getWar()); + //no need to keep in memory + currentStatus.remove(info.getWar()); + } + } + + } + + /** + * add cluster war to the watcher state + * @param warfile + */ + protected void addWarInfo(File warfile) { + WarInfo info = (WarInfo) currentStatus.get(warfile.getAbsolutePath()); + if (info == null) { + info = new WarInfo(warfile); + info.setLastState(-1); //assume file is non existent + currentStatus.put(warfile.getAbsolutePath(), info); + } + } + + /** + * clear watcher state + */ + public void clear() { + currentStatus.clear(); + } + + /** + * @return Returns the watchDir. + */ + public File getWatchDir() { + return watchDir; + } + + /** + * @param watchDir + * The watchDir to set. + */ + public void setWatchDir(File watchDir) { + this.watchDir = watchDir; + } + + /** + * @return Returns the listener. + */ + public FileChangeListener getListener() { + return listener; + } + + /** + * @param listener + * The listener to set. 
+ */ + public void setListener(FileChangeListener listener) { + this.listener = listener; + } + + /*--Inner classes-------------------------------------------*/ + + /** + * File name filter for war files + */ + protected class WarFilter implements java.io.FilenameFilter { + public boolean accept(File path, String name) { + if (name == null) + return false; + return name.endsWith(".war"); + } + } + + /** + * File information on existing WAR files + */ + protected class WarInfo { + protected File war = null; + + protected long lastChecked = 0; + + protected long lastState = 0; + + public WarInfo(File war) { + this.war = war; + this.lastChecked = war.lastModified(); + if (!war.exists()) + lastState = -1; + } + + public boolean modified() { + return war.exists() && war.lastModified() > lastChecked; + } + + public boolean exists() { + return war.exists(); + } + + /** + * Returns 1 if the file has been added/modified, 0 if the file is + * unchanged and -1 if the file has been removed + * + * @return int 1=file added; 0=unchanged; -1=file removed + */ + public int check() { + //file unchanged by default + int result = 0; + + if (modified()) { + //file has changed - timestamp + result = 1; + lastState = result; + } else if ((!exists()) && (!(lastState == -1))) { + //file was removed + result = -1; + lastState = result; + } else if ((lastState == -1) && exists()) { + //file was added + result = 1; + lastState = result; + } + this.lastChecked = System.currentTimeMillis(); + return result; + } + + public File getWar() { + return war; + } + + public int hashCode() { + return war.getAbsolutePath().hashCode(); + } + + public boolean equals(Object other) { + if (other instanceof WarInfo) { + WarInfo wo = (WarInfo) other; + return wo.getWar().equals(getWar()); + } else { + return false; + } + } + + protected void setLastState(int lastState) { + this.lastState = lastState; + } + + } + +} \ No newline at end of file Added: 
tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/mbeans-descriptors.xml URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/mbeans-descriptors.xml?rev=433703&view=auto ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/mbeans-descriptors.xml (added) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/mbeans-descriptors.xml Tue Aug 22 10:28:09 2006 @@ -0,0 +1,94 @@ +<?xml version="1.0"?> +<mbeans-descriptors> + + <mbean name="SimpleTcpCluster" + className="org.apache.catalina.mbeans.ClassNameMBean" + description="Tcp Cluster implementation" + domain="Catalina" + group="Cluster" + type="org.apache.catalina.ha.tcp.SimpleTcpCluster"> + + <attribute name="protocolStack" + description="JavaGroups protocol stack selection" + type="java.lang.String"/> + + </mbean> + + + <mbean name="SimpleTcpReplicationManager" + className="org.apache.catalina.mbeans.ClassNameMBean" + description="Clustered implementation of the Manager interface" + domain="Catalina" + group="Manager" + type="org.apache.catalina.ha.tcp.SimpleTcpReplicationManager"> + + <attribute name="algorithm" + description="The message digest algorithm to be used when generating + session identifiers" + type="java.lang.String"/> + + <attribute name="checkInterval" + description="The interval (in seconds) between checks for expired + sessions" + type="int"/> + + <attribute name="className" + description="Fully qualified class name of the managed object" + type="java.lang.String" + writeable="false"/> + + <attribute name="distributable" + description="The distributable flag for Sessions created by this + Manager" + type="boolean"/> + + <attribute name="entropy" + description="A String initialization parameter used to increase the + entropy of the initialization of our random number + generator" + type="java.lang.String"/> + + <attribute name="managedResource" + description="The managed resource this MBean is 
associated with" + type="java.lang.Object"/> + + <attribute name="maxActiveSessions" + description="The maximum number of active Sessions allowed, or -1 + for no limit" + type="int"/> + + <attribute name="maxInactiveInterval" + description="The default maximum inactive interval for Sessions + created by this Manager" + type="int"/> + + <attribute name="name" + description="The descriptive name of this Manager implementation + (for logging)" + type="java.lang.String" + writeable="false"/> + + </mbean> + + + +<mbean name="ReplicationValve" + className="org.apache.catalina.mbeans.ClassNameMBean" + description="Valve for simple tcp replication" + domain="Catalina" + group="Valve" + type="org.apache.catalina.ha.tcp.ReplicationValve"> + + <attribute name="className" + description="Fully qualified class name of the managed object" + type="java.lang.String" + writeable="false"/> + + <attribute name="debug" + description="The debugging detail level for this component" + type="int"/> + + </mbean> + + +</mbeans-descriptors> Added: tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/package.html URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/package.html?rev=433703&view=auto ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/package.html (added) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/package.html Tue Aug 22 10:28:09 2006 @@ -0,0 +1,11 @@ +<body> + +<p>This package contains code for Clustering. The base class +of a Cluster is <code>org.apache.catalina.Cluster</code>; implementations +of this class are written when implementing a new Cluster protocol.</p> + +<p>The only Cluster protocol currently implemented is a JavaGroups-based<br> + <b>JGCluster.java</b> +</p> + +</body> Added: tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/BackupManager.java URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/BackupManager.java?rev=433703&view=auto ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/BackupManager.java (added) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/BackupManager.java Tue Aug 22 10:28:09 2006 @@ -0,0 +1,271 @@ +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.catalina.ha.session; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +import org.apache.catalina.LifecycleException; +import org.apache.catalina.Loader; +import org.apache.catalina.Session; +import org.apache.catalina.ha.CatalinaCluster; +import org.apache.catalina.ha.ClusterManager; +import org.apache.catalina.ha.ClusterMessage; +import org.apache.catalina.session.StandardManager; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.io.ReplicationStream; +import org.apache.catalina.tribes.tipis.LazyReplicatedMap; +import org.apache.catalina.tribes.Channel; + +/** + * @author Filip Hanik + * @version 1.0 + */ +public class BackupManager extends StandardManager implements ClusterManager +{ + public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( BackupManager.class ); + + protected static long DEFAULT_REPL_TIMEOUT = 15000;//15 seconds + + /** Set to true if we don't want the sessions to expire on shutdown */ + protected boolean mExpireSessionsOnShutdown = true; + + /** + * The name of this manager + */ + protected String name; + + /** + * A reference to the cluster + */ + protected CatalinaCluster cluster; + + /** + * Should listeners be notified? + */ + private boolean notifyListenersOnReplication; + /** + * + */ + private int mapSendOptions = Channel.SEND_OPTIONS_SYNCHRONIZED_ACK|Channel.SEND_OPTIONS_USE_ACK; + + /** + * Constructor, just calls super() + * + */ + public BackupManager() { + super(); + } + + +//******************************************************************************/ +// ClusterManager Interface +//******************************************************************************/ + + public void messageDataReceived(ClusterMessage msg) { + } + + public boolean isSendClusterDomainOnly() { + return false; + } + + /** + * @param sendClusterDomainOnly The sendClusterDomainOnly to set. 
+ */ + public void setSendClusterDomainOnly(boolean sendClusterDomainOnly) { + } + + /** + * @return Returns the defaultMode. + */ + public boolean isDefaultMode() { + return false; + } + /** + * @param defaultMode The defaultMode to set. + */ + public void setDefaultMode(boolean defaultMode) { + } + + public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) + { + mExpireSessionsOnShutdown = expireSessionsOnShutdown; + } + + public void setCluster(CatalinaCluster cluster) { + if(log.isDebugEnabled()) + log.debug("Cluster associated with SimpleTcpReplicationManager"); + this.cluster = cluster; + } + + public boolean getExpireSessionsOnShutdown() + { + return mExpireSessionsOnShutdown; + } + + + /** + * Override persistence since they don't go hand in hand with replication for now. + */ + public void unload() throws IOException { + } + + public ClusterMessage requestCompleted(String sessionId) { + if ( !this.started ) return null; + LazyReplicatedMap map = (LazyReplicatedMap)sessions; + map.replicate(sessionId,false); + return null; + } + + +//========================================================================= +// OVERRIDE THESE METHODS TO IMPLEMENT THE REPLICATION +//========================================================================= + + public Session createEmptySession() { + return new DeltaSession(this); + } + + public ClassLoader[] getClassLoaders() { + return ClusterManagerBase.getClassLoaders(this.container); + } + + /** + * Open Stream and use correct ClassLoader (Container) Switch + * ThreadClassLoader + * + * @param data + * @return The object input stream + * @throws IOException + */ + public ReplicationStream getReplicationStream(byte[] data) throws IOException { + return getReplicationStream(data,0,data.length); + } + + public ReplicationStream getReplicationStream(byte[] data, int offset, int length) throws IOException { + ByteArrayInputStream fis = new ByteArrayInputStream(data, offset, length); + return new 
ReplicationStream(fis, getClassLoaders()); + } + + + + + public String getName() { + return this.name; + } + /** + * Prepare for the beginning of active use of the public methods of this + * component. This method should be called after <code>configure()</code>, + * and before any of the public methods of the component are utilized.<BR> + * Starts the cluster communication channel, this will connect with the other nodes + * in the cluster, and request the current session state to be transferred to this node. + * @exception IllegalStateException if this component has already been + * started + * @exception LifecycleException if this component detects a fatal error + * that prevents this component from being used + */ + public void start() throws LifecycleException { + if ( this.started ) return; + try { + CatalinaCluster catclust = (CatalinaCluster)cluster; + catclust.addManager(getName(), this); + LazyReplicatedMap map = new LazyReplicatedMap(this, + catclust.getChannel(), + DEFAULT_REPL_TIMEOUT, + getMapName(), + getClassLoaders()); + map.setChannelSendOptions(mapSendOptions); + this.sessions = map; + super.start(); + } catch ( Exception x ) { + log.error("Unable to start BackupManager",x); + throw new LifecycleException("Failed to start BackupManager",x); + } + } + + public String getMapName() { + CatalinaCluster catclust = (CatalinaCluster)cluster; + String name = catclust.getManagerName(getName(),this)+"-"+""; + if ( log.isDebugEnabled() ) log.debug("Backup manager, Setting map name to:"+name); + return name; + } + + /** + * Gracefully terminate the active use of the public methods of this + * component. This method should be the last one called on a given + * instance of this component.<BR> + * This will disconnect the cluster communication channel and stop the listener thread. 
+ * @exception IllegalStateException if this component has not been started + * @exception LifecycleException if this component detects a fatal error + * that needs to be reported + */ + public void stop() throws LifecycleException + { + + LazyReplicatedMap map = (LazyReplicatedMap)sessions; + if ( map!=null ) { + map.breakdown(); + } + if ( !this.started ) return; + try { + } catch ( Exception x ){ + log.error("Unable to stop BackupManager",x); + throw new LifecycleException("Failed to stop BackupManager",x); + } finally { + super.stop(); + } + cluster.removeManager(getName(),this); + + } + + public void setDistributable(boolean dist) { + this.distributable = dist; + } + + public boolean getDistributable() { + return distributable; + } + + public void setName(String name) { + this.name = name; + } + public boolean isNotifyListenersOnReplication() { + return notifyListenersOnReplication; + } + public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) { + this.notifyListenersOnReplication = notifyListenersOnReplication; + } + + public void setMapSendOptions(int mapSendOptions) { + this.mapSendOptions = mapSendOptions; + } + + /* + * @see org.apache.catalina.ha.ClusterManager#getCluster() + */ + public CatalinaCluster getCluster() { + return cluster; + } + + public int getMapSendOptions() { + return mapSendOptions; + } + + public String[] getInvalidatedSessions() { + return new String[0]; + } + +} Added: tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/ClusterManagerBase.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/ClusterManagerBase.java?rev=433703&view=auto ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/ClusterManagerBase.java (added) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/ClusterManagerBase.java Tue Aug 22 10:28:09 2006 @@ -0,0 +1,74 @@ +/* + * Copyright 1999,2004-2005 
The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.catalina.ha.session; + +import org.apache.catalina.ha.ClusterManager; +import java.beans.PropertyChangeListener; +import org.apache.catalina.Lifecycle; +import org.apache.catalina.session.ManagerBase; +import org.apache.catalina.Loader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import org.apache.catalina.tribes.io.ReplicationStream; +import org.apache.catalina.Container; + +/** + * + * @author Filip Hanik + * @version $Revision: 380100 $ $Date: 2006-02-23 06:08:14 -0600 (Thu, 23 Feb 2006) $ + */ + +public abstract class ClusterManagerBase extends ManagerBase implements Lifecycle, PropertyChangeListener, ClusterManager{ + + + public static ClassLoader[] getClassLoaders(Container container) { + Loader loader = null; + ClassLoader classLoader = null; + if (container != null) loader = container.getLoader(); + if (loader != null) classLoader = loader.getClassLoader(); + else classLoader = Thread.currentThread().getContextClassLoader(); + if ( classLoader == Thread.currentThread().getContextClassLoader() ) { + return new ClassLoader[] {classLoader}; + } else { + return new ClassLoader[] {classLoader,Thread.currentThread().getContextClassLoader()}; + } + } + + + public ClassLoader[] getClassLoaders() { + return getClassLoaders(container); + } + + /** + * Open Stream and use correct ClassLoader (Container) Switch + * ThreadClassLoader 
+ * + * @param data + * @return The object input stream + * @throws IOException + */ + public ReplicationStream getReplicationStream(byte[] data) throws IOException { + return getReplicationStream(data,0,data.length); + } + + public ReplicationStream getReplicationStream(byte[] data, int offset, int length) throws IOException { + ByteArrayInputStream fis = new ByteArrayInputStream(data, offset, length); + return new ReplicationStream(fis, getClassLoaders()); + } + + +} \ No newline at end of file Added: tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/ClusterSessionListener.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/ClusterSessionListener.java?rev=433703&view=auto ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/ClusterSessionListener.java (added) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/ClusterSessionListener.java Tue Aug 22 10:28:09 2006 @@ -0,0 +1,107 @@ +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.catalina.ha.session; + +import java.util.Map; + +import org.apache.catalina.ha.ClusterManager; +import org.apache.catalina.ha.ClusterMessage; +import org.apache.catalina.ha.*; + +/** + * Receive replicated SessionMessage form other cluster node. 
+ * @author Filip Hanik + * @author Peter Rossbach + * @version $Revision: 378258 $ $Date: 2006-02-16 08:42:35 -0600 (Thu, 16 Feb 2006) $ + */ +public class ClusterSessionListener extends ClusterListener { + + /** + * The descriptive information about this implementation. + */ + protected static final String info = "org.apache.catalina.session.ClusterSessionListener/1.1"; + + //--Constructor--------------------------------------------- + + public ClusterSessionListener() { + } + + //--Logic--------------------------------------------------- + + /** + * Return descriptive information about this implementation. + */ + public String getInfo() { + + return (info); + + } + + /** + * Callback from the cluster, when a message is received, The cluster will + * broadcast it invoking the messageReceived on the receiver. + * + * @param myobj + * ClusterMessage - the message received from the cluster + */ + public void messageReceived(ClusterMessage myobj) { + if (myobj != null && myobj instanceof SessionMessage) { + SessionMessage msg = (SessionMessage) myobj; + String ctxname = msg.getContextName(); + //check if the message is a EVT_GET_ALL_SESSIONS, + //if so, wait until we are fully started up + Map managers = cluster.getManagers() ; + if (ctxname == null) { + java.util.Iterator i = managers.keySet().iterator(); + while (i.hasNext()) { + String key = (String) i.next(); + ClusterManager mgr = (ClusterManager) managers.get(key); + if (mgr != null) + mgr.messageDataReceived(msg); + else { + //this happens a lot before the system has started + // up + if (log.isDebugEnabled()) + log.debug("Context manager doesn't exist:" + + key); + } + } + } else { + ClusterManager mgr = (ClusterManager) managers.get(ctxname); + if (mgr != null) + mgr.messageDataReceived(msg); + else if (log.isWarnEnabled()) + log.warn("Context manager doesn't exist:" + ctxname); + } + } + return; + } + + /** + * Accept only SessionMessage + * + * @param msg + * ClusterMessage + * @return boolean - returns 
true to indicate that messageReceived should be + * invoked. If false is returned, the messageReceived method will + * not be invoked. + */ + public boolean accept(ClusterMessage msg) { + return (msg instanceof SessionMessage); + } +} + Added: tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/Constants.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/Constants.java?rev=433703&view=auto ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/Constants.java (added) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/session/Constants.java Tue Aug 22 10:28:09 2006 @@ -0,0 +1,31 @@ +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.catalina.ha.session; + +/** + * Manifest constants for the <code>org.apache.catalina.ha.session</code> + * package. + * + * @author Peter Rossbach Pero + */ + +public class Constants { + + public static final String Package = "org.apache.catalina.ha.session"; + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]