Author: fhanik
Date: Mon Feb 13 13:00:05 2006
New Revision: 377484
URL: http://svn.apache.org/viewcvs?rev=377484&view=rev
Log:
Started working on the cluster group. Before I can fully do that, I need to
clean up the dependencies between the session replication logic and the
cluster core code.
Added:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ChannelException.java
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterMessage.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicationStream.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterData.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
Added:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ChannelException.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ChannelException.java?rev=377484&view=auto
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ChannelException.java
(added)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ChannelException.java
Mon Feb 13 13:00:05 2006
@@ -0,0 +1,42 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.cluster;
+
+/**
+ * Checked exception for cluster channel operations. In this change it is
+ * declared by <code>ClusterChannel.start(int)</code> and
+ * <code>ClusterChannel.stop(int)</code>.
+ * @author Filip Hanik
+ * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
+ */
+
+public class ChannelException
+ extends Exception {
+ /**
+ * Creates an exception without a detail message or cause.
+ */
+ public ChannelException() {
+ super();
+ }
+
+ /**
+ * Creates an exception with a detail message.
+ * @param message String - the detail message
+ */
+ public ChannelException(String message) {
+ super(message);
+ }
+
+ /**
+ * Creates an exception with a detail message and the underlying cause.
+ * @param message String - the detail message
+ * @param cause Throwable - the underlying cause
+ */
+ public ChannelException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Creates an exception that wraps an underlying cause.
+ * @param cause Throwable - the underlying cause
+ */
+ public ChannelException(Throwable cause) {
+ super(cause);
+ }
+
+}
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java
Mon Feb 13 13:00:05 2006
@@ -24,4 +24,53 @@
*/
public interface ClusterChannel {
+ /**
+ * Start and stop sequences can be controlled by these constants.
+ * Each service has its own bit; DEFAULT (15) is the bitwise OR of all four.
+ */
+ public static final int DEFAULT = 15;
+ public static final int MBR_RX_SEQ = 1;
+ public static final int SND_TX_SEQ = 2;
+ public static final int SND_RX_SEQ = 4;
+ public static final int MBR_TX_SEQ = 8;
+
+ /**
+ * Starts up the channel. This can be called multiple times to start
+ * individual services.
+ * The svc parameter can be the bitwise OR of any of the constants
+ * @param svc int value of <BR>
+ * DEFAULT - will start all services <BR>
+ * MBR_RX_SEQ - starts the membership receiver <BR>
+ * MBR_TX_SEQ - starts the membership broadcaster <BR>
+ * SND_TX_SEQ - starts the replication transmitter<BR>
+ * SND_RX_SEQ - starts the replication receiver<BR>
+ * @throws ChannelException if a startup error occurs or the service is
+ * already started.
+ */
+ public void start(int svc) throws ChannelException;
+
+ /**
+ * Shuts down the channel. This can be called multiple times to shut down
+ * individual services.
+ * The svc parameter can be the bitwise OR of any of the constants
+ * @param svc int value of <BR>
+ * DEFAULT - will shutdown all services <BR>
+ * MBR_RX_SEQ - stops the membership receiver <BR>
+ * MBR_TX_SEQ - stops the membership broadcaster <BR>
+ * SND_TX_SEQ - stops the replication transmitter<BR>
+ * SND_RX_SEQ - stops the replication receiver<BR>
+ * @throws ChannelException if an error occurs while shutting a service down.
+ */
+ public void stop(int svc) throws ChannelException;
+
+ /**
+ * Send a message to one or more members in the cluster
+ * @param destination Member[] - the destinations, null or zero length
+ * means all
+ * @param msg ClusterMessage - the message to send
+ * @param options int - sender options, see class documentation
+ * @return ClusterMessage[] - the replies from the members, if any.
+ */
+ public ClusterMessage[] send(Member[] destination, ClusterMessage msg, int
options);
+
+
+ /**
+ * Sets the cluster sender for this channel.
+ * @param sender ClusterSender
+ */
+ public void setClusterSender(ClusterSender sender);
+ /**
+ * Sets the cluster receiver for this channel.
+ * @param receiver ClusterReceiver
+ */
+ public void setClusterReceiver(ClusterReceiver receiver);
+ /**
+ * Sets the membership service for this channel.
+ * @param service MembershipService
+ */
+ public void setMembershipService(MembershipService service);
+
}
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterMessage.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterMessage.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterMessage.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterMessage.java
Mon Feb 13 13:00:05 2006
@@ -18,7 +18,9 @@
import java.io.Serializable;
/**
+ * @author Filip Hanik
* @author Peter Rossbach
+ *
*/
public interface ClusterMessage extends Serializable {
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java
Mon Feb 13 13:00:05 2006
@@ -28,7 +28,10 @@
public interface MembershipService {
-
+
+ public static final int MBR_RX = 1;
+ public static final int MBR_TX = 2;
+
/**
* Sets the properties for the membership service. This must be called
before
* the <code>start()</code> method is called.
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java
Mon Feb 13 13:00:05 2006
@@ -16,7 +16,13 @@
package org.apache.catalina.cluster.group;
+import org.apache.catalina.cluster.ChannelException;
import org.apache.catalina.cluster.ClusterChannel;
+import org.apache.catalina.cluster.ClusterReceiver;
+import org.apache.catalina.cluster.ClusterSender;
+import org.apache.catalina.cluster.MembershipService;
+import org.apache.catalina.cluster.ClusterMessage;
+import org.apache.catalina.cluster.Member;
/**
* Channel interface
@@ -26,7 +32,91 @@
* @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul
2005) $
*/
public class GroupChannel implements ClusterChannel {
+ // Services driven by start(int)/stop(int); each is injected through its setter.
+ private ClusterReceiver clusterReceiver;
+ private ClusterSender clusterSender;
+ private MembershipService membershipService;
+
public GroupChannel() {
+ }
+
+ /**
+ * Send a message to one or more members in the cluster
+ * @param destination Member[] - the destinations, null or zero length
+ * means all
+ * @param msg ClusterMessage - the message to send
+ * @param options int - sender options, see class documentation
+ * @return ClusterMessage[] - the replies from the members, if any.
+ * @throws UnsupportedOperationException always, sending is not implemented yet
+ */
+ public ClusterMessage[] send(Member[] destination, ClusterMessage msg, int options) {
+ throw new UnsupportedOperationException("Method send not yet implemented.");
+ }
+
+ /**
+ * Starts up the channel. This can be called multiple times to start
+ * individual services.
+ * The svc parameter can be the bitwise OR of any of the constants.
+ * Services are started in the order membership receiver, replication
+ * receiver, replication transmitter, membership broadcaster.
+ * @param svc int value of <BR>
+ * DEFAULT - will start all services <BR>
+ * MBR_RX_SEQ - starts the membership receiver <BR>
+ * MBR_TX_SEQ - starts the membership broadcaster <BR>
+ * SND_TX_SEQ - starts the replication transmitter<BR>
+ * SND_RX_SEQ - starts the replication receiver<BR>
+ * @throws ChannelException wrapping any exception raised while a service
+ * is being started.
+ */
+ public void start(int svc) throws ChannelException {
+ try {
+ // MBR_RX/MBR_TX are static constants, so they are referenced through
+ // the MembershipService type rather than through the instance field.
+ if ( (svc & MBR_RX_SEQ) == MBR_RX_SEQ) {
+ membershipService.start(MembershipService.MBR_RX);
+ }
+ if ( (svc & SND_RX_SEQ) == SND_RX_SEQ) {
+ clusterReceiver.start();
+ }
+ if ( (svc & SND_TX_SEQ) == SND_TX_SEQ) {
+ clusterSender.start();
+ }
+ if ( (svc & MBR_TX_SEQ) == MBR_TX_SEQ) {
+ membershipService.start(MembershipService.MBR_TX);
+ }
+ }catch ( Exception x ) {
+ throw new ChannelException(x);
+ }
+ }
+
+ /**
+ * Shuts down the channel. This can be called multiple times to shut down
+ * individual services.
+ * The svc parameter can be the bitwise OR of any of the constants
+ * @param svc int value of <BR>
+ * DEFAULT - will shutdown all services <BR>
+ * MBR_RX_SEQ - stops the membership service <BR>
+ * MBR_TX_SEQ - stops the membership service <BR>
+ * SND_TX_SEQ - stops the replication transmitter<BR>
+ * SND_RX_SEQ - stops the replication receiver<BR>
+ * @throws ChannelException wrapping any exception raised while a service
+ * is being stopped.
+ */
+ public void stop(int svc) throws ChannelException {
+ try {
+ // stop() is invoked without a service selector, so it is assumed to
+ // shut down the whole membership service; call it at most once.
+ boolean membershipStopped = false;
+ if ( (svc & MBR_RX_SEQ) == MBR_RX_SEQ) {
+ membershipService.stop();
+ membershipStopped = true;
+ }
+ if ( (svc & SND_RX_SEQ) == SND_RX_SEQ) {
+ clusterReceiver.stop();
+ }
+ if ( (svc & SND_TX_SEQ) == SND_TX_SEQ) {
+ clusterSender.stop();
+ }
+ // The mask must be compared with MBR_TX_SEQ itself; comparing it with
+ // MBR_RX_SEQ can never match, which made stop(MBR_TX_SEQ) a no-op.
+ if ( !membershipStopped && (svc & MBR_TX_SEQ) == MBR_TX_SEQ) {
+ membershipService.stop();
+ }
+ }catch ( Exception x ) {
+ throw new ChannelException(x);
+ }
+
+ }
+
+ public ClusterReceiver getClusterReceiver() {
+ return clusterReceiver;
+ }
+
+ public ClusterSender getClusterSender() {
+ return clusterSender;
+ }
+
+ public MembershipService getMembershipService() {
+ return membershipService;
+ }
+
+ public void setClusterReceiver(ClusterReceiver clusterReceiver) {
+ this.clusterReceiver = clusterReceiver;
+ }
+
+ public void setClusterSender(ClusterSender clusterSender) {
+ this.clusterSender = clusterSender;
+ }
+
+ public void setMembershipService(MembershipService membershipService) {
+ this.membershipService = membershipService;
}
}
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java
Mon Feb 13 13:00:05 2006
@@ -38,8 +38,6 @@
private SocketChannel channel;
- private Selector selector;
-
private ListenCallback callback;
private XByteBuffer buffer;
@@ -52,7 +50,6 @@
*/
public ObjectReader(SocketChannel channel, Selector selector,
ListenCallback callback) {
this.channel = channel;
- this.selector = selector;
this.callback = callback;
this.buffer = new XByteBuffer();
}
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
Mon Feb 13 13:00:05 2006
@@ -18,6 +18,14 @@
import org.apache.catalina.cluster.ClusterMessage;
import org.apache.catalina.cluster.tcp.ClusterData;
+import java.io.ObjectOutputStream;
+import java.util.zip.GZIPOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.zip.GZIPInputStream;
+import org.apache.catalina.cluster.session.ReplicationStream;
/**
* The XByteBuffer provides a dual functionality.
@@ -337,10 +345,10 @@
* @return - a full package (header,compress,size,data,footer)
*
*/
- public static byte[] createDataPackage(byte[] indata, int compressed)
+ public static byte[] createDataPackage(ClusterData cdata)
throws java.io.IOException {
- byte[] data = indata;
- byte[] comprdata = XByteBuffer.toBytes(compressed);
+ byte[] data = cdata.getMessage();
+ byte[] comprdata = XByteBuffer.toBytes(cdata.getCompress());
int length =
START_DATA.length + //header length
4 + //compression flag
@@ -355,4 +363,63 @@
System.arraycopy(END_DATA, 0, result, START_DATA.length + 8 +
data.length, END_DATA.length);
return result;
}
+
+ /**
+ * Deserializes the byte array carried by a ClusterData into a message.
+ * NOTE(review): this performs native Java deserialization of the payload;
+ * confirm that only trusted cluster members can deliver data to it.
+ * @param data ClusterData - holder of the serialized bytes, may be null
+ * @param compress boolean - true if the bytes are gzip compressed
+ * @return ClusterMessage - the message, or null if data is null or the
+ * stream contained a null object
+ * @throws IOException if the bytes cannot be read or decompressed
+ * @throws ClassNotFoundException if the class of the message cannot be loaded
+ * @throws ClassCastException if the object read is not a ClusterMessage
+ */
+ public static ClusterMessage deserialize(ClusterData data, boolean compress)
+ throws IOException, ClassNotFoundException, ClassCastException {
+ Object message = null;
+ if (data != null) {
+ InputStream instream = new ByteArrayInputStream(data.getMessage());
+ if (compress) {
+ instream = new GZIPInputStream(instream);
+ }
+ try {
+ ReplicationStream stream = new ReplicationStream(instream,XByteBuffer.class.getClassLoader());
+ message = stream.readObject();
+ } finally {
+ // close on failure as well, so the gzip stream is always released
+ instream.close();
+ }
+ }
+ if ( message == null ) {
+ return null;
+ } else if (message instanceof ClusterMessage) {
+ return (ClusterMessage) message;
+ } else {
+ throw new ClassCastException("Message has the wrong class. It should implement ClusterMessage, instead it is:"+message.getClass().getName());
+ }
+ }
+
+ /**
+ * Serializes a message into cluster data. The timestamp of the message is
+ * set to the current time before it is written, and the type, unique id,
+ * timestamp, compress and resend values of the message are copied onto
+ * the returned ClusterData.
+ * @param msg ClusterMessage - the message to serialize
+ * @param compress boolean - true to gzip the serialized bytes
+ * @return ClusterData - holder of the serialized (optionally gzipped) bytes
+ * @throws IOException if the message cannot be written
+ */
+ public static ClusterData serialize(ClusterMessage msg, boolean compress) throws IOException {
+ msg.setTimestamp(System.currentTimeMillis());
+ ClusterData data = new ClusterData();
+ data.setType(msg.getClass().getName());
+ data.setUniqueId(msg.getUniqueId());
+ data.setTimestamp(msg.getTimestamp());
+ data.setCompress(msg.getCompress());
+ data.setResend(msg.getResend());
+ ByteArrayOutputStream outs = new ByteArrayOutputStream();
+ GZIPOutputStream gout = null;
+ try {
+ ObjectOutputStream out;
+ if (compress) {
+ gout = new GZIPOutputStream(outs);
+ out = new ObjectOutputStream(gout);
+ } else {
+ out = new ObjectOutputStream(outs);
+ }
+ out.writeObject(msg);
+ // push any buffered object data down before the bytes are collected
+ out.flush();
+ } finally {
+ // closing completes the gzip data; doing it here also releases the
+ // gzip stream when writing the message fails
+ if (gout != null) {
+ gout.close();
+ }
+ }
+ data.setMessage(outs.toByteArray());
+ return data;
+ }
+
+
}
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicationStream.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicationStream.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicationStream.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicationStream.java
Mon Feb 13 13:00:05 2006
@@ -73,11 +73,11 @@
try
{
if ( tryRepFirst ) return findReplicationClass(name);
- else return findWebappClass(name);
+ else return findExternalClass(name);
}
catch ( Exception x )
{
- if ( tryRepFirst ) return findWebappClass(name);
+ if ( tryRepFirst ) return findExternalClass(name);
else return findReplicationClass(name);
}
} catch (ClassNotFoundException e) {
@@ -90,7 +90,7 @@
return Class.forName(name, false, getClass().getClassLoader());
}
- public Class findWebappClass(String name)
+ public Class findExternalClass(String name)
throws ClassNotFoundException, IOException {
return Class.forName(name, false, classLoader);
}
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterData.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterData.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterData.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterData.java
Mon Feb 13 13:00:05 2006
@@ -19,11 +19,15 @@
/**
+ * The cluster data class is used to transport around the byte array from
+ * a ClusterMessage object. This is just a utility class to avoid having to
+ * serialize and deserialize the ClusterMessage more than once.
* @author Peter Rossbach
+ * @author Filip Hanik
* @version $Revision$ $Date$
* @since 5.5.10
*/
-public class ClusterData {
+public class ClusterData {
private int resend = ClusterMessage.FLAG_DEFAULT ;
private int compress = ClusterMessage.FLAG_DEFAULT ;
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java
Mon Feb 13 13:00:05 2006
@@ -34,6 +34,7 @@
import org.apache.catalina.cluster.session.ReplicationStream;
import org.apache.catalina.core.StandardHost;
import org.apache.catalina.util.StringManager;
+import org.apache.catalina.cluster.io.XByteBuffer;
/**
* FIXME i18n log messages
@@ -438,32 +439,16 @@
//protected ClusterMessage deserialize(byte[] data)
protected ClusterMessage deserialize(ClusterData data)
throws IOException, ClassNotFoundException {
- Object message = null;
+ ClusterMessage message = null;
if (data != null) {
- InputStream instream;
- if (isCompress() || data.getCompress() ==
ClusterMessage.FLAG_ALLOWED ) {
- instream = new GZIPInputStream(new
ByteArrayInputStream(data.getMessage()));
- } else {
- instream = new ByteArrayInputStream(data.getMessage());
- }
- ReplicationStream stream = new ReplicationStream(instream,
- getClass().getClassLoader());
- message = stream.readObject();
+ // Read the compress flag only inside the null guard; evaluating it
+ // before the check dereferences data when it is null.
+ boolean compress = isCompress() || data.getCompress() == ClusterMessage.FLAG_ALLOWED;
+ message = XByteBuffer.deserialize(data, compress);
// calc stats really received bytes
totalReceivedBytes += data.getMessage().length;
//totalReceivedBytes += data.length;
nrOfMsgsReceived++;
- instream.close();
- }
- if (message instanceof ClusterMessage)
- return (ClusterMessage) message;
- else {
- if (log.isDebugEnabled())
- log.debug("Message " + message.toString() + " from type "
- + message.getClass().getName()
- + " transfered but is not a cluster message");
- return null;
}
+ return message;
}
// --------------------------------------------- Performance Stats
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
Mon Feb 13 13:00:05 2006
@@ -865,9 +865,8 @@
isMessageTransferStarted = true ;
}
try {
- byte[] message = data.getMessage();
OutputStream out = socket.getOutputStream();
-
out.write(XByteBuffer.createDataPackage(message,data.getCompress()));
+ out.write(XByteBuffer.createDataPackage(data));
out.flush();
if (isWaitForAck())
waitForAck(ackTimeout);
Modified:
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
(original)
+++
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
Mon Feb 13 13:00:05 2006
@@ -35,13 +35,14 @@
import org.apache.catalina.core.StandardHost;
import org.apache.catalina.util.StringManager;
import org.apache.tomcat.util.IntrospectionUtils;
+import org.apache.catalina.cluster.io.XByteBuffer;
/**
- * Transmit message to ohter cluster members create sender from replicationMode
+ * Transmit message to other cluster members
+ * Actual senders are created based on the replicationMode
* type
* FIXME i18n log messages
* FIXME compress data depends on message type and size
- * FIXME send very big messages at some block see FarmWarDeployer!
* TODO pause and resume senders
*
* @author Peter Rossbach
@@ -806,32 +807,9 @@
* @since 5.5.10
*/
protected ClusterData serialize(ClusterMessage msg) throws IOException {
- msg.setTimestamp(System.currentTimeMillis());
- ByteArrayOutputStream outs = new ByteArrayOutputStream();
- ObjectOutputStream out;
- GZIPOutputStream gout = null;
- ClusterData data = new ClusterData();
- data.setType(msg.getClass().getName());
- data.setUniqueId(msg.getUniqueId());
- data.setTimestamp(msg.getTimestamp());
- data.setCompress(msg.getCompress());
- data.setResend(msg.getResend());
- // FIXME add stats: How much comress and uncompress messages and bytes
are transfered
- if ((isCompress() && msg.getCompress() !=
ClusterMessage.FLAG_FORBIDDEN)
- || msg.getCompress() == ClusterMessage.FLAG_ALLOWED) {
- gout = new GZIPOutputStream(outs);
- out = new ObjectOutputStream(gout);
- } else {
- out = new ObjectOutputStream(outs);
- }
- out.writeObject(msg);
- // flush out the gzip stream to byte buffer
- if(gout != null) {
- gout.flush();
- gout.close();
- }
- data.setMessage(outs.toByteArray());
- return data;
+ boolean compress = ((isCompress() && msg.getCompress() !=
ClusterMessage.FLAG_FORBIDDEN)
+ || msg.getCompress() ==
ClusterMessage.FLAG_ALLOWED);
+ return XByteBuffer.serialize(msg,compress);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]