Update of /cvsroot/freenet/Freenet0.7Rewrite/src/freenet/io/comm
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv7319/src/freenet/io/comm
Modified Files:
Message.java DMT.java
Log Message:
Build 136:
Initial publish/subscribe support.
Publish works. Subscribe requests are not yet routed; we just subscribe
locally, if the stream happens to pass through us.
Index: Message.java
===================================================================
RCS file: /cvsroot/freenet/Freenet0.7Rewrite/src/freenet/io/comm/Message.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -w -r1.10 -r1.11
--- Message.java 25 Aug 2005 17:28:19 -0000 1.10
+++ Message.java 15 Sep 2005 18:16:04 -0000 1.11
@@ -68,7 +68,7 @@
}
}
} catch (EOFException e) {
- Logger.normal(Message.class,"Message packet ends
prematurely while deserialising "+mspec.getName());
+ Logger.normal(Message.class,"Message packet ends
prematurely while deserialising "+mspec.getName(), e);
return null;
} catch (IOException e) {
Logger.error(Message.class, "WTF?: "+e+" reading from
buffer stream", e);
Index: DMT.java
===================================================================
RCS file: /cvsroot/freenet/Freenet0.7Rewrite/src/freenet/io/comm/DMT.java,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -w -r1.17 -r1.18
--- DMT.java 7 Sep 2005 15:22:16 -0000 1.17
+++ DMT.java 15 Sep 2005 18:16:04 -0000 1.18
@@ -23,6 +23,7 @@
import java.util.List;
import freenet.keys.Key;
+import freenet.keys.PublishStreamKey;
import freenet.support.BitArray;
import freenet.support.Buffer;
import freenet.support.ShortBuffer;
@@ -77,6 +78,7 @@
public static final String RETURN_LOCATION = "returnLocation";
public static final String BLOCK_HEADERS = "blockHeaders";
public static final String DATA_INSERT_REJECTED_REASON =
"dataInsertRejectedReason";
+ public static final String STREAM_SEQNO = "streamSequenceNumber";
//Diagnostic
public static final MessageType ping = new MessageType("ping") {{
@@ -774,6 +776,62 @@
return msg;
}
+ public static final MessageType FNPPublishData = new
MessageType("FNPPublishData") {{
+ addField(UID, Long.class);
+ addField(HTL, Short.class);
+ addField(DATA, ShortBuffer.class);
+ addField(KEY, PublishStreamKey.class);
+ addField(STREAM_SEQNO, Long.class);
+ addField(NEAREST_LOCATION, Double.class);
+ }};
+
+ public static final Message createFNPPublishData(short htl, byte[] data,
PublishStreamKey key, long seqNo, long id, double nearestLoc) {
+ Message msg = new Message(FNPPublishData);
+ msg.set(HTL, htl);
+ msg.set(KEY, key);
+ msg.set(DATA, new ShortBuffer(data));
+ msg.set(STREAM_SEQNO, seqNo);
+ msg.set(UID, id);
+ msg.set(NEAREST_LOCATION, nearestLoc);
+ return msg;
+ }
+
+ public static final MessageType FNPPublishDataSucceeded = new
MessageType("FNPPublishDataSucceeded") {{
+ addField(UID, Long.class);
+ }};
+
+ public static final Message createFNPPublishDataSucceeded(long id) {
+ Message msg = new Message(FNPPublishDataSucceeded);
+ msg.set(UID, id);
+ return msg;
+ }
+
+ public static final MessageType FNPPublishDataInvalid = new
MessageType("FNPPublishDataInvalid") {{
+ addField(UID, Long.class);
+ }};
+
+ public static final Message createFNPPublishDataInvalid(long id) {
+ Message msg = new Message(FNPPublishDataInvalid);
+ msg.set(UID, id);
+ return msg;
+ }
+
+// public static final MessageType FNPSubscribeRequest = new
MessageType("FNPSubscribeRequest") {{
+// addField(UID, Long.class);
+// addField(HTL, Short.class);
+// addField(KEY, PublishStreamKey.class);
+// addField(STREAM_SEQNO, Long.class);
+// }};
+//
+// public static final Message createFNPSubscribeRequest(long uid, short
htl, PublishStreamKey key, long seqNo) {
+// Message msg = new Message(FNPSubscribeRequest);
+// msg.set(UID, uid);
+// msg.set(HTL, htl);
+// msg.set(KEY, key);
+// msg.set(STREAM_SEQNO, seqNo);
+// return msg;
+// }
+//
public static void init() { }
}