Author: fhanik
Date: Thu Mar 2 12:02:33 2006
New Revision: 382491
URL: http://svn.apache.org/viewcvs?rev=382491&view=rev
Log:
Refactored and removed all JMX and stats counting. Stats can easily be created
by building a StatsInterceptor.
With these two removed, the class shrinks by 50% in size and becomes easier to
maintain and manage.
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenderFactory.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=382491&view=auto
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
(added)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
Thu Mar 2 12:02:33 2006
@@ -0,0 +1,34 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.tcp;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public interface DataSender {
+ public void connect() throws java.io.IOException;
+ public void disconnect();
+
+}
\ No newline at end of file
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenderFactory.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenderFactory.java?rev=382491&r1=382490&r2=382491&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenderFactory.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenderFactory.java
Thu Mar 2 12:02:33 2006
@@ -45,6 +45,7 @@
public static final String ASYNC_MODE = "asynchronous";
public static final String POOLED_SYNC_MODE = "pooled";
public static final String FAST_ASYNC_QUEUE_MODE = "fastasyncqueue";
+ public static final String PARALLEL_MODE = "parallel";
/**
* The string manager for this package.
@@ -91,12 +92,10 @@
* @return new sender object
* @throws java.io.IOException
*/
- public synchronized static SinglePointSender getIDataSender(String mode,
- Member mbr) throws java.io.IOException {
+ public synchronized static SinglePointSender getSingleSender(String
mode,Member mbr) throws java.io.IOException {
// Identify the class name of the DataSender we should configure
- SinglePointSender sender = factory.getSender(mode,mbr);
- if(sender == null)
- throw new java.io.IOException("Invalid replication mode=" + mode);
+ SinglePointSender sender = factory.getSinglePointSender(mode,mbr);
+ if(sender == null) throw new java.io.IOException("Invalid replication
mode=" + mode);
return sender ;
}
@@ -126,7 +125,7 @@
return senderModes != null && senderModes.containsKey(mode) ;
}
- private SinglePointSender getSender(String mode,Member mbr) {
+ private SinglePointSender getSinglePointSender(String mode,Member mbr) {
SinglePointSender sender = null;
String senderName = null;
senderName = senderModes.getProperty(mode);
@@ -164,10 +163,7 @@
// Load our mapping properties if necessary
if (senderModes == null) {
try {
- InputStream is = SinglePointSender.class
- .getClassLoader()
- .getResourceAsStream(
- DATASENDERS_PROPERTIES);
+ InputStream is =
SinglePointSender.class.getClassLoader().getResourceAsStream(DATASENDERS_PROPERTIES);
if (is != null) {
senderModes = new Properties();
senderModes.load(is);
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java?rev=382491&r1=382490&r2=382491&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
Thu Mar 2 12:02:33 2006
@@ -327,12 +327,10 @@
.getHostAddress();
}
if (log.isDebugEnabled())
- log.debug("Starting replication listener on address:"
- + tcpListenAddress);
+ log.debug("Starting replication listener on address:"+
tcpListenAddress);
bind = java.net.InetAddress.getByName(tcpListenAddress);
} catch (IOException ioe) {
- log.error("Failed bind replication listener on address:"
- + tcpListenAddress, ioe);
+ log.error("Failed bind replication listener on address:"+
tcpListenAddress, ioe);
}
}
return bind;
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=382491&r1=382490&r2=382491&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
Thu Mar 2 12:02:33 2006
@@ -36,7 +36,6 @@
* type
*
* @author Filip Hanik
- * @author Peter Rossbach
* @version $Revision: 379956 $ $Date: 2006-02-22 16:57:35 -0600 (Wed, 22 Feb
2006) $
*/
public class ReplicationTransmitter implements ChannelSender,IDynamicProperty {
@@ -69,31 +68,6 @@
}
/**
- * number of transmitted messages>
- */
- private long nrOfRequests = 0;
-
- /**
- * number of transmitted bytes
- */
- private long totalBytes = 0;
-
- /**
- * number of failure
- */
- private long failureCounter = 0;
-
- /**
- * Iteration count for background processing.
- */
- private int count = 0;
-
- /**
- * Frequency of the check sender keepAlive Socket Status.
- */
- protected int processSenderFrequency = 2;
-
- /**
* current sender replication mode
*/
private String replicationMode;
@@ -111,25 +85,7 @@
/**
* autoConnect sender when next message send
*/
- private boolean autoConnect = false; /**
- * doTransmitterProcessingStats
- */
- protected boolean doTransmitterProcessingStats = false;
-
- /**
- * proessingTime
- */
- protected long processingTime = 0;
-
- /**
- * min proessingTime
- */
- protected long minProcessingTime = Long.MAX_VALUE ;
-
- /**
- * max proessingTime
- */
- protected long maxProcessingTime = 0;
+ private boolean autoConnect = false;
/**
* dynamic sender <code>properties</code>
@@ -137,11 +93,6 @@
private Map properties = new HashMap();
- /**
- * Transmitter Mbean name
- */
- private ObjectName objectName;
-
// ------------------------------------------------------------- Properties
/**
@@ -153,26 +104,7 @@
return (info);
}
- /**
- * @return Returns the nrOfRequests.
- */
- public long getNrOfRequests() {
- return nrOfRequests;
- }
-
- /**
- * @return Returns the totalBytes.
- */
- public long getTotalBytes() {
- return totalBytes;
- }
-
- /**
- * @return Returns the failureCounter.
- */
- public long getFailureCounter() {
- return failureCounter;
- }
+
/**
* current replication mode
@@ -200,62 +132,9 @@
}
- /**
- * @return Returns the avg processingTime/nrOfRequests.
- */
- public double getAvgProcessingTime() {
- return ((double)processingTime) / nrOfRequests;
- }
-
- /**
- * @return Returns the maxProcessingTime.
- */
- public long getMaxProcessingTime() {
- return maxProcessingTime;
- }
-
- /**
- * @return Returns the minProcessingTime.
- */
- public long getMinProcessingTime() {
- return minProcessingTime;
- }
- /**
- * @return Returns the processingTime.
- */
- public long getProcessingTime() {
- return processingTime;
- }
-
- /**
- * @return Returns the doTransmitterProcessingStats.
- */
- public boolean isDoTransmitterProcessingStats() {
- return doTransmitterProcessingStats;
- }
-
- /**
- * @param doProcessingStats The doTransmitterProcessingStats to set.
- */
- public void setDoTransmitterProcessingStats(boolean doProcessingStats) {
- this.doTransmitterProcessingStats = doProcessingStats;
- }
-
/**
- * Transmitter ObjectName
- *
- * @param name
- */
- public void setObjectName(ObjectName name) {
- objectName = name;
- }
-
- public ObjectName getObjectName() {
- return objectName;
- }
- /**
* @return Returns the autoConnect.
*/
public boolean isAutoConnect() {
@@ -304,13 +183,6 @@
}
- /**
- * @return Returns the processSenderFrequency.
- */
- public int getProcessSenderFrequency() {
- return processSenderFrequency;
- }
-
public int getTxBufSize() {
return txBufSize;
}
@@ -323,13 +195,6 @@
return "parallel".equals(replicationMode);
}
- /**
- * @param processSenderFrequency The processSenderFrequency to set.
- */
- public void setProcessSenderFrequency(int processSenderFrequency) {
- this.processSenderFrequency = processSenderFrequency;
- }
-
public void setTxBufSize(int txBufSize) {
this.txBufSize = txBufSize;
}
@@ -340,11 +205,12 @@
/**
* @return True if synchronized sender
- * @deprecated since version 5.5.7
*/
public boolean getIsSenderSynchronized() {
- return DataSenderFactory.SYNC_MODE.equals(replicationMode)
- || DataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode);
+ return
+ DataSenderFactory.SYNC_MODE.equals(replicationMode) ||
+ DataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode) ||
+ (DataSenderFactory.PARALLEL_MODE.equals(replicationMode) &&
waitForAck);
}
// ------------------------------------------------------------- dynamic
@@ -415,49 +281,12 @@
}
public void sendMessage(ChannelMessage message, Member destination) throws
IOException {
- long time = 0 ;
- if(doTransmitterProcessingStats) {
- time = System.currentTimeMillis();
- }
- try {
- Object key = getKey(destination);
- SinglePointSender sender = (SinglePointSender) map.get(key);
- sendMessageData(message, sender);
- } finally {
- if (doTransmitterProcessingStats) {
- addProcessingStats(time);
- }
- }
+ Object key = getKey(destination);
+ SinglePointSender sender = (SinglePointSender) map.get(key);
+ sendMessageData(message, sender);
}
/**
- * send message to all senders (broadcast)
- * @see
org.apache.catalina.tribes.ClusterSender#sendMessage(org.apache.catalina.tribes.ClusterMessage)
- */
-// public void sendMessage(ChannelMessage message, boolean domainOnly)
throws IOException {
-// long time = 0;
-// if (doTransmitterProcessingStats) {
-// time = System.currentTimeMillis();
-// }
-// try {
-// IDataSender[] senders = getSenders();
-// for (int i = 0; i < senders.length; i++) {
-// IDataSender sender = senders[i];
-// //domain filter
-// String domain = message.getAddress().getDomain();
-// if ( domainOnly && !(domain.equals(sender.getDomain())) )
continue;
-// sendMessageData(message, sender);
-// }
-// } finally {
-// if (doTransmitterProcessingStats) {
-// addProcessingStats(time);
-// }
-// }
-// }
-
-
-
- /**
* start the sender and register transmitter mbean
*
* @see org.apache.catalina.tribes.ClusterSender#start()
@@ -488,11 +317,9 @@
*
* @see SimpleTcpCluster#backgroundProcess()
*/
+
public void heartbeat() {
- count = (count + 1) % processSenderFrequency;
- if (count == 0) {
- checkKeepAlive();
- }
+ checkKeepAlive();
}
/**
@@ -531,18 +358,6 @@
}
/**
- * Reset sender statistics
- */
- public synchronized void resetStatistics() {
- nrOfRequests = 0;
- totalBytes = 0;
- failureCounter = 0;
- processingTime = 0;
- minProcessingTime = Long.MAX_VALUE;
- maxProcessingTime = 0;
- }
-
- /**
* add new cluster member and create sender ( s. replicationMode) transfer
* current properties to sender
*
@@ -552,7 +367,7 @@
try {
Object key = getKey(member);
if (!map.containsKey(key)) {
- SinglePointSender sender =
DataSenderFactory.getIDataSender(replicationMode, member);
+ SinglePointSender sender =
DataSenderFactory.getSingleSender(replicationMode, member);
if ( sender!= null ) {
transferSenderProperty(sender);
sender.setRxBufSize(getRxBufSize());
@@ -583,22 +398,6 @@
// ------------------------------------------------------------- protected
/**
- * calc number of requests and transfered bytes. Log stats all 100 requets
- *
- * @param length
- */
- protected synchronized void addStats(int length) {
- nrOfRequests++;
- totalBytes += length;
- if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) {
- log.debug("Nr of bytes sent=" + totalBytes + " over "
- + nrOfRequests + "; avg=" + (totalBytes / nrOfRequests)
- + " bytes/request; failures=" + failureCounter);
- }
-
- }
-
- /**
* Transfer all properties from transmitter to concrete sender
*
* @param sender
@@ -643,13 +442,11 @@
// deprecated not needed DataSender#pushMessage can handle
connection
if (autoConnect) {
synchronized(sender) {
- if(!sender.isConnected())
- sender.connect();
+ if(!sender.isConnected()) sender.connect();
}
}
sender.sendMessage(data);
sender.setSuspect(false);
- addStats(data.getMessage().getLength());
} catch (IOException x) {
if (!sender.getSuspect()) {
if (log.isErrorEnabled() ) log.error("Unable to send
replicated message, is member ["+sender.toString()+"] down?",x);
@@ -657,23 +454,8 @@
log.debug("Unable to send replicated message, is member
["+sender.toString()+"] down?",x);
}
sender.setSuspect(true);
- failureCounter++;
throw x;
}
}
- /**
- * Add processing stats times
- * @param startTime
- */
- protected void addProcessingStats(long startTime) {
- long time = System.currentTimeMillis() - startTime ;
- if(time < minProcessingTime)
- minProcessingTime = time ;
- if( time > maxProcessingTime)
- maxProcessingTime = time ;
- processingTime += time ;
- }
-
-
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java?rev=382491&r1=382490&r2=382491&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java
Thu Mar 2 12:02:33 2006
@@ -25,14 +25,12 @@
* @since 5.5.7
*/
-public interface SinglePointSender
+public interface SinglePointSender extends DataSender
{
public void setAddress(java.net.InetAddress address);
public java.net.InetAddress getAddress();
public void setPort(int port);
public int getPort();
- public void connect() throws java.io.IOException;
- public void disconnect();
public void sendMessage(ChannelMessage data) throws java.io.IOException;
public boolean isConnected();
public void setSuspect(boolean suspect);
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]