http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/MembershipListener.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/MembershipListener.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/MembershipListener.java deleted file mode 100644 index 297d8fa..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/MembershipListener.java +++ /dev/null @@ -1,53 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: MembershipListener.java,v 1.4 2005/07/17 11:38:05 chrislott Exp $ - -package com.gemstone.org.jgroups; - - - -/** - * Allows a listener to be notified when group membership changes. - * These callbacks are used in {@link com.gemstone.org.jgroups.blocks.PullPushAdapter}. - * <p> - * The MembershipListener interface is similar to the {@link MessageListener} - * interface: every time a new view, a suspicion message, or a - * block event is received, the corresponding method of the class implementing - * MembershipListener will be called. - * Oftentimes the only method containing any functionality will be viewAccepted() - * which notifies the receiver that a new member has joined the group or that an - * existing member has left or crashed. - */ -public interface MembershipListener { - - /** - * Called when a change in membership has occurred. - * <b>No long running actions should be done in this callback.</b> - * If some long running action needs to be performed, it should be done in a separate thread. - */ - void viewAccepted(View new_view); - - /** - * Called whenever a member is suspected of having crashed, - * but has not yet been excluded. - */ - void suspect(SuspectMember suspected); // GemStoneAddition - - /** - * Called whenever the member needs to stop sending messages. 
- * When the next view is received (viewAccepted()), the member can resume sending - * messages. If a member does not comply, the message(s) sent between a block() - * and a matching viewAccepted() callback will probably be delivered in the next view. - * The block() callback is only needed by the Virtual Synchrony suite of protocols - * (FLUSH protocol); otherwise it will never be invoked. - */ - void block(); - - /** - * GemStoneAddition - Called when the channel is closing - */ - void channelClosing(Channel channel, Exception e); - -}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/MergeView.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/MergeView.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/MergeView.java deleted file mode 100644 index bf2bb61..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/MergeView.java +++ /dev/null @@ -1,166 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: MergeView.java,v 1.5 2005/11/21 13:33:08 belaban Exp $ - - -package com.gemstone.org.jgroups; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Iterator; -import java.util.Vector; - - -/** - * A view that is sent as a result of a merge. - * Whenever a group splits into subgroups, e.g., due to a network partition, - * and later the subgroups merge back together, a MergeView instead of a View - * will be received by the application. The MergeView class is a subclass of - * View and contains as additional instance variable: the list of views that - * were merged. For example, if the group denoted by view V1:(p,q,r,s,t) - * splits into subgroups V2:(p,q,r) and V2:(s,t), the merged view might be - * V3:(p,q,r,s,t). In this case the MergeView would contain a list of 2 views: - * V2:(p,q,r) and V2:(s,t). - */ -public class MergeView extends View { - protected Vector subgroups=null; // subgroups that merged into this single view (a list of Views) - - - /** - * Used by externalization - */ - public MergeView() { - } - - - /** - * Creates a new view - * - * @param vid The view id of this view (can not be null) - * @param members Contains a list of all the members in the view, can be empty but not null. 
- * @param subgroups A list of Views representing the former subgroups - */ - public MergeView(ViewId vid, Vector members, Vector subgroups) { - super(vid, members); - this.subgroups=subgroups; - } - - - /** - * Creates a new view - * - * @param creator The creator of this view (can not be null) - * @param id The lamport timestamp of this view - * @param members Contains a list of all the members in the view, can be empty but not null. - * @param subgroups A list of Views representing the former subgroups - */ - public MergeView(Address creator, long id, Vector members, Vector subgroups) { - super(creator, id, members); - this.subgroups=subgroups; - } - - @Override // GemStoneAddition - public boolean equals(Object o) { // GemStoneAddition for findbugs - return super.equals(o); - } - - @Override // GemStoneAddition - public int hashCode() { // GemStoneAddition for findbugs - return super.hashCode(); - } - - public Vector getSubgroups() { - return subgroups; - } - - - /** - * creates a copy of this view - * - * @return a copy of this view - */ - @Override // GemStoneAddition - public Object clone() { - ViewId vid2=vid != null ? (ViewId)vid.clone() : null; - Vector members2=members != null ? (Vector)members.clone() : null; - Vector subgroups2=subgroups != null ? 
(Vector)subgroups.clone() : null; - return new MergeView(vid2, members2, subgroups2); - } - - - @Override // GemStoneAddition - public String toString() { - StringBuffer sb=new StringBuffer(); - sb.append("MergeView::" + super.toString()); - sb.append(", subgroups=" + subgroups); - return sb.toString(); - } - - - @Override // GemStoneAddition - public void writeExternal(ObjectOutput out) throws IOException { - super.writeExternal(out); - out.writeObject(subgroups); - } - - - @Override // GemStoneAddition - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); - subgroups=(Vector)in.readObject(); - } - - - @Override // GemStoneAddition - public void writeTo(DataOutputStream out) throws IOException { - super.writeTo(out); - - // write subgroups - int len=subgroups != null? subgroups.size() : 0; - out.writeShort(len); - if(len == 0) - return; - View v; - for(Iterator it=subgroups.iterator(); it.hasNext();) { - v=(View)it.next(); - v.writeTo(out); - } - } - - @Override // GemStoneAddition - public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException { - super.readFrom(in); - short len=in.readShort(); - if(len > 0) { - View v; - subgroups=new Vector(); - for(int i=0; i < len; i++) { - v=new View(); - v.readFrom(in); - subgroups.add(v); - } - } - } - - @Override // GemStoneAddition - public int serializedSize(short version) { - int retval=super.serializedSize(version); - retval+=Global.SHORT_SIZE; // for size of subgroups vector - - if(subgroups == null) - return retval; - View v; - for(Iterator it=subgroups.iterator(); it.hasNext();) { - v=(View)it.next(); - retval+=v.serializedSize(version); - } - return retval; - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/Message.java ---------------------------------------------------------------------- diff --git 
a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/Message.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/Message.java deleted file mode 100644 index 61945af..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/Message.java +++ /dev/null @@ -1,784 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: Message.java,v 1.43 2005/11/07 13:37:23 belaban Exp $ - -package com.gemstone.org.jgroups; - - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectInputStream; -import java.io.ObjectOutput; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; - -import com.gemstone.org.jgroups.conf.ClassConfigurator; -import java.util.concurrent.ConcurrentHashMap; -import com.gemstone.org.jgroups.stack.IpAddress; -import com.gemstone.org.jgroups.util.GemFireTracer; -import com.gemstone.org.jgroups.util.ExternalStrings; -import com.gemstone.org.jgroups.util.Streamable; -import com.gemstone.org.jgroups.util.Util; - - -/** - * A Message encapsulates data sent to members of a group. It contains among other things the - * address of the sender, the destination address, a payload (byte buffer) and a list of - * headers. Headers are added by protocols on the sender side and removed by protocols - * on the receiver's side. - * <p> - * The byte buffer can point to a reference, and we can subset it using index and length. However, - * when the message is serialized, we only write the bytes between index and length. - * @author Bela Ban - */ -public class Message implements Externalizable, Streamable { - // the version to use for multicast messages. 
This - // is currently established by TP.determineMulticastVersion during - // view processing and defaults to GFXD_10 for multicast discovery - // during rolling upgrade - public static volatile short multicastVersion = JGroupsVersion.GFE_71_ORDINAL; - - protected Address dest_addr=null; - protected Address src_addr=null; - - /** The payload */ - private byte[] buf=null; - - /** The index into the payload (usually 0) */ - protected transient int offset=0; - - /** The number of bytes in the buffer (usually buf.length is buf not equal to null). */ - protected transient int length=0; - - /** Map key=String value=Header */ - protected Map headers; - - /** GemStoneAddition - serialization version for this message */ - private short version; - - protected static final GemFireTracer log=GemFireTracer.getLog(Message.class); - - static final long serialVersionUID=-1137364035832847034L; - - static final HashSet nonStreamableHeaders=new HashSet(); // todo: remove when all headers are streamable - - /** Map key=Address, value=Address. Maintains mappings to canonical addresses */ -// private static final Map canonicalAddresses=new ConcurrentReaderHashMap(); - - /** can this message be enqueued and bundled with other messages? - some messages shouldn't be bundled - e.g., unordered messages and - messages requiring a response */ - public transient boolean bundleable; // GemstoneAddition - - /** added to help analyse where the time goes in jgroups messaging */ - public transient long timeStamp; // GemStoneAddition - - public boolean isCacheOperation; // GemStoneAddition - public boolean isHighPriority; // GemStoneAddition - public transient boolean isJoinResponse; - - /** Public constructor - * @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then - * it is sent to the group (either to current group or to the group as given - * in the string). If it is a Vector, then it contains a number of addresses - * to which it must be sent. 
Otherwise, it contains a single destination.<p> - * Addresses are generally untyped (all are of type <em>Object</em>). A channel - * instance must know what types of addresses it expects and downcast - * accordingly. - * The buffer must not be modified (e.g. buf[0]=0 is not allowed), since we don't copy the contents on copy() or clone(). - */ - public Message(Address dest, Address src, byte[] buf) { - dest_addr=dest; - src_addr=src; - setBuffer(buf); - headers=createHeaders(7); - } - - /** - * Constructs a message. The index and length parameters allow the caller to provide a <em>reference</em> to - * a byte buffer, rather than a copy, and refer to a subset of the buffer. This is important when - * we want to avoid copying. When the message is serialized, only the subset is serialized. - * @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then - * it is sent to the group (either to current group or to the group as given - * in the string). If it is a Vector, then it contains a number of addresses - * to which it must be sent. Otherwise, it contains a single destination.<p> - * Addresses are generally untyped (all are of type <em>Object</em>). A channel - * instance must know what types of addresses it expects and downcast - * accordingly. - * @param src Address of sender - * @param buf A reference to a byte buffer - * @param offset The index into the byte buffer - * @param length The number of bytes to be used from <tt>buf</tt>. 
Both index and length are checked for - * array index violations and an ArrayIndexOutOfBoundsException will be thrown if invalid - */ - public Message(Address dest, Address src, byte[] buf, int offset, int length) { - dest_addr=dest; - src_addr=src; - setBuffer(buf, offset, length); - headers=createHeaders(7); - } - - /** - * GemStoneAddition - set the flag determining whether this is a - * distributed cache operation message that can be ignored in - * admin-only virtual machines - */ - public void setIsDistributedCacheOperation(boolean flag) { - isCacheOperation = flag; - } - - /** - * GemStoneAddition - get the flag that states whether this is a - * distributed cache operation message that can be ignored in - * admin-only virtual machines - */ - public boolean getIsDistributedCacheOperation() { - return isCacheOperation; - } - - /** - * GemStoneAddition - get the flag that states whether this is a - * high priority distribution message (unordered execution) - */ - public boolean isHighPriority() { - return isHighPriority; - } - - /** Public constructor - * @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then - * it is sent to the group (either to current group or to the group as given - * in the string). If it is a Vector, then it contains a number of addresses - * to which it must be sent. Otherwise, it contains a single destination.<p> - * Addresses are generally untyped (all are of type <em>Object</em>. A channel - * instance must know what types of addresses it expects and downcast - * accordingly. - * @param src Address of sender - * @param obj The object will be serialized into the byte buffer. <em>Object - * has to be serializable </em>! Note that the resulting buffer must not be modified - * (e.g. buf[0]=0 is not allowed), since we don't copy the contents on clopy() or clone(). 
- */ - public Message(Address dest, Address src, Serializable obj) { - dest_addr=dest; - src_addr=src; - setObject(obj); - headers=createHeaders(7); - } - - - public Message() { - headers=createHeaders(7); - } - - - public Message(boolean create_headers) { - if(create_headers) - headers=createHeaders(7); - } - - public Address getDest() { - return dest_addr; - } - - public void setDest(Address new_dest) { - dest_addr=canonicalAddress(new_dest); - } - - public Address getSrc() { - return src_addr; - } - - public void setSrc(Address new_src) { - src_addr=canonicalAddress(new_src); - } - - /** - * Returns a <em>reference</em> to the payload (byte buffer). Note that this buffer should not be modified as - * we do not copy the buffer on copy() or clone(): the buffer of the copied message is simply a reference to - * the old buffer.<br/> - * Even if offset and length are used: we return the <em>entire</em> buffer, not a subset. - */ - public byte[] getRawBuffer() { - return buf; - } - - /** - * Returns a copy of the buffer if offset and length are used, otherwise a reference. - * @return byte array with a copy of the buffer. - */ - public byte[] getBuffer() { - if(buf == null) - return null; - if(offset == 0 && length == buf.length) - return buf; - else { - byte[] retval=new byte[length]; - System.arraycopy(buf, offset, retval, 0, length); - return retval; - } - } - - public void setBuffer(byte[] b) { - buf=b; - if(buf != null) { - offset=0; - length=buf.length; - } - else { - offset=length=0; - } - } - - /** - * Set the internal buffer to point to a subset of a given buffer - * @param b The reference to a given buffer. 
If null, we'll reset the buffer to null - * @param offset The initial position - * @param length The number of bytes - */ - public void setBuffer(byte[] b, int offset, int length) { - buf=b; - if(buf != null) { - if(offset < 0 || offset > buf.length) - throw new ArrayIndexOutOfBoundsException(offset); - if((offset + length) > buf.length) - throw new ArrayIndexOutOfBoundsException((offset+length)); - this.offset=offset; - this.length=length; - } - else { -// offset=length=0; GemStoneAddition (dead stores) - } - } - - /** Returns the offset into the buffer at which the data starts */ - public int getOffset() { - return offset; - } - - /** Returns the number of bytes in the buffer */ - public int getLength() { - return length; - } - - public Map getHeaders() { - return headers; - } - - public short getDestVersionOrdinal() { - if (this.version > 0) { - return JChannel.getGfFunctions().getSerializationVersionOrdinal(version); - } - short result = JChannel.getGfFunctions().getCurrentVersionOrdinal(); - if (dest_addr != null && !dest_addr.isMulticastAddress()) { - if (((IpAddress)dest_addr).getVersionOrdinal() < result) { - result = JChannel.getGfFunctions().getSerializationVersionOrdinal(((IpAddress)dest_addr).getVersionOrdinal()); - } - } else { - result = JChannel.getGfFunctions().getSerializationVersionOrdinal(multicastVersion); - } - return result; - } - - - public void setObject(Serializable obj) { - if(obj == null) return; - byte[] serialized = JChannel.getGfFunctions().serializeWithVersion(obj, getDestVersionOrdinal()); - setBuffer(serialized); - } - - public <T> T getObject() { - if(buf == null) return null; - try { - ByteArrayInputStream in_stream=new ByteArrayInputStream(buf, offset, length); - // ObjectInputStream in=new ObjectInputStream(in_stream); - //ObjectInputStream in=new ContextObjectInputStream(in_stream); // put it back on norbert's request - return JChannel.getGfFunctions().readObject(new DataInputStream(in_stream)); //in.readObject(); // 
GemStoneAddition - } - catch(Exception ex) { - // GemStoneAddition - show why we couldn't deserialize the object - RuntimeException e = new IllegalArgumentException(ex.toString()); - e.initCause(ex); - throw e; - } - } - - - /** - * Nulls all fields of this message so that the message can be reused. Removes all headers from the - * hashmap, but keeps the hashmap - */ - public void reset() { - dest_addr=src_addr=null; - setBuffer(null); - headers.clear(); - } - - /*---------------------- Used by protocol layers ----------------------*/ - - /** Puts a header given a key into the hashmap. Overwrites potential existing entry. */ - public void putHeader(String key, Header hdr) { - headers.put(key, hdr); - } - - public Header removeHeader(String key) { - return (Header)headers.remove(key); - } - - public void removeHeaders() { - headers.clear(); - } - - public <T> T getHeader(String key) { - return (T)headers.get(key); - } - /*---------------------------------------------------------------------*/ - - - public Message copy() { - return copy(true); - } - - /** - * Create a copy of the message. If offset and length are used (to refer to another buffer), the copy will - * contain only the subset offset and length point to, copying the subset into the new copy. 
- * @param copy_buffer - * @return Message with specified data - */ - public Message copy(boolean copy_buffer) { - Message retval=new Message(false); - retval.dest_addr=dest_addr; - retval.src_addr=src_addr; - - if(copy_buffer && buf != null) { - - // change bela Feb 26 2004: we don't resolve the reference - retval.setBuffer(buf, offset, length); - } - - retval.headers=createHeaders(headers); - /** GemStone Addition - copy fields added for GemFire */ - retval.bundleable = bundleable; - retval.timeStamp = timeStamp; - retval.isCacheOperation = isCacheOperation; - retval.isHighPriority = isHighPriority; - retval.version = version; - /** end GemStone addition - copy fields added for GemFire */ - return retval; - } - - - @Override // GemStoneAddition - protected Object clone() throws CloneNotSupportedException { - return copy(); - } - - public Message makeReply() { - return new Message(src_addr, null, null); - } - - - @Override // GemStoneAddition - public String toString() { - StringBuffer ret=new StringBuffer(64); - ret.append("[dst: "); - if(dest_addr == null) - ret.append("<null>"); - else - ret.append(dest_addr); - ret.append(", src: "); - if(src_addr == null) - ret.append("<null>"); - else - ret.append(src_addr); - -// int size; -// if(headers != null && (size=headers.size()) > 0) -// ret.append(" (").append(size).append(" headers)"); - - ret.append(", oob=" + this.isHighPriority); - - ret.append(", size="); - ret.append(String.valueOf(size())); -// ret.append(" bytes"); - ret.append(']'); - - if(headers != null) { - for(Iterator it=headers.entrySet().iterator(); it.hasNext();) { - Map.Entry entry=(Map.Entry)it.next(); - ret.append(", ").append(entry.getKey()).append(": ").append(entry.getValue()); - } - } - - //if(buf != null && length > 0) - // ret.append(length); - //else - // ret.append('0'); - //GemStoneAddition - print actual message size, not buffer size - return ret.toString(); - } - - public short getVersion() { - return this.version; - } - - public void 
setVersion(short v) { - this.version = v; - } - - - /** Tries to read an object from the message's buffer and returns its string form */ - public String toStringAsObject() { - Object obj; - - if(buf == null) return null; - try { - obj=getObject(); - return obj != null ? obj.toString() : ""; - } - catch(Exception e) { // it is not an object - return ""; - } - } - - - /** - * Returns size of buffer, plus some constant overhead for src and dest, plus number of headers times - * some estimated size/header. The latter is needed because we don't want to marshal all headers just - * to find out their size requirements. If a header implements Sizeable, then we can get the correct - * size.<p> Size estimations don't have to be very accurate since this is mainly used by FRAG to - * determine whether to fragment a message or not. Fragmentation will then serialize the message, - * therefore getting the correct value. - */ - public long size() { - long retval=Global.BYTE_SIZE // leading byte - + length // buffer - + (buf != null? Global.INT_SIZE : 0); // if buf != null 4 bytes for length - - short destVersion = this.version; - if (destVersion == 0) { - destVersion = (short)JChannel.getGfFunctions().getCurrentVersionOrdinal(); //Version.CURRENT_ORDINAL; - } - if (dest_addr != null) { - if (0 < ((IpAddress)dest_addr).getVersionOrdinal() && ((IpAddress)dest_addr).getVersionOrdinal() < destVersion) { - // size() must reflect the size when serialized for the version supported - // by the destination address. 
- destVersion = ((IpAddress)dest_addr).getVersionOrdinal(); - } - } - // if(dest_addr != null) - // retval+=dest_addr.size(); - if(src_addr != null) - retval+=(src_addr).size(destVersion); - - Map.Entry entry; - String key; - Header hdr; - retval+=Global.SHORT_SIZE; // size (short) - for(Iterator it=headers.entrySet().iterator(); it.hasNext();) { - entry=(Map.Entry)it.next(); - key=(String)entry.getKey(); - retval+=key.length() +2; // not the same as writeUTF(), but almost - hdr=(Header)entry.getValue(); - retval+=5; // 1 for presence of magic number, 4 for magic number - retval+=hdr.size(destVersion); - } - if (dest_addr == null || dest_addr.isMulticastAddress()) { - retval += 3; // Version.uncompressedSize(); - } - retval += 1; // GemStoneAddition - for isCacheOperation and isDirectAck - return retval; - } - - - public String printObjectHeaders() { - StringBuffer sb=new StringBuffer(); - Map.Entry entry; - - if(headers != null) { - for(Iterator it=headers.entrySet().iterator(); it.hasNext();) { - entry=(Map.Entry)it.next(); - sb.append(entry.getKey()).append(": ").append(entry.getValue()).append('\n'); - } - } - return sb.toString(); - } - - - - /* ----------------------------------- Interface Externalizable ------------------------------- */ - - public void writeExternal(ObjectOutput out) throws IOException { - if (true) throw new UnsupportedOperationException("messages are not externalizable"); - -// int len; -// Externalizable hdr; -// Map.Entry entry; -// -// if(dest_addr != null) { -// out.writeBoolean(true); -// Marshaller.write(dest_addr, out); -// } -// else { -// out.writeBoolean(false); -// } -// -// if(src_addr != null) { -// out.writeBoolean(true); -// Marshaller.write(src_addr, out); -// } -// else { -// out.writeBoolean(false); -// } -// -// // GemStoneAddition - more flags -// byte gfFlags = 0; -// if (isCacheOperation) -// gfFlags += CACHE_OP; -// if (isHighPriority) -// gfFlags += HIGH_PRIORITY; -// out.write(gfFlags); -// -// -// if(buf == null) 
-// out.writeInt(0); -// else { -// out.writeInt(length); -// out.write(buf, offset, length); -// } -// -// len=headers.size(); -// out.writeInt(len); -// // GemStoneAddition - create a versioned stream if src has a different -// // version than CURRENT -// Version srcVersion; -// if (dest_addr != null -// && !Version.CURRENT.equals(srcVersion = dest_addr.getVersion())) { -// out = new VersionedObjectOutput(out, srcVersion); -// } -// for(Iterator it=headers.entrySet().iterator(); it.hasNext();) { -// entry=(Map.Entry)it.next(); -// out.writeUTF((String)entry.getKey()); -// hdr=(Externalizable)entry.getValue(); -// Marshaller.write(hdr, out); -// } - } - - - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - - throw new UnsupportedOperationException("messages are not externalizable"); - -// int len; -// boolean destAddressExist=in.readBoolean(); -// boolean srcAddressExist; -// Object key, value; -// if(destAddressExist) { -// dest_addr=(Address)Marshaller.read(in); -// } -// -// srcAddressExist=in.readBoolean(); -// if(srcAddressExist) { -// src_addr=(Address)Marshaller.read(in); -// } -// -// // GemStoneAddition -// byte gfFlags = in.readByte(); -// if ( (gfFlags & CACHE_OP) != 0 ) -// isCacheOperation = true; -// if ( (gfFlags & HIGH_PRIORITY) != 0 ) -// isHighPriority = true; -// -// int i=in.readInt(); -// if(i != 0) { -// buf=new byte[i]; -// in.readFully(buf); -// offset=0; -// length=buf.length; -// } -// -// len=in.readInt(); -// // GemStoneAddition - create a versioned stream if src has a different -// // version than CURRENT -// Version srcVersion; -// if (src_addr != null -// && !Version.CURRENT.equals(srcVersion = src_addr.getVersion())) { -// in = new VersionedObjectInput(in, srcVersion); -// } -// while(len-- > 0) { -// key=in.readUTF(); -// value=Marshaller.read(in); -// headers.put(key, value); -// } - } - - /* --------------------------------- End of Interface Externalizable ----------------------------- */ - - 
- /* ----------------------------------- Interface Streamable ------------------------------- */ - - /** - * Streams all members (dest and src addresses, buffer and headers) to the output stream. - * @param out - * @throws IOException - */ - public void writeTo(DataOutputStream out) throws IOException { - JChannel.getGfFunctions().serializeJGMessage(this, out); - } - - public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException { - JChannel.getGfFunctions().deserializeJGMessage(this, in); - } - - - - /* --------------------------------- End of Interface Streamable ----------------------------- */ - - - - /* ----------------------------------- Private methods ------------------------------- */ - - public void writeHeader(Header value, DataOutputStream out) throws IOException { - int magic_number; - String classname; - ObjectOutput oos=null; - try { - magic_number=ClassConfigurator.getInstance(false).getMagicNumber(value.getClass()); - // write the magic number or the class name - if(magic_number == -1) { - out.writeBoolean(false); - classname=value.getClass().getName(); - out.writeUTF(classname); - } - else { - out.writeBoolean(true); - out.writeInt(magic_number); - } - - // write the contents - if(value instanceof Streamable) { - ((Streamable)value).writeTo(out); - } - else { - oos = JChannel.getGfFunctions().getObjectOutput(out); - value.writeExternal(oos); - if(!nonStreamableHeaders.contains(value.getClass())) { - nonStreamableHeaders.add(value.getClass()); - if(log.isTraceEnabled()) - log.trace("encountered non-Streamable header: " + value.getClass()); - } - } - } - catch(ChannelException e) { - log.error(ExternalStrings.Message_FAILED_WRITING_THE_HEADER, e); - } - finally { - if(oos != null) - oos.close(); // this is a no-op on ByteArrayOutputStream - } - } - - - public Header readHeader(DataInputStream in) throws IOException { - Header hdr; - boolean use_magic_number=in.readBoolean(); - int magic_number; - String 
classname; - Class clazz; - ObjectInput ois=null; - - try { - if(use_magic_number) { - magic_number=in.readInt(); - clazz=ClassConfigurator.getInstance(false).get(magic_number); - if(clazz == null) - log.error(ExternalStrings.Message_MAGIC_NUMBER__0__IS_NOT_AVAILABLE_IN_MAGIC_MAP, magic_number); - } - else { - classname=in.readUTF(); - clazz=ClassConfigurator.getInstance(false).get(classname); - } - hdr=(Header)clazz.newInstance(); - if(hdr instanceof Streamable) { - ((Streamable)hdr).readFrom(in); - } - else { - ois = JChannel.getGfFunctions().getObjectInput(in); - hdr.readExternal(ois); - } - } - catch(Exception ex) { - IOException e = new IOException( - ExternalStrings.Message_FAILED_TO_READ_HEADER.toLocalizedString()); - e.initCause(ex); - throw e; - } - finally { - // if(ois != null) // we cannot close this because other readers depend on it - // ois.close(); - } - return hdr; - } - - public void setHeaders(Map hdrs) { - this.headers = hdrs; - } - - public Map createHeaders(int size) { - return size > 0? new ConcurrentHashMap(size) : new ConcurrentHashMap(); - } - - - private Map createHeaders(Map m) { - return new ConcurrentHashMap(m); - } - - /** canonicalize addresses to some extent. 
There are race conditions - * allowed in this method, so it may not fully canonicalize an address - * @param nonCanonicalAddress - * @return canonical representation of the address - */ - private static Address canonicalAddress(Address nonCanonicalAddress) { -// Address result=null; -// if(nonCanonicalAddress == null) { -// return null; -// } -// // do not synchronize between get/put on the canonical map to avoid cost of contention -// // this can allow multiple equivalent addresses to leak out, but it's worth the cost savings -// try { -// result=(Address)canonicalAddresses.get(nonCanonicalAddress); -// } -// catch(NullPointerException npe) { -// // no action needed -// } -// if(result == null) { -// result=nonCanonicalAddress; -// canonicalAddresses.put(nonCanonicalAddress, result); -// } -// return result; - return nonCanonicalAddress; - } - - - /* ------------------------------- End of Private methods ---------------------------- */ - - public void dumpPayload() { // GemStoneAddition - for debugging - try { - DataInputStream di = new DataInputStream(new ByteArrayInputStream(buf, offset, length)); - Object o = JChannel.getGfFunctions().readObject(di); - log.getLogWriter().warning(ExternalStrings.Message_MESSAGEDUMPPAYLOAD__0, o); - } - catch (Exception e) { - log.warn("message.dumpPayload error: " + e.getMessage()); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/Message.java.old ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/Message.java.old b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/Message.java.old deleted file mode 100644 index fec30a7..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/Message.java.old +++ /dev/null @@ -1,711 +0,0 @@ -// $Id: Message.java,v 1.35 2005/07/15 05:27:21 belaban Exp $ - -package org.jgroups; - - -import 
org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.jgroups.conf.ClassConfigurator; -import org.jgroups.stack.IpAddress; -import org.jgroups.util.ContextObjectInputStream; -import org.jgroups.util.Marshaller; -import org.jgroups.util.Streamable; -import org.jgroups.util.Util; - -import java.io.*; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; - -import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap; - - -/** - * A Message encapsulates data sent to members of a group. It contains among other things the - * address of the sender, the destination address, a payload (byte buffer) and a list of - * headers. Headers are added by protocols on the sender side and removed by protocols - * on the receiver's side.<br/> - * The byte buffer can point to a reference, and we can subset it using index and length. However, - * when the message is serialized, we only write the bytes between index and length. 
- * @author Bela Ban - */ -public class Message implements Externalizable, Streamable { - protected Address dest_addr=null; - protected Address src_addr=null; - - /** The payload */ - private byte[] buf=null; - - /** The index into the payload (usually 0) */ - protected transient int offset=0; - - /** The number of bytes in the buffer (usually buf.length is buf != null) */ - protected transient int length=0; - - /** Map<String,Header> */ - protected Map headers; - - protected static final Log log=LogFactory.getLog(Message.class); - - static final long serialVersionUID=-1137364035832847034L; - - static final byte DEST_SET=1; - static final byte SRC_SET=2; - static final byte BUF_SET=4; - static final byte HDRS_SET=8; - static final byte IPADDR_DEST=16; - static final byte IPADDR_SRC=32; - static final byte SRC_HOST_NULL=64; - - static final HashSet nonStreamableHeaders=new HashSet(); // todo: remove when all headers are streamable - - - - /** Public constructor - * @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then - * it is sent to the group (either to current group or to the group as given - * in the string). If it is a Vector, then it contains a number of addresses - * to which it must be sent. Otherwise, it contains a single destination.<p> - * Addresses are generally untyped (all are of type <em>Object</em>. A channel - * instance must know what types of addresses it expects and downcast - * accordingly. - * @param src Address of sender - * @param buf Message to be sent. Note that this buffer must not be modified (e.g. buf[0]=0 is - * not allowed), since we don't copy the contents on clopy() or clone(). - */ - public Message(Address dest, Address src, byte[] buf) { - dest_addr=dest; - src_addr=src; - setBuffer(buf); - headers=createHeaders(5); - } - - /** - * Constructs a message. The index and length parameters allow to provide a <em>reference</em> to - * a byte buffer, rather than a copy, and refer to a subset of the buffer. 
This is important when - * we want to avoid copying. When the message is serialized, only the subset is serialized. - * @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then - * it is sent to the group (either to current group or to the group as given - * in the string). If it is a Vector, then it contains a number of addresses - * to which it must be sent. Otherwise, it contains a single destination.<p> - * Addresses are generally untyped (all are of type <em>Object</em>. A channel - * instance must know what types of addresses it expects and downcast - * accordingly. - * @param src Address of sender - * @param buf A reference to a byte buffer - * @param offset The index into the byte buffer - * @param length The number of bytes to be used from <tt>buf</tt>. Both index and length are checked for - * array index violations and an ArrayIndexOutOfBoundsException will be thrown if invalid - */ - public Message(Address dest, Address src, byte[] buf, int offset, int length) { - dest_addr=dest; - src_addr=src; - setBuffer(buf, offset, length); - headers=createHeaders(5); - } - - - /** Public constructor - * @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then - * it is sent to the group (either to current group or to the group as given - * in the string). If it is a Vector, then it contains a number of addresses - * to which it must be sent. Otherwise, it contains a single destination.<p> - * Addresses are generally untyped (all are of type <em>Object</em>. A channel - * instance must know what types of addresses it expects and downcast - * accordingly. - * @param src Address of sender - * @param obj The object will be serialized into the byte buffer. <em>Object - * has to be serializable </em>! Note that the resulting buffer must not be modified - * (e.g. buf[0]=0 is not allowed), since we don't copy the contents on clopy() or clone(). 
- */ - public Message(Address dest, Address src, Serializable obj) { - dest_addr=dest; - src_addr=src; - setObject(obj); - headers=createHeaders(5); - } - - - /** Only used for Externalization (creating an initial object) */ - public Message() { - } // should not be called as normal constructor - - public Address getDest() { - return dest_addr; - } - - public void setDest(Address new_dest) { - dest_addr=new_dest; - } - - public Address getSrc() { - return src_addr; - } - - public void setSrc(Address new_src) { - src_addr=new_src; - } - - /** - * Returns a <em>reference</em> to the payload (byte buffer). Note that this buffer should not be modified as - * we do not copy the buffer on copy() or clone(): the buffer of the copied message is simply a reference to - * the old buffer.<br/> - * Even if offset and length are used: we return the <em>entire</em> buffer, not a subset. - */ - public byte[] getRawBuffer() { - return buf; - } - - /** - * Returns a copy of the buffer if offset and length are used, otherwise a reference - * @return - */ - public byte[] getBuffer() { - if(buf == null) - return null; - if(offset == 0 && length == buf.length) - return buf; - else { - byte[] retval=new byte[length]; - System.arraycopy(buf, offset, retval, 0, length); - return retval; - } - } - - public void setBuffer(byte[] b) { - buf=b; - if(buf != null) { - offset=0; - length=buf.length; - } - else { - offset=length=0; - } - } - - /** - * Set the internal buffer to point to a subset of a given buffer - * @param b The reference to a given buffer. 
If null, we'll reset the buffer to null - * @param offset The initial position - * @param length The number of bytes - */ - public void setBuffer(byte[] b, int offset, int length) { - buf=b; - if(buf != null) { - if(offset < 0 || offset > buf.length) - throw new ArrayIndexOutOfBoundsException(offset); - if((offset + length) > buf.length) - throw new ArrayIndexOutOfBoundsException((offset+length)); - this.offset=offset; - this.length=length; - } - else { - offset=length=0; - } - } - - /** Returns the offset into the buffer at which the data starts */ - public int getOffset() { - return offset; - } - - /** Returns the number of bytes in the buffer */ - public int getLength() { - return length; - } - - public Map getHeaders() { - return headers; - } - - public void setObject(Serializable obj) { - if(obj == null) return; - try { - ByteArrayOutputStream out_stream=new ByteArrayOutputStream(); - ObjectOutputStream out=new ObjectOutputStream(out_stream); - out.writeObject(obj); - setBuffer(out_stream.toByteArray()); - } - catch(IOException ex) { - throw new IllegalArgumentException(ex.toString()); - } - } - - public Object getObject() { - if(buf == null) return null; - try { - ByteArrayInputStream in_stream=new ByteArrayInputStream(buf, offset, length); - // ObjectInputStream in=new ObjectInputStream(in_stream); - ObjectInputStream in=new ContextObjectInputStream(in_stream); // put it back on norbert's request - return in.readObject(); - } - catch(Exception ex) { - throw new IllegalArgumentException(ex.toString()); - } - } - - - /** - * Nulls all fields of this message so that the message can be reused. Removes all headers from the - * hashmap, but keeps the hashmap - */ - public void reset() { - dest_addr=src_addr=null; - setBuffer(null); - headers.clear(); - } - - /*---------------------- Used by protocol layers ----------------------*/ - - /** Puts a header given a key into the hashmap. Overwrites potential existing entry. 
*/ - public void putHeader(String key, Header hdr) { - headers.put(key, hdr); - } - - public Header removeHeader(String key) { - return (Header)headers.remove(key); - } - - public void removeHeaders() { - headers.clear(); - } - - public Header getHeader(String key) { - return (Header)headers.get(key); - } - /*---------------------------------------------------------------------*/ - - - public Message copy() { - return copy(true); - } - - /** - * Create a copy of the message. If offset and length are used (to refer to another buffer), the copy will - * contain only the subset offset and length point to, copying the subset into the new copy. - * @param copy_buffer - * @return - */ - public Message copy(boolean copy_buffer) { - Message retval=new Message(); - retval.dest_addr=dest_addr; - retval.src_addr=src_addr; - - if(copy_buffer && buf != null) { - - // change bela Feb 26 2004: we don't resolve the reference - retval.setBuffer(buf, offset, length); - } - - retval.headers=createHeaders(headers); - return retval; - } - - - protected Object clone() throws CloneNotSupportedException { - return copy(); - } - - public Message makeReply() { - return new Message(src_addr, null, null); - } - - - public String toString() { - StringBuffer ret=new StringBuffer(64); - ret.append("[dst: "); - if(dest_addr == null) - ret.append("<null>"); - else - ret.append(dest_addr); - ret.append(", src: "); - if(src_addr == null) - ret.append("<null>"); - else - ret.append(src_addr); - - int size; - if(headers != null && (size=headers.size()) > 0) - ret.append(" (" + size + " headers)"); - - ret.append(", size = "); - if(buf != null && length > 0) - ret.append(length); - else - ret.append('0'); - ret.append(" bytes"); - ret.append(']'); - return ret.toString(); - } - - - /** Tries to read an object from the message's buffer and prints it */ - public String toStringAsObject() { - Object obj; - - if(buf == null) return null; - try { - obj=getObject(); - return obj != null ? 
obj.toString() : ""; - } - catch(Exception e) { // it is not an object - return ""; - } - } - - - /** - * Returns size of buffer, plus some constant overhead for src and dest, plus number of headers time - * some estimated size/header. The latter is needed because we don't want to marshal all headers just - * to find out their size requirements. If a header implements Sizeable, the we can get the correct - * size.<p> Size estimations don't have to be very accurate since this is mainly used by FRAG to - * determine whether to fragment a message or not. Fragmentation will then serialize the message, - * therefore getting the correct value. - */ - public long size() { - long retval=Global.BYTE_SIZE // leading byte - + length // buffer - + (buf != null? Global.INT_SIZE : 0); // if buf != null 4 bytes for length - - if(dest_addr != null) - retval+=dest_addr.size(); - if(src_addr != null) - retval+=(src_addr).size(); - - Map.Entry entry; - String key; - Header hdr; - retval+=Global.INT_SIZE; // size (int) - for(Iterator it=headers.entrySet().iterator(); it.hasNext();) { - entry=(Map.Entry)it.next(); - key=(String)entry.getKey(); - retval+=key.length() +2; // not the same as writeUTF(), but almost - hdr=(Header)entry.getValue(); - retval+=5; // 1 for presence of magic number, 4 for magic number - retval+=hdr.size(); - } - return retval; - } - - - public String printObjectHeaders() { - StringBuffer sb=new StringBuffer(); - Map.Entry entry; - - if(headers != null) { - for(Iterator it=headers.entrySet().iterator(); it.hasNext();) { - entry=(Map.Entry)it.next(); - sb.append(entry.getKey()).append(": ").append(entry.getValue()).append('\n'); - } - } - return sb.toString(); - } - - - - /* ----------------------------------- Interface Externalizable ------------------------------- */ - - public void writeExternal(ObjectOutput out) throws IOException { - int len; - Externalizable hdr; - Map.Entry entry; - - if(dest_addr != null) { - out.writeBoolean(true); - 
Marshaller.write(dest_addr, out); - } - else { - out.writeBoolean(false); - } - - if(src_addr != null) { - out.writeBoolean(true); - Marshaller.write(src_addr, out); - } - else { - out.writeBoolean(false); - } - - if(buf == null) - out.writeInt(0); - else { - out.writeInt(length); - out.write(buf, offset, length); - } - - if(headers == null) - out.writeInt(0); - else { - len=headers.size(); - out.writeInt(len); - for(Iterator it=headers.entrySet().iterator(); it.hasNext();) { - entry=(Map.Entry)it.next(); - out.writeUTF((String)entry.getKey()); - hdr=(Externalizable)entry.getValue(); - Marshaller.write(hdr, out); - } - } - } - - - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - int len; - boolean destAddressExist=in.readBoolean(); - boolean srcAddressExist; - Object key, value; - - if(destAddressExist) { - dest_addr=(Address)Marshaller.read(in); - } - - srcAddressExist=in.readBoolean(); - if(srcAddressExist) { - src_addr=(Address)Marshaller.read(in); - } - - int i=in.readInt(); - if(i != 0) { - buf=new byte[i]; - in.readFully(buf); - offset=0; - length=buf.length; - } - - len=in.readInt(); - if(len > 0) headers=createHeaders(len); - while(len-- > 0) { - key=in.readUTF(); - value=Marshaller.read(in); - headers.put(key, value); - } - } - - /* --------------------------------- End of Interface Externalizable ----------------------------- */ - - - /* ----------------------------------- Interface Streamable ------------------------------- */ - - /** - * Streams all members (dest and src addresses, buffer and headers to the output stream - * @param outstream - * @throws IOException - */ - public void writeTo(DataOutputStream out) throws IOException { - Map.Entry entry; - - byte leading=0; - if(dest_addr != null) { - leading+=DEST_SET; - if(dest_addr instanceof IpAddress) - leading+=IPADDR_DEST; - } - if(src_addr != null) { - leading+=SRC_SET; - if(src_addr instanceof IpAddress) { - leading+=IPADDR_SRC; - 
if(((IpAddress)src_addr).getIpAddress() == null) { - leading+=SRC_HOST_NULL; - } - } - } - if(buf != null) - leading+=BUF_SET; - if(headers != null && headers.size() > 0) - leading+=HDRS_SET; - - // 1. write the leading byte first - out.write(leading); - - // 2. dest_addr - if(dest_addr != null) { - if(dest_addr instanceof IpAddress) - dest_addr.writeTo(out); - else - Util.writeAddress(dest_addr, out); - } - - // 3. src_addr - if(src_addr != null) { - if(src_addr instanceof IpAddress) { - src_addr.writeTo(out); - } - else { - Util.writeAddress(src_addr, out); - } - } - - // 4. buf - if(buf != null) { - out.writeInt(length); - out.write(buf, offset, length); - } - - // 5. headers - int size; - if(headers != null && (size=headers.size()) > 0) { - out.writeShort(size); - for(Iterator it=headers.entrySet().iterator(); it.hasNext();) { - entry=(Map.Entry)it.next(); - out.writeUTF((String)entry.getKey()); - writeHeader((Header)entry.getValue(), out); - } - } - } - - - public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException { - int len, leading; - String hdr_name; - Header hdr; - - - // 1. read the leading byte first - leading=in.readByte(); - - // 1. dest_addr - if((leading & DEST_SET) == DEST_SET) { - if((leading & IPADDR_DEST) == IPADDR_DEST) { - dest_addr=new IpAddress(); - dest_addr.readFrom(in); - } - else { - dest_addr=Util.readAddress(in); - } - } - - // 2. src_addr - if((leading & SRC_SET) == SRC_SET) { - if((leading & IPADDR_SRC) == IPADDR_SRC) { - src_addr=new IpAddress(); - src_addr.readFrom(in); - } - else { - src_addr=Util.readAddress(in); - } - } - - // 3. buf - if((leading & BUF_SET) == BUF_SET) { - len=in.readInt(); - buf=new byte[len]; - in.read(buf, 0, len); - length=len; - } - - // 4. 
headers - if((leading & HDRS_SET) == HDRS_SET) { - len=in.readShort(); - headers(len); - for(int i=0; i < len; i++) { - hdr_name=in.readUTF(); - hdr=readHeader(in); - headers.put(hdr_name, hdr); - } - } - } - - - - /* --------------------------------- End of Interface Streamable ----------------------------- */ - - - - /* ----------------------------------- Private methods ------------------------------- */ - - private Map headers(int len) { - return headers != null ? headers : (headers=createHeaders(len)); - } - - private void writeHeader(Header value, DataOutputStream out) throws IOException { - int magic_number; - String classname; - ObjectOutputStream oos=null; - try { - magic_number=ClassConfigurator.getInstance(false).getMagicNumber(value.getClass()); - // write the magic number or the class name - if(magic_number == -1) { - out.writeBoolean(false); - classname=value.getClass().getName(); - out.writeUTF(classname); - } - else { - out.writeBoolean(true); - out.writeInt(magic_number); - } - - // write the contents - if(value instanceof Streamable) { - ((Streamable)value).writeTo(out); - } - else { - oos=new ObjectOutputStream(out); - value.writeExternal(oos); - if(!nonStreamableHeaders.contains(value.getClass())) { - nonStreamableHeaders.add(value.getClass()); - if(log.isTraceEnabled()) - log.trace("encountered non-Streamable header: " + value.getClass()); - } - } - } - catch(ChannelException e) { - log.error("failed writing the header", e); - } - finally { - if(oos != null) - oos.close(); - } - } - - - private Header readHeader(DataInputStream in) throws IOException { - Header hdr; - boolean use_magic_number=in.readBoolean(); - int magic_number; - String classname; - Class clazz; - ObjectInputStream ois=null; - - try { - if(use_magic_number) { - magic_number=in.readInt(); - clazz=ClassConfigurator.getInstance(false).get(magic_number); - } - else { - classname=in.readUTF(); - clazz=ClassConfigurator.getInstance(false).get(classname); - } - 
hdr=(Header)clazz.newInstance(); - if(hdr instanceof Streamable) { - ((Streamable)hdr).readFrom(in); - } - else { - ois=new ObjectInputStream(in); - hdr.readExternal(ois); - } - } - catch(Exception ex) { - throw new IOException("failed read header: " + ex.toString()); - } - finally { - if(ois != null) - ois.close(); - } - - return hdr; - } - - private Map createHeaders(int size) { - return new HashMap(size); - } - - - private Map createHeaders(Map m) { - return new HashMap(m); - } - - /* ------------------------------- End of Private methods ---------------------------- */ - - - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/MessageListener.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/MessageListener.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/MessageListener.java deleted file mode 100644 index 89d90c4..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/MessageListener.java +++ /dev/null @@ -1,34 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: MessageListener.java,v 1.2 2005/07/17 11:38:05 chrislott Exp $ - -package com.gemstone.org.jgroups; - -/** - * Allows a listener to be notified when a message arrives. - * Contrary to the pull-style of channels, some building blocks - * (e.g., {@link com.gemstone.org.jgroups.blocks.PullPushAdapter}) provide an - * event-like, push-style message delivery model. - * In this case, the entity to be notified of message reception needs to - * provide a callback to be invoked whenever a message has been received. - * The MessageListener interface provides a method to do so. - */ -public interface MessageListener { - /** - * Called when a message is received. 
- * @param msg - */ - void receive(Message msg); - /** - * Answers the group state; e.g., when joining. - * @return byte[] - */ - byte[] getState(); - /** - * Sets the group state; e.g., when joining. - * @param state - */ - void setState(byte[] state); -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/Receiver.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/Receiver.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/Receiver.java deleted file mode 100644 index 93a9d59..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/Receiver.java +++ /dev/null @@ -1,13 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -package com.gemstone.org.jgroups; - -/** - * Defines the callbacks that are invoked when messages, views etc are received on a channel - * @author Bela Ban - * @version $Id: Receiver.java,v 1.1 2005/11/08 10:40:16 belaban Exp $ - */ -public interface Receiver extends MessageListener, MembershipListener { -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/ReceiverAdapter.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/ReceiverAdapter.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/ReceiverAdapter.java deleted file mode 100644 index e8b78da..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/ReceiverAdapter.java +++ /dev/null @@ -1,35 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. 
on - * $Date$ - **/ -package com.gemstone.org.jgroups; - -/** - * @author Bela Ban - * @version $Id: ReceiverAdapter.java,v 1.1 2005/11/08 10:43:38 belaban Exp $ - */ -public class ReceiverAdapter implements Receiver { - - public void receive(Message msg) { - } - - public byte[] getState() { - return null; - } - - public void setState(byte[] state) { - } - - public void viewAccepted(View new_view) { - } - - public void suspect(SuspectMember suspected_mbr) { - } - - public void block() { - } - - public void channelClosing(Channel c, Exception e) {} // GemStoneAddition - - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/SetStateEvent.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/SetStateEvent.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/SetStateEvent.java deleted file mode 100644 index 8b3a25d..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/SetStateEvent.java +++ /dev/null @@ -1,37 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: SetStateEvent.java,v 1.4 2005/07/17 11:38:05 chrislott Exp $ - -package com.gemstone.org.jgroups; - - - - - - -/** - * Encapsulates a state returned by <code>Channel.receive()</code>, as requested by - * <code>Channel.getState(s)</code> previously. State could be a single state (as requested by - * <code>Channel.getState()</code>) or a vector of states (as requested by - * <code>Channel.getStates()</code>). - * @author Bela Ban - */ -public class SetStateEvent { - byte[] state=null; // state - - - public SetStateEvent(byte[] state) { - this.state=state; - } - - public byte[] getArg() {return state;} - - @Override // GemStoneAddition - public String toString() {return "SetStateEvent[state=" + -// state - (state == null ? 
"null" : "(" + state.length + " bytes)") // GemStoneAddition - + ']';} - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/ShunnedAddressException.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/ShunnedAddressException.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/ShunnedAddressException.java deleted file mode 100755 index b16ab7f..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/ShunnedAddressException.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.gemstone.org.jgroups; - -/** - * GemStoneAddition - connection attempt failed due to reuse - * of IpAddress. This can happen on Windows & less frequently - * on Unix - * - * @author bruce - * - */ -public class ShunnedAddressException extends RuntimeException -{ - private static final long serialVersionUID = 6638258566493306758L; -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/SuspectEvent.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/SuspectEvent.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/SuspectEvent.java deleted file mode 100644 index 407c87a..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/SuspectEvent.java +++ /dev/null @@ -1,26 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: SuspectEvent.java,v 1.3 2005/07/17 11:38:05 chrislott Exp $ - -package com.gemstone.org.jgroups; - -/** - * Represents a suspect event. - * Gives access to the suspected member. 
- */ -public class SuspectEvent { - final Object suspected_mbr; - final Object who_suspected; // GemStoneAddition - - public SuspectEvent(SuspectMember suspected_mbr) { - this.suspected_mbr=suspected_mbr.suspectedMember; - this.who_suspected=suspected_mbr.whoSuspected; - } - - public Object getMember() {return suspected_mbr;} - public Object getSuspector() { return who_suspected; } // GemStoneAddition - @Override // GemStoneAddition - public String toString() {return "SuspectEvent";} -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/SuspectMember.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/SuspectMember.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/SuspectMember.java deleted file mode 100644 index 5ff5db6..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/SuspectMember.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - */ -package com.gemstone.org.jgroups; - -/** - * GemStoneAddition. 
This class is used in SUSPECT_WITH_ORIGIN events to - * hold both the suspected members and the origin of suspicion - * - * @author bruce - */ -public class SuspectMember -{ - /** the source of suspicion */ - public Address whoSuspected; - - /** suspected member */ - public Address suspectedMember; - - /** create a new SuspectMember */ - public SuspectMember(Address whoSuspected, Address suspectedMember) { - this.whoSuspected = whoSuspected; - this.suspectedMember = suspectedMember; - } - - @Override // GemStoneAddition - public String toString() { - return "{source="+whoSuspected+"; suspect="+suspectedMember+"}"; - } - - @Override // GemStoneAddition - public int hashCode() { - return this.suspectedMember.hashCode(); - } - - @Override // GemStoneAddition - public boolean equals(Object other) { - if ( !(other instanceof SuspectMember) ) { - return false; - } - return this.suspectedMember.equals(((SuspectMember)other).suspectedMember); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/SuspectedException.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/SuspectedException.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/SuspectedException.java deleted file mode 100644 index 32bce8d..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/SuspectedException.java +++ /dev/null @@ -1,21 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: SuspectedException.java,v 1.2 2005/07/17 11:38:05 chrislott Exp $ - -package com.gemstone.org.jgroups; - -/** - * Thrown if a message is sent to a suspected member. 
- */ -public class SuspectedException extends Exception { -private static final long serialVersionUID = -7362834003171175180L; - Object suspect=null; - - public SuspectedException() {} - public SuspectedException(Object suspect) {this.suspect=suspect;} - - @Override // GemStoneAddition - public String toString() {return "SuspectedException";} -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/TimeoutException.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/TimeoutException.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/TimeoutException.java deleted file mode 100644 index 0f65a99..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/TimeoutException.java +++ /dev/null @@ -1,43 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: TimeoutException.java,v 1.3 2005/07/17 11:38:05 chrislott Exp $ - -package com.gemstone.org.jgroups; - -import java.util.List; - - -/** - * Thrown if members fail to respond in time. 
- */ -public class TimeoutException extends Exception { -private static final long serialVersionUID = 4078270477623063306L; - List failed_mbrs=null; // members that failed responding - - public TimeoutException() { - super("TimeoutException"); - } - - public TimeoutException(String msg) { - super(msg); - } - - public TimeoutException(List failed_mbrs) { - super("TimeoutException"); - this.failed_mbrs=failed_mbrs; - } - - - @Override // GemStoneAddition - public String toString() { - StringBuffer sb=new StringBuffer(); - - sb.append(super.toString()); - - if(failed_mbrs != null && failed_mbrs.size() > 0) - sb.append(" (failed members: ").append(failed_mbrs); - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/Transport.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/Transport.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/Transport.java deleted file mode 100644 index 42af6d1..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/Transport.java +++ /dev/null @@ -1,22 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: Transport.java,v 1.2 2005/07/17 11:38:05 chrislott Exp $ - -package com.gemstone.org.jgroups; - -/** - * Defines a very small subset of the functionality of a channel, - * essentially only the methods for sending and receiving messages. - * Many building blocks require nothing else than a - * bare-bones facility to send and receive messages; therefore the Transport - * interface was created. It increases the genericness and portability of - * building blocks: being so simple, the Transport interface can easily be - * ported to a different toolkit, without requiring any modifications to - * building blocks. 
- */ -public interface Transport { - void send(Message msg) throws Exception; - Object receive(long timeout) throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/UpHandler.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/UpHandler.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/UpHandler.java deleted file mode 100644 index a5f605c..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/UpHandler.java +++ /dev/null @@ -1,18 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: UpHandler.java,v 1.2 2005/07/17 11:38:05 chrislott Exp $ - -package com.gemstone.org.jgroups; - -/** - * Provides a way of taking over a channel's tasks. - */ -public interface UpHandler { - /** - * Invoked for all channel events except connection management and state transfer. - * @param evt - */ - void up(Event evt); -}