Author: fhanik
Date: Wed Mar 8 13:23:10 2006
New Revision: 384333
URL: http://svn.apache.org/viewcvs?rev=384333&view=rev
Log:
Working on the lazy replicated hash map
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/DirectByteArrayOutputStream.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Diffable.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Streamable.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/InterceptorPayload.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/InterceptorPayload.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/InterceptorPayload.java?rev=384333&r1=384332&r2=384333&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/InterceptorPayload.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/InterceptorPayload.java
Wed Mar 8 13:23:10 2006
@@ -15,19 +15,11 @@
*/
package org.apache.catalina.tribes;
+import java.util.Stack;
+
/**
- * <p>Title: </p>
- *
- * <p>Description: </p>
- *
- * <p>Copyright: Copyright (c) 2005</p>
- *
- * <p>Company: </p>
- *
* @author Filip Hanik
* @version 1.0
*/
-public class InterceptorPayload {
- public InterceptorPayload() {
- }
+public class InterceptorPayload extends Stack {
}
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/DirectByteArrayOutputStream.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/DirectByteArrayOutputStream.java?rev=384333&view=auto
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/DirectByteArrayOutputStream.java
(added)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/DirectByteArrayOutputStream.java
Wed Mar 8 13:23:10 2006
@@ -0,0 +1,62 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Byte array output stream that exposes the byte array directly
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class DirectByteArrayOutputStream extends OutputStream {
+
+ private XByteBuffer buffer;
+
+ public DirectByteArrayOutputStream(int size) {
+ buffer = new XByteBuffer(size,false);
+ }
+
+ /**
+ * Writes the specified byte to this output stream.
+ *
+ * @param b the <code>byte</code>.
+ * @throws IOException if an I/O error occurs. In particular, an
+ * <code>IOException</code> may be thrown if the output stream has
+ * been closed.
+ * @todo Implement this java.io.OutputStream method
+ */
+ public void write(int b) throws IOException {
+ buffer.append((byte)b);
+ }
+
+ public int size() {
+ return buffer.getLength();
+ }
+
+ public byte[] getArrayDirect() {
+ return buffer.getBytesDirect();
+ }
+
+ public byte[] getArray() {
+ return buffer.getBytes();
+ }
+
+
+}
\ No newline at end of file
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=384333&r1=384332&r2=384333&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
Wed Mar 8 13:23:10 2006
@@ -515,9 +515,14 @@
public static Serializable deserialize(byte[] data)
throws IOException, ClassNotFoundException, ClassCastException {
+ return deserialize(data,0,data.length);
+ }
+
+ public static Serializable deserialize(byte[] data, int offset, int
length)
+ throws IOException, ClassNotFoundException, ClassCastException {
Object message = null;
if (data != null) {
- InputStream instream = new ByteArrayInputStream(data);
+ InputStream instream = new
ByteArrayInputStream(data,offset,length);
ReplicationStream stream = new ReplicationStream(instream,new
ClassLoader[] {XByteBuffer.class.getClassLoader()});
message = stream.readObject();
instream.close();
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=384333&r1=384332&r2=384333&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
Wed Mar 8 13:23:10 2006
@@ -40,8 +40,7 @@
* @version $Revision: 379956 $ $Date: 2006-02-22 16:57:35 -0600 (Wed, 22 Feb
2006) $
*/
public class ReplicationTransmitter implements ChannelSender,IDynamicProperty {
- private static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory
- .getLog(ReplicationTransmitter.class);
+ private static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog(ReplicationTransmitter.class);
/**
* The descriptive information about this implementation.
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Diffable.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Diffable.java?rev=384333&view=auto
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Diffable.java
(added)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Diffable.java
Wed Mar 8 13:23:10 2006
@@ -0,0 +1,35 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.tipis;
+
+import java.io.Serializable;
+import java.io.IOException;
+
+/**
+ *
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public interface Diffable extends Serializable {
+
+ public byte[] getDiff() throws IOException;
+
+ public void applyDiff(byte[] diff, int offset, int length) throws
IOException;
+
+ public boolean hasDiff();
+
+}
\ No newline at end of file
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=384333&view=auto
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
(added)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
Wed Mar 8 13:23:10 2006
@@ -0,0 +1,354 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.tipis;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.catalina.tribes.Channel;
+import java.io.Serializable;
+import org.apache.catalina.tribes.Member;
+import java.io.UnsupportedEncodingException;
+import java.io.IOException;
+import org.apache.catalina.tribes.io.DirectByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import java.util.Set;
+import java.util.LinkedHashMap;
+import org.apache.catalina.tribes.ChannelListener;
+import java.util.Collection;
+import org.apache.catalina.tribes.MembershipListener;
+
+/**
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class LazyReplicatedMap extends LinkedHashMap
+ implements RpcCallback, ChannelListener, MembershipListener {
+ protected static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog(LazyReplicatedMap.class);
+
+//------------------------------------------------------------------------------
+// INSTANCE VARIABLES
+//------------------------------------------------------------------------------
+
+ private Channel channel;
+ private RpcChannel rpcChannel;
+
+
+//------------------------------------------------------------------------------
+// CONSTRUCTORS / DESTRUCTORS
+//------------------------------------------------------------------------------
+ public LazyReplicatedMap(Channel channel, String mapContextName, int
initialCapacity, float loadFactor) {
+ super(initialCapacity,loadFactor);
+ init(channel,mapContextName);
+ }
+
+ public LazyReplicatedMap(Channel channel, String mapContextName, int
initialCapacity) {
+ super(initialCapacity);
+ init(channel,mapContextName);
+ }
+
+ public LazyReplicatedMap(Channel channel, String mapContextName) {
+ super();
+ init(channel,mapContextName);
+ }
+
+ void init(Channel channel, String mapContextName) {
+ final String chset = "ISO-8859-1";
+ this.channel = channel;
+ try {
+ this.rpcChannel = new RpcChannel(mapContextName.getBytes(chset),
channel, this);
+ }catch (UnsupportedEncodingException x) {
+ log.warn("Unable to encode mapContextName["+mapContextName+"]
using getBytes("+chset+") using default getBytes()",x);
+ this.rpcChannel = new RpcChannel(mapContextName.getBytes(),
channel, this);
+ }
+ this.channel.addChannelListener(this);
+ this.channel.addMembershipListener(this);
+
+ }
+
+ public void breakDown() {
+ finalize();
+ }
+
+ public void finalize() {
+ if ( this.rpcChannel!=null ) {
+ this.rpcChannel.breakDown();
+ }
+ if ( this.channel != null ) {
+ this.channel.removeChannelListener(this);
+ this.channel.removeMembershipListener(this);
+ }
+ this.rpcChannel = null;
+ this.channel = null;
+ }
+
+//------------------------------------------------------------------------------
+// GROUP COM INTERFACES
+//------------------------------------------------------------------------------
+ /**
+ *
+ * @param msg Serializable
+ * @return Serializable - null if no reply should be sent
+ */
+ public Serializable replyRequest(Serializable msg, Member sender) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * If the reply has already been sent to the requesting thread,
+ * the rpc callback can handle any data that comes in after the fact.
+ * @param msg Serializable
+ * @param sender Member
+ */
+ public void leftOver(Serializable msg, Member sender) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void messageReceived(Serializable msg, Member sender) {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean accept(Serializable msg, Member sender) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void memberAdded(Member member) {
+
+ }
+ public void memberDisappeared(Member member) {
+
+ }
+
+//------------------------------------------------------------------------------
+// METHODS TO OVERRIDE
+//------------------------------------------------------------------------------
+
+ public Object get(Object key) {
+ return super.get(key);
+ }
+
+ public boolean containsKey(Object key) {
+ return super.containsKey(key);
+ }
+
+ public Object put(Object key, Object value) {
+ return super.put(key,value);
+ }
+
+ public void putAll(Map m) {
+ super.putAll(m);
+ }
+
+ public Object remove(Object key) {
+ return super.remove(key);
+ }
+
+ public void clear() {
+ super.clear();
+ }
+
+ public boolean containsValue(Object value) {
+ return super.containsValue(value);
+ }
+
+ public Object clone() {
+ return super.clone();
+ }
+
+ public Set entrySet() {
+ return super.entrySet();
+ }
+
+ public Set keySet() {
+ return super.keySet();
+ }
+
+ public int size() {
+ return super.size();
+ }
+
+ protected boolean removeEldestEntry(Map.Entry eldest) {
+ return false;
+ }
+
+ public boolean isEmpty() {
+ return super.isEmpty();
+ }
+
+ public Collection values() {
+ return super.values();
+ }
+
+
+//------------------------------------------------------------------------------
+// Map Entry class
+//------------------------------------------------------------------------------
+ public static class MapEntry implements Map.Entry {
+ private boolean backup;
+ private boolean proxy;
+ private Member backupNode;
+
+ private Serializable key;
+ private Serializable value;
+
+ public MapEntry(Serializable key, Serializable value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public boolean isBackup() {
+ return backup;
+ }
+
+ public void setBackup(boolean backup) {
+ this.backup = backup;
+ }
+
+ public boolean isProxy() {
+ return proxy;
+ }
+
+ public void setProxy(boolean proxy) {
+ this.proxy = proxy;
+ }
+
+ public boolean isDiffable() {
+ return (value instanceof Diffable);
+ }
+
+ public void setBackupNode(Member node) {
+ this.backupNode = node;
+ }
+
+ public Member getBackupNode() {
+ return backupNode;
+ }
+
+
+
+ public Object getValue() {
+ return value;
+ }
+
+ public Object setValue(Object value) {
+ Object old = this.value;
+ this.value = (Serializable)value;
+ return old;
+ }
+
+ public Object getKey() {
+ return key;
+ }
+
+
+ public byte[] getDiff() throws IOException {
+ if ( isDiffable() ) {
+ return ((Diffable)value).getDiff();
+ } else {
+ return getData();
+ }
+ }
+
+ public int hashCode() {
+ return key.hashCode();
+ }
+
+ public boolean equals(Object o) {
+ return key.equals(o);
+ }
+
+ /**
+ * returns the entire object as a byte array
+ * @return byte[]
+ * @throws IOException
+ */
+ public byte[] getData() throws IOException {
+ return (new ObjectStreamable(value)).getBuf().getArray();
+ }
+
+ /**
+ * apply a diff, or an entire object
+ * @param data byte[]
+ * @param offset int
+ * @param length int
+ * @param diff boolean
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ public void apply(byte[] data, int offset, int length, boolean diff)
throws IOException, ClassNotFoundException {
+ if ( isDiffable() && diff ) {
+ ((Diffable)value).applyDiff(data,offset,length);
+ } else if ( length == 0 ) {
+ value = null;
+ proxy = true;
+ } else {
+ value = XByteBuffer.deserialize(data,offset,length);
+ }
+ }
+
+ }
+
+//------------------------------------------------------------------------------
+// streamable class
+//------------------------------------------------------------------------------
+
+ public static class ObjectStreamable implements Streamable {
+ private DirectByteArrayOutputStream buf;
+ private int pos=0;
+ public ObjectStreamable(Serializable value) throws IOException {
+ buf = new DirectByteArrayOutputStream(1024);
+ ObjectOutputStream out = new ObjectOutputStream(buf);
+ out.writeObject(value);
+ out.flush();
+ }
+
+ /**
+ * returns true if the stream has reached its end
+ * @return boolean
+ */
+ public synchronized boolean eof() {
+ return (pos>=buf.size());
+
+ }
+
+ /**
+ * write data into the byte array starting at offset, maximum bytes
read are (data.length-offset)
+ * @param data byte[] - the array to read data into
+ * @param offset int - start position for writing data
+ * @return int - the number of bytes written into the data buffer
+ */
+ public synchronized int write(byte[] data, int offset) throws
IOException {
+ int length = Math.min(data.length-offset,buf.size()-pos);
+ System.arraycopy(buf.getArrayDirect(),pos,data,offset,length);
+ pos = pos + length;
+ return length;
+ }
+
+ public DirectByteArrayOutputStream getBuf() {
+ return buf;
+ }
+
+ public int size() {
+ return buf.size();
+ }
+
+ public int pos() {
+ return pos;
+ }
+
+ }
+
+}
\ No newline at end of file
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=384333&r1=384332&r2=384333&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
Wed Mar 8 13:23:10 2006
@@ -123,6 +123,14 @@
}//end if
}
+ public void breakDown() {
+ channel.removeChannelListener(this);
+ }
+
+ public void finalize() {
+ breakDown();
+ }
+
public boolean accept(Serializable msg, Member sender) {
if ( msg instanceof RpcMessage ) {
RpcMessage rmsg = (RpcMessage)msg;
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Streamable.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Streamable.java?rev=384333&view=auto
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Streamable.java
(added)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Streamable.java
Wed Mar 8 13:23:10 2006
@@ -0,0 +1,50 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.tipis;
+
+import java.io.IOException;
+
+/**
+ * Example usage:
+ * <code><pre>
+ * byte[] data = new byte[1024];
+ * Streamable st = ....;
+ * while ( !st.eof() ) {
+ * int length = st.read(data,0);
+ * String s = new String(data,0,length);
+ * System.out.println(s);
+ * }
+ * </pre></code>
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public interface Streamable {
+
+ /**
+ * returns true if the stream has reached its end
+ * @return boolean
+ */
+ public boolean eof();
+
+ /**
+ * write data into the byte array starting at offset, maximum bytes read
are (data.length-offset)
+ * @param data byte[] - the array to read data into
+ * @param offset int - start position for writing data
+ * @return int - the number of bytes written into the data buffer
+ */
+ public int write(byte[] data, int offset) throws IOException;
+
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]