Author: toad
Date: 2008-01-12 21:54:21 +0000 (Sat, 12 Jan 2008)
New Revision: 17035
Modified:
trunk/freenet/src/freenet/node/FNPPacketMangler.java
trunk/freenet/src/freenet/node/KeyTracker.java
trunk/freenet/src/freenet/node/MessageItem.java
trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/ResendPacketItem.java
trunk/freenet/src/freenet/support/LimitedRangeIntByteArrayMap.java
trunk/freenet/src/freenet/support/LimitedRangeIntByteArrayMapElement.java
Log:
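MessageItem.msg may be null (items built from a preformatted byte[] have no Message), so getPriority() cannot read it from msg.getSpec().
We need to store the priority in MessageItem and pass it around explicitly.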
Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java 2008-01-12
21:24:29 UTC (rev 17034)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java 2008-01-12
21:54:21 UTC (rev 17035)
@@ -27,6 +27,7 @@
import freenet.crypt.UnsupportedCipherException;
import freenet.crypt.ciphers.Rijndael;
import freenet.io.comm.AsyncMessageCallback;
+import freenet.io.comm.DMT;
import freenet.io.comm.FreenetInetAddress;
import freenet.io.comm.IncomingPacketFilter;
import freenet.io.comm.Message;
@@ -1982,7 +1983,7 @@
return;
}
int packetNumber =
kt.allocateOutgoingPacketNumberNeverBlock();
- this.processOutgoingPreformatted(buf,
0, buf.length, kt, packetNumber, mi.cb, mi.alreadyReportedBytes);
+ this.processOutgoingPreformatted(buf,
0, buf.length, kt, packetNumber, mi.cb, mi.alreadyReportedBytes,
mi.getPriority());
//MARK: onSent()
mi.onSent(buf.length +
fullHeadersLengthOneMessage);
} catch (NotConnectedException e) {
@@ -2045,6 +2046,7 @@
AsyncMessageCallback callbacks[] = new
AsyncMessageCallback[callbacksCount];
x=0;
int alreadyReportedBytes = 0;
+ short priority = DMT.PRIORITY_BULK_DATA;
for(int i=0;i<messages.length;i++) {
if(messages[i].formatted) continue;
if(messages[i].cb != null) {
@@ -2052,6 +2054,8 @@
System.arraycopy(messages[i].cb, 0, callbacks,
x, messages[i].cb.length);
x += messages[i].cb.length;
}
+ short messagePrio = messages[i].getPriority();
+ if(messagePrio < priority) priority = messagePrio;
}
if(x != callbacksCount) throw new IllegalStateException();
@@ -2059,7 +2063,7 @@
(messageData.length < 256)) {
mi_name = null;
try {
- innerProcessOutgoing(messageData, 0,
messageData.length, length, pn, neverWaitForPacketNumber, callbacks,
alreadyReportedBytes);
+ innerProcessOutgoing(messageData, 0,
messageData.length, length, pn, neverWaitForPacketNumber, callbacks,
alreadyReportedBytes, priority);
for(int i=0;i<messageData.length;i++) {
MessageItem mi = newMsgs[i];
mi_name = (mi.msg == null ? "(not a
Message)" : mi.msg.getSpec().getName());
@@ -2107,7 +2111,8 @@
if(lastIndex != i) {
mi_name = null;
try {
-
innerProcessOutgoing(messageData, lastIndex, i-lastIndex, length, pn,
neverWaitForPacketNumber, callbacks, alreadyReportedBytes);
+ // FIXME regenerate
callbacks and priority!
+
innerProcessOutgoing(messageData, lastIndex, i-lastIndex, length, pn,
neverWaitForPacketNumber, callbacks, alreadyReportedBytes, priority);
for(int
j=lastIndex;j<i;j++) {
MessageItem mi
= newMsgs[j];
mi_name =
(mi.msg == null ? "(not a Message)" : mi.msg.getSpec().getName());
@@ -2155,7 +2160,7 @@
* @throws PacketSequenceException
*/
private void innerProcessOutgoing(byte[][] messageData, int start, int
length, int bufferLength,
- PeerNode pn, boolean neverWaitForPacketNumber,
AsyncMessageCallback[] callbacks, int alreadyReportedBytes) throws
NotConnectedException, WouldBlockException, PacketSequenceException {
+ PeerNode pn, boolean neverWaitForPacketNumber,
AsyncMessageCallback[] callbacks, int alreadyReportedBytes, short priority)
throws NotConnectedException, WouldBlockException, PacketSequenceException {
if(logMINOR) Logger.minor(this,
"innerProcessOutgoing(...,"+start+ ',' +length+ ',' +bufferLength+ ')');
byte[] buf = new byte[bufferLength];
buf[0] = (byte)length;
@@ -2168,15 +2173,15 @@
System.arraycopy(data, 0, buf, loc, len);
loc += len;
}
- processOutgoingPreformatted(buf, 0, loc, pn,
neverWaitForPacketNumber, callbacks, alreadyReportedBytes);
+ processOutgoingPreformatted(buf, 0, loc, pn,
neverWaitForPacketNumber, callbacks, alreadyReportedBytes, priority);
}
/* (non-Javadoc)
* @see freenet.node.OutgoingPacketMangler#processOutgoing(byte[], int,
int, freenet.node.KeyTracker, int)
*/
- public void processOutgoing(byte[] buf, int offset, int length,
KeyTracker tracker, int alreadyReportedBytes) throws KeyChangedException,
NotConnectedException, PacketSequenceException, WouldBlockException {
+ public void processOutgoing(byte[] buf, int offset, int length,
KeyTracker tracker, int alreadyReportedBytes, short priority) throws
KeyChangedException, NotConnectedException, PacketSequenceException,
WouldBlockException {
byte[] newBuf = preformat(buf, offset, length);
- processOutgoingPreformatted(newBuf, 0, newBuf.length, tracker,
-1, null, alreadyReportedBytes);
+ processOutgoingPreformatted(newBuf, 0, newBuf.length, tracker,
-1, null, alreadyReportedBytes, priority);
}
/**
@@ -2184,7 +2189,7 @@
* the key changes.
* @throws PacketSequenceException
*/
- void processOutgoingPreformatted(byte[] buf, int offset, int length,
PeerNode peer, boolean neverWaitForPacketNumber, AsyncMessageCallback[]
callbacks, int alreadyReportedBytes) throws NotConnectedException,
WouldBlockException, PacketSequenceException {
+ void processOutgoingPreformatted(byte[] buf, int offset, int length,
PeerNode peer, boolean neverWaitForPacketNumber, AsyncMessageCallback[]
callbacks, int alreadyReportedBytes, short priority) throws
NotConnectedException, WouldBlockException, PacketSequenceException {
KeyTracker last = null;
while(true) {
try {
@@ -2198,7 +2203,7 @@
}
int seqNo = neverWaitForPacketNumber ?
tracker.allocateOutgoingPacketNumberNeverBlock() :
tracker.allocateOutgoingPacketNumber();
- processOutgoingPreformatted(buf, offset,
length, tracker, seqNo, callbacks, alreadyReportedBytes);
+ processOutgoingPreformatted(buf, offset,
length, tracker, seqNo, callbacks, alreadyReportedBytes, priority);
return;
} catch (KeyChangedException e) {
Logger.normal(this, "Key changed(2) for
"+peer.getPeer());
@@ -2231,7 +2236,7 @@
/* (non-Javadoc)
* @see
freenet.node.OutgoingPacketMangler#processOutgoingPreformatted(byte[], int,
int, freenet.node.KeyTracker, int, freenet.node.AsyncMessageCallback[], int)
*/
- public void processOutgoingPreformatted(byte[] buf, int offset, int
length, KeyTracker tracker, int packetNumber, AsyncMessageCallback[] callbacks,
int alreadyReportedBytes) throws KeyChangedException, NotConnectedException,
PacketSequenceException, WouldBlockException {
+ public void processOutgoingPreformatted(byte[] buf, int offset, int
length, KeyTracker tracker, int packetNumber, AsyncMessageCallback[] callbacks,
int alreadyReportedBytes, short priority) throws KeyChangedException,
NotConnectedException, PacketSequenceException, WouldBlockException {
if(logMINOR) {
String log =
"processOutgoingPreformatted("+Fields.hashCode(buf)+", "+offset+ ',' +length+
',' +tracker+ ',' +packetNumber+ ',';
if(callbacks == null) log += "null";
@@ -2436,7 +2441,7 @@
if(seqNumber != -1) {
byte[] saveable = new byte[length];
System.arraycopy(buf, offset, saveable, 0, length);
- tracker.sentPacket(saveable, seqNumber, callbacks);
+ tracker.sentPacket(saveable, seqNumber, callbacks,
priority);
}
if(logMINOR) Logger.minor(this, "Sending... "+seqNumber);
@@ -2545,7 +2550,7 @@
}
public void resend(ResendPacketItem item) throws
PacketSequenceException, WouldBlockException, KeyChangedException,
NotConnectedException {
- processOutgoingPreformatted(item.buf, 0, item.buf.length,
item.kt, item.packetNumber, item.callbacks, 0);
+ processOutgoingPreformatted(item.buf, 0, item.buf.length,
item.kt, item.packetNumber, item.callbacks, 0, item.priority);
}
public int[] supportedNegTypes() {
Modified: trunk/freenet/src/freenet/node/KeyTracker.java
===================================================================
--- trunk/freenet/src/freenet/node/KeyTracker.java 2008-01-12 21:24:29 UTC
(rev 17034)
+++ trunk/freenet/src/freenet/node/KeyTracker.java 2008-01-12 21:54:21 UTC
(rev 17035)
@@ -10,6 +10,7 @@
import freenet.crypt.BlockCipher;
import freenet.io.comm.AsyncMessageCallback;
+import freenet.io.comm.DMT;
import freenet.io.comm.NotConnectedException;
import freenet.io.xfer.PacketThrottle;
import freenet.support.DoublyLinkedList;
@@ -950,14 +951,14 @@
* @param seqNumber The packet number.
* @throws NotConnectedException
*/
- public void sentPacket(byte[] data, int seqNumber, AsyncMessageCallback[]
callbacks) throws NotConnectedException {
+ public void sentPacket(byte[] data, int seqNumber, AsyncMessageCallback[]
callbacks, short priority) throws NotConnectedException {
if(callbacks != null) {
for(int i=0;i<callbacks.length;i++) {
if(callbacks[i] == null)
throw new NullPointerException();
}
}
- sentPacketsContents.add(seqNumber, data, callbacks);
+ sentPacketsContents.add(seqNumber, data, callbacks, priority);
try {
queueAckRequest(seqNumber);
} catch (UpdatableSortedLinkedListKilledException e) {
@@ -1007,7 +1008,7 @@
AsyncMessageCallback[] callbacks = element.callbacks;
// Ignore packet#
if(logMINOR) Logger.minor(this, "Queueing resend of what was once
"+element.packetNumber);
- messages[i] = new MessageItem(buf, callbacks, true, 0, null);
+ messages[i] = new MessageItem(buf, callbacks, true, 0, null,
element.priority);
}
pn.requeueMessageItems(messages, 0, messages.length, true);
@@ -1069,7 +1070,8 @@
continue; // acked already?
}
AsyncMessageCallback[] callbacks =
sentPacketsContents.getCallbacks(packetNo);
- rpiTemp.add(new ResendPacketItem(buf, packetNo, this, callbacks));
+ short priority = sentPacketsContents.getPriority(packetNo,
DMT.PRIORITY_BULK_DATA);
+ rpiTemp.add(new ResendPacketItem(buf, packetNo, this, callbacks,
priority));
}
if(rpiTemp.isEmpty()) return null;
return numbers;
Modified: trunk/freenet/src/freenet/node/MessageItem.java
===================================================================
--- trunk/freenet/src/freenet/node/MessageItem.java 2008-01-12 21:24:29 UTC
(rev 17034)
+++ trunk/freenet/src/freenet/node/MessageItem.java 2008-01-12 21:54:21 UTC
(rev 17035)
@@ -21,6 +21,7 @@
*/
final boolean formatted;
final ByteCounter ctrCallback;
+ private final short priority;
public MessageItem(Message msg2, AsyncMessageCallback[] cb2, int
alreadyReportedBytes, ByteCounter ctr) {
this.alreadyReportedBytes = alreadyReportedBytes;
@@ -30,9 +31,10 @@
formatted = false;
this.ctrCallback = ctr;
this.submitted = System.currentTimeMillis();
+ priority = msg2.getSpec().getPriority();
}
- public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean
formatted, int alreadyReportedBytes, ByteCounter ctr) {
+ public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean
formatted, int alreadyReportedBytes, ByteCounter ctr, short priority) {
this.alreadyReportedBytes = alreadyReportedBytes;
this.cb = cb2;
this.msg = null;
@@ -42,6 +44,7 @@
throw new NullPointerException();
this.ctrCallback = ctr;
this.submitted = System.currentTimeMillis();
+ this.priority = priority;
}
/**
@@ -74,6 +77,6 @@
}
public short getPriority() {
- return msg.getSpec().getPriority();
+ return priority;
}
}
Modified: trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/OutgoingPacketMangler.java 2008-01-12
21:24:29 UTC (rev 17034)
+++ trunk/freenet/src/freenet/node/OutgoingPacketMangler.java 2008-01-12
21:54:21 UTC (rev 17035)
@@ -40,7 +40,7 @@
* @throws WouldBlockException
*/
public void processOutgoing(byte[] buf, int offset, int length,
- KeyTracker tracker, int alreadyReportedBytes)
+ KeyTracker tracker, int alreadyReportedBytes, short
priority)
throws KeyChangedException, NotConnectedException,
PacketSequenceException, WouldBlockException;
@@ -62,7 +62,7 @@
*/
public void processOutgoingPreformatted(byte[] buf, int offset, int
length,
KeyTracker tracker, int packetNumber,
- AsyncMessageCallback[] callbacks, int
alreadyReportedBytes)
+ AsyncMessageCallback[] callbacks, int
alreadyReportedBytes, short priority)
throws KeyChangedException, NotConnectedException,
PacketSequenceException, WouldBlockException;
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2008-01-12 21:24:29 UTC
(rev 17034)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2008-01-12 21:54:21 UTC
(rev 17035)
@@ -2213,7 +2213,7 @@
long t = tracker.getNextUrgentTime();
if(t < now || forceSendPrimary) {
try {
- outgoingMangler.processOutgoing(null,
0, 0, tracker, 0);
+ outgoingMangler.processOutgoing(null,
0, 0, tracker, 0, DMT.PRIORITY_NOW);
} catch(NotConnectedException e) {
// Ignore
} catch(KeyChangedException e) {
@@ -2228,7 +2228,7 @@
long t = tracker.getNextUrgentTime();
if(t < now)
try {
- outgoingMangler.processOutgoing(null,
0, 0, tracker, 0);
+ outgoingMangler.processOutgoing(null,
0, 0, tracker, 0, DMT.PRIORITY_NOW);
} catch(NotConnectedException e) {
// Ignore
} catch(KeyChangedException e) {
@@ -2412,7 +2412,7 @@
Logger.error(this, "No tracker to resend packet
" + item.packetNumber + " on");
continue;
}
- MessageItem mi = new MessageItem(item.buf,
item.callbacks, true, 0, null);
+ MessageItem mi = new MessageItem(item.buf,
item.callbacks, true, 0, null, item.priority);
requeueMessageItems(new MessageItem[]{mi}, 0, 1, true);
}
}
Modified: trunk/freenet/src/freenet/node/ResendPacketItem.java
===================================================================
--- trunk/freenet/src/freenet/node/ResendPacketItem.java 2008-01-12
21:24:29 UTC (rev 17034)
+++ trunk/freenet/src/freenet/node/ResendPacketItem.java 2008-01-12
21:54:21 UTC (rev 17035)
@@ -10,16 +10,18 @@
* message as byte[].
*/
public class ResendPacketItem {
- public ResendPacketItem(byte[] payload, int packetNumber, KeyTracker k,
AsyncMessageCallback[] callbacks) {
+ public ResendPacketItem(byte[] payload, int packetNumber, KeyTracker k,
AsyncMessageCallback[] callbacks, short priority) {
pn = k.pn;
kt = k;
buf = payload;
this.packetNumber = packetNumber;
this.callbacks = callbacks;
+ this.priority = priority;
}
final PeerNode pn;
final KeyTracker kt;
final byte[] buf;
final int packetNumber;
- final AsyncMessageCallback[] callbacks;
+ final AsyncMessageCallback[] callbacks;
+ final short priority;
}
\ No newline at end of file
Modified: trunk/freenet/src/freenet/support/LimitedRangeIntByteArrayMap.java
===================================================================
--- trunk/freenet/src/freenet/support/LimitedRangeIntByteArrayMap.java
2008-01-12 21:24:29 UTC (rev 17034)
+++ trunk/freenet/src/freenet/support/LimitedRangeIntByteArrayMap.java
2008-01-12 21:54:21 UTC (rev 17035)
@@ -66,6 +66,14 @@
else return -1;
}
+ public short getPriority(int index, short defaultValue) {
+ Integer i = new Integer(index);
+ LimitedRangeIntByteArrayMapElement wrapper =
(LimitedRangeIntByteArrayMapElement) contents.get(i);
+ if(wrapper != null)
+ return wrapper.priority;
+ else return defaultValue;
+ }
+
/**
* Get the time at which an index was re-added last.
*/
@@ -82,7 +90,7 @@
* @return True if we succeeded, false if the index was out
* of range.
*/
- public synchronized boolean add(int index, byte[] data,
AsyncMessageCallback[] callbacks) {
+ public synchronized boolean add(int index, byte[] data,
AsyncMessageCallback[] callbacks, short priority) {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, toString()+" add "+index);
if(maxValue == -1) {
@@ -103,7 +111,7 @@
Integer i = new Integer(index);
LimitedRangeIntByteArrayMapElement le =
(LimitedRangeIntByteArrayMapElement) contents.get(i);
if(le == null)
- contents.put(new Integer(index), new
LimitedRangeIntByteArrayMapElement(index, data, callbacks));
+ contents.put(new Integer(index), new
LimitedRangeIntByteArrayMapElement(index, data, callbacks, priority));
else
le.reput();
notifyAll();
Modified:
trunk/freenet/src/freenet/support/LimitedRangeIntByteArrayMapElement.java
===================================================================
--- trunk/freenet/src/freenet/support/LimitedRangeIntByteArrayMapElement.java
2008-01-12 21:24:29 UTC (rev 17034)
+++ trunk/freenet/src/freenet/support/LimitedRangeIntByteArrayMapElement.java
2008-01-12 21:54:21 UTC (rev 17035)
@@ -5,10 +5,11 @@
public class LimitedRangeIntByteArrayMapElement {
- public LimitedRangeIntByteArrayMapElement(int packetNumber, byte[] data2,
AsyncMessageCallback[] callbacks2) {
+ public LimitedRangeIntByteArrayMapElement(int packetNumber, byte[] data2,
AsyncMessageCallback[] callbacks2, short priority) {
this.packetNumber = packetNumber;
this.data = data2;
this.callbacks = callbacks2;
+ this.priority = priority;
createdTime = System.currentTimeMillis();
}
@@ -16,6 +17,7 @@
public final byte[] data;
public final AsyncMessageCallback[] callbacks;
public final long createdTime;
+ public final short priority;
long reputTime;
public void reput() {