Author: fhanik
Date: Thu Mar 2 16:28:32 2006
New Revision: 382589
URL: http://svn.apache.org/viewcvs?rev=382589&view=rev
Log:
Forgot to check in the following
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java?rev=382589&view=auto
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java
(added)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java
Thu Mar 2 16:28:32 2006
@@ -0,0 +1,38 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.tcp;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.Member;
+
+/**
+ * @author Filip Hanik
+ * @version $Revision: 303993 $ $Date: 2005-07-16 16:05:54 -0500 (Sat, 16 Jul 2005) $
+ * @since 5.5.16
+ */
+
+public interface MultiPointSender extends DataSender {
+
+    /**
+     * Sends one message to a set of destination members.
+     *
+     * @param destination the members the message is addressed to
+     * @param data the message to send
+     * @throws ChannelException declared by the contract; the exact failure
+     *         conditions are defined by the implementing class
+     */
+    void sendMessage(Member[] destination, ChannelMessage data) throws ChannelException;
+
+    /**
+     * Sets the receive buffer size.
+     *
+     * @param size the buffer size (presumably in bytes; confirm with implementations)
+     */
+    void setRxBufSize(int size);
+
+    /**
+     * Sets the transmit buffer size.
+     *
+     * @param size the buffer size (presumably in bytes; confirm with implementations)
+     */
+    void setTxBufSize(int size);
+
+    /**
+     * Sets the maximum number of retry attempts.
+     *
+     * @param attempts the retry limit; what counts as an attempt is
+     *        implementation-defined
+     */
+    void setMaxRetryAttempts(int attempts);
+
+    /**
+     * Selects whether a direct buffer should be used.
+     *
+     * @param directBuf <code>true</code> to request a direct buffer
+     */
+    void setUseDirectBuffer(boolean directBuf);
+
+    /**
+     * Sets the suspect flag.
+     * NOTE(review): the meaning of "suspect" is not defined in this
+     * interface; see the implementing classes.
+     *
+     * @param suspect the new flag value
+     */
+    void setSuspect(boolean suspect);
+
+    /**
+     * @return the current value of the suspect flag
+     */
+    boolean getSuspect();
+
+}
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java?rev=382589&view=auto
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
(added)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
Thu Mar 2 16:28:32 2006
@@ -0,0 +1,217 @@
+package org.apache.catalina.tribes.tcp;
+
+import java.util.LinkedList;
+
+import org.apache.catalina.tribes.ChannelException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.List;
+
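+/**
+ * <p>Title: PooledSender</p>
+ *
+ * <p>Description: Abstract {@link DataSender} that keeps a bounded pool of
+ * senders, creating new ones through {@link #getNewDataSender()} on demand.</p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */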
+public abstract class PooledSender implements DataSender {
+
+    // Senders are handed out with getSender() and must be given back with
+    // returnSender(). All pool bookkeeping lives in the private SenderQueue
+    // and is guarded by that queue's monitor.
+
+    /** The pool; created once in the constructor. */
+    private final SenderQueue queue;
+
+    /** Written by connect()/disconnect(); volatile because isConnected() reads it without a lock. */
+    private volatile boolean connected;
+
+    // The values below are only stored and returned by this class; it is up
+    // to subclasses to apply them to the senders they create.
+    private int rxBufSize;
+    private int txBufSize;
+    private boolean waitForAck;
+    private long timeout;
+
+    /**
+     * Creates a sender whose pool holds at most <code>queueSize</code>
+     * checked-out senders at a time.
+     *
+     * @param queueSize upper bound on the number of senders in use
+     */
+    public PooledSender(int queueSize) {
+        queue = new SenderQueue(this, queueSize);
+    }
+
+    /**
+     * Factory hook invoked by the pool when it needs an additional sender.
+     * It is called while the pool's monitor is held.
+     *
+     * @return a new sender to be tracked by the pool
+     */
+    public abstract DataSender getNewDataSender();
+
+    /**
+     * Checks a sender out of the pool without waiting.
+     *
+     * @return an idle or newly created sender, or <code>null</code> when no
+     *         sender is idle and the in-use limit has been reached
+     * @throws IllegalStateException if the pool has been closed
+     */
+    public DataSender getSender() {
+        return queue.getSender(0);
+    }
+
+    /**
+     * Gives a sender back to the pool after running its keep-alive check.
+     * A <code>null</code> argument is ignored, because {@link #getSender()}
+     * returns <code>null</code> when the pool is exhausted.
+     *
+     * @param sender the sender obtained from {@link #getSender()}, may be <code>null</code>
+     */
+    public void returnSender(DataSender sender) {
+        if (sender == null) {
+            return;
+        }
+        sender.checkKeepAlive();
+        queue.returnSender(sender);
+    }
+
+    /**
+     * Opens the pool and marks this sender as connected. No socket is opened
+     * here; that happens in the pooled senders themselves.
+     *
+     * @throws ChannelException declared for the interface; not thrown by this implementation
+     */
+    public synchronized void connect() throws ChannelException {
+        //do nothing, happens in the socket sender itself
+        queue.open();
+        setConnected(true);
+    }
+
+    /**
+     * Closes the pool, disconnecting every sender it tracks, and marks this
+     * sender as disconnected.
+     */
+    public synchronized void disconnect() {
+        queue.close();
+        setConnected(false);
+    }
+
+    /**
+     * @return the number of idle senders currently held by the pool
+     */
+    public int getInPoolSize() {
+        return queue.getInPoolSize();
+    }
+
+    /**
+     * @return the number of senders currently checked out of the pool
+     */
+    public int getInUsePoolSize() {
+        return queue.getInUsePoolSize();
+    }
+
+    /**
+     * @param connected the new connected flag
+     */
+    public void setConnected(boolean connected) {
+        this.connected = connected;
+    }
+
+    /**
+     * @param rxBufSize the receive buffer size to remember
+     */
+    public void setRxBufSize(int rxBufSize) {
+        this.rxBufSize = rxBufSize;
+    }
+
+    /**
+     * @param txBufSize the transmit buffer size to remember
+     */
+    public void setTxBufSize(int txBufSize) {
+        this.txBufSize = txBufSize;
+    }
+
+    /**
+     * @param waitForAck the wait-for-ack flag to remember
+     */
+    public void setWaitForAck(boolean waitForAck) {
+        this.waitForAck = waitForAck;
+    }
+
+    /**
+     * @param timeout the timeout value to remember
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    /**
+     * @return <code>true</code> after {@link #connect()}, <code>false</code>
+     *         initially and after {@link #disconnect()}
+     */
+    public boolean isConnected() {
+        return connected;
+    }
+
+    /**
+     * @return the stored receive buffer size
+     */
+    public int getRxBufSize() {
+        return rxBufSize;
+    }
+
+    /**
+     * @return the stored transmit buffer size
+     */
+    public int getTxBufSize() {
+        return txBufSize;
+    }
+
+    /**
+     * @return the stored wait-for-ack flag
+     */
+    public boolean getWaitForAck() {
+        return waitForAck;
+    }
+
+    /**
+     * @return the stored timeout value
+     */
+    public long getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * No-op for the pooled sender itself; each pooled sender is checked when
+     * it is returned (see {@link #returnSender(DataSender)}).
+     *
+     * @return always <code>false</code>
+     */
+    public boolean checkKeepAlive() {
+        //do nothing, the pool checks on every return
+        return false;
+    }
+
+    // ----------------------------------------------------- Inner Class
+
+    /**
+     * Bookkeeping for idle and checked-out senders. Every method that reads
+     * or writes the lists, the limit or the open flag holds this object's
+     * monitor.
+     */
+    private class SenderQueue {
+
+        /** Maximum number of senders that may be checked out at once. */
+        private int limit;
+
+        /** Source of new senders. */
+        private final PooledSender parent;
+
+        /** Idle senders, handed out oldest first. */
+        private final List<DataSender> notinuse = new LinkedList<DataSender>();
+
+        /** Senders currently checked out. */
+        private final List<DataSender> inuse = new LinkedList<DataSender>();
+
+        /** The queue starts open; close() and open() toggle this flag. */
+        private boolean isOpen = true;
+
+        public SenderQueue(PooledSender parent, int limit) {
+            this.limit = limit;
+            this.parent = parent;
+        }
+
+        /**
+         * @return Returns the limit.
+         */
+        public synchronized int getLimit() {
+            return limit;
+        }
+
+        /**
+         * @param limit The limit to set.
+         */
+        public synchronized void setLimit(int limit) {
+            this.limit = limit;
+        }
+
+        /**
+         * @return the number of checked-out senders
+         */
+        public synchronized int getInUsePoolSize() {
+            return inuse.size();
+        }
+
+        /**
+         * @return the number of idle senders
+         */
+        public synchronized int getInPoolSize() {
+            return notinuse.size();
+        }
+
+        /**
+         * Checks out an idle sender, or creates one if none is idle and the
+         * limit allows it.
+         *
+         * @param timeout currently unused; this method never waits
+         * @return a sender, or <code>null</code> if none is idle and
+         *         <code>limit</code> senders are already in use
+         * @throws IllegalStateException if the queue is closed
+         */
+        public synchronized DataSender getSender(long timeout) {
+            if (!isOpen) {
+                throw new IllegalStateException("Queue is closed");
+            }
+            DataSender sender = null;
+            if (!notinuse.isEmpty()) {
+                sender = notinuse.remove(0);
+            } else if (inuse.size() < limit) {
+                sender = parent.getNewDataSender();
+            }
+            if (sender != null) {
+                inuse.add(sender);
+            }
+            return sender;
+        }
+
+        /**
+         * Moves a sender from the in-use list to the idle list. If the queue
+         * is closed the sender is disconnected instead of being pooled.
+         *
+         * @param sender the sender being returned; must not be <code>null</code>
+         */
+        public synchronized void returnSender(DataSender sender) {
+            if (!isOpen) {
+                sender.disconnect();
+                return;
+            }
+            inuse.remove(sender);
+            // Guard against a double return: queuing the same instance twice
+            // would let two callers check out the same sender.
+            if (!isIdle(sender)) {
+                notinuse.add(sender);
+            }
+        }
+
+        /**
+         * Identity-based membership test on the idle list, so that senders
+         * which merely compare equal are still pooled separately.
+         */
+        private boolean isIdle(DataSender sender) {
+            for (DataSender idle : notinuse) {
+                if (idle == sender) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        /**
+         * Marks the queue closed, disconnects every idle and checked-out
+         * sender, and forgets them all.
+         */
+        public synchronized void close() {
+            isOpen = false;
+            for (DataSender sender : notinuse) {
+                sender.disconnect();
+            }
+            for (DataSender sender : inuse) {
+                sender.disconnect();
+            }
+            notinuse.clear();
+            inuse.clear();
+        }
+
+        /**
+         * Marks the queue open so senders can be checked out again.
+         */
+        public synchronized void open() {
+            isOpen = true;
+        }
+    }
+
+    /**
+     * Debug helper: prints the array to <code>System.out</code> as
+     * <code>[a, b, c]</code> followed by a line break.
+     *
+     * @param arr the elements to print; must not be <code>null</code>
+     */
+    public static void printArr(Object[] arr) {
+        System.out.print("[");
+        for (int i = 0; i < arr.length; i++) {
+            System.out.print(arr[i]);
+            if ((i + 1) < arr.length) {
+                System.out.print(", ");
+            }
+        }
+        System.out.println("]");
+    }
+
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]