Author: toad
Date: 2008-09-19 16:53:54 +0000 (Fri, 19 Sep 2008)
New Revision: 22707
Modified:
trunk/freenet/src/freenet/node/FNPPacketMangler.java
trunk/freenet/src/freenet/node/KeyTracker.java
trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
trunk/freenet/src/freenet/node/PacketSender.java
trunk/freenet/src/freenet/node/PeerNode.java
Log:
Send one packet to a peer, and then go around the PacketSender loop again.
If no other peer needs a packet, then send that peer another packet.
This should be much fairer when one node has a severe backlog, or when sending
a packet is taking ages for some reason.
It also fits well with PacketSender-level throttling (coming soon).
Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java 2008-09-19
15:59:04 UTC (rev 22706)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java 2008-09-19
16:53:54 UTC (rev 22707)
@@ -2000,7 +2000,7 @@
/* (non-Javadoc)
* @see
freenet.node.OutgoingPacketMangler#processOutgoingOrRequeue(freenet.node.MessageItem[],
freenet.node.PeerNode, boolean, boolean)
*/
- public void processOutgoingOrRequeue(MessageItem[] messages, PeerNode
pn, boolean neverWaitForPacketNumber, boolean dontRequeue) {
+ public void processOutgoingOrRequeue(MessageItem[] messages, PeerNode
pn, boolean neverWaitForPacketNumber, boolean dontRequeue, boolean onePacket) {
String requeueLogString = "";
if(!dontRequeue) {
requeueLogString = ", requeueing";
@@ -2192,6 +2192,10 @@
pn.requeueMessageItems(messages, lastIndex, messages.length - lastIndex, false,
"Throwable(3)");
return;
}
+ if(onePacket) {
+
pn.requeueMessageItems(messages, i, messageData.length - i, true, "Didn't fit
in single packet");
+ return;
+ }
}
lastIndex = i;
if(i != messageData.length) {
Modified: trunk/freenet/src/freenet/node/KeyTracker.java
===================================================================
--- trunk/freenet/src/freenet/node/KeyTracker.java 2008-09-19 15:59:04 UTC
(rev 22706)
+++ trunk/freenet/src/freenet/node/KeyTracker.java 2008-09-19 16:53:54 UTC
(rev 22707)
@@ -1054,6 +1054,12 @@
return null;
return numbers;
}
+
+ public boolean hasPacketsToResend() {
+ synchronized(packetsToResend) {
+ return !packetsToResend.isEmpty();
+ }
+ }
public boolean isDeprecated() {
return this.isDeprecated;
Modified: trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/OutgoingPacketMangler.java 2008-09-19
15:59:04 UTC (rev 22706)
+++ trunk/freenet/src/freenet/node/OutgoingPacketMangler.java 2008-09-19
16:53:54 UTC (rev 22707)
@@ -24,9 +24,11 @@
* Build one or more packets and send them, from a whole bunch of
messages.
* If any MessageItem's are formatted already, they will be sent as
single packets.
* Any packets which cannot be sent will be requeued on the PeerNode.
+ * @param onePacketOnly If true, we will only send one packet, and will
requeue any
+ * messages that don't fit in that single packet.
*/
public void processOutgoingOrRequeue(MessageItem[] messages, PeerNode
pn,
- boolean neverWaitForPacketNumber, boolean dontRequeue);
+ boolean neverWaitForPacketNumber, boolean dontRequeue,
boolean onePacketOnly);
/**
* Resend a single packet.
Modified: trunk/freenet/src/freenet/node/PacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PacketSender.java 2008-09-19 15:59:04 UTC
(rev 22706)
+++ trunk/freenet/src/freenet/node/PacketSender.java 2008-09-19 16:53:54 UTC
(rev 22707)
@@ -230,9 +230,9 @@
continue;
}
- pn.maybeSendSomething(now, rpiTemp, rpiIntTemp);
+ pn.maybeSendPacket(now, rpiTemp, rpiIntTemp);
- long urgentTime = pn.getNextUrgentTime();
+ long urgentTime = pn.getNextUrgentTime(now);
// Should spam the logs, unless there is a
deadlock
if(urgentTime < Long.MAX_VALUE && logMINOR)
Logger.minor(this, "Next urgent time: "
+ urgentTime + " for " + pn.getPeer());
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2008-09-19 15:59:04 UTC
(rev 22706)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2008-09-19 16:53:54 UTC
(rev 22707)
@@ -11,6 +11,7 @@
import java.net.MalformedURLException;
import java.net.UnknownHostException;
import java.security.MessageDigest;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -1308,17 +1309,41 @@
* it means it will only contains ack requests etc., or
* Long.MAX_VALUE if we have no pending ack request/acks/etc.
*/
- public synchronized long getNextUrgentTime() {
+ public long getNextUrgentTime(long now) {
long t = Long.MAX_VALUE;
+ synchronized(this) {
KeyTracker kt = currentTracker;
- if(kt != null)
+ if(kt != null) {
t = Math.min(t, kt.getNextUrgentTime());
+ if(kt.hasPacketsToResend()) return now;
+ }
kt = previousTracker;
- if(kt != null)
+ if(kt != null) {
t = Math.min(t, kt.getNextUrgentTime());
+ if(kt.hasPacketsToResend()) return now;
+ }
+ }
+ synchronized(messagesToSendNow) {
+ for(LinkedList list : messagesToSendNow) {
+ if(list.isEmpty()) continue;
+ MessageItem item = (MessageItem)
list.getFirst();
+ t = Math.min(t, item.submitted +
PacketSender.MAX_COALESCING_DELAY);
+ }
+ }
return t;
}
-
+
+ private synchronized boolean mustSendNotificationsNow(long now) {
+ KeyTracker kt = currentTracker;
+ if(kt != null) {
+ if(kt.getNextUrgentTime() < now) return true;
+ }
+ kt = previousTracker;
+ if(kt != null)
+ if(kt.getNextUrgentTime() < now) return true;
+ return false;
+ }
+
/**
* @return The time at which we last sent a packet.
*/
@@ -4040,7 +4065,7 @@
public void maybeSendPacket(long now, Vector rpiTemp, int[] rpiIntTemp)
{
// If there are any urgent notifications, we must send a packet.
boolean mustSend = false;
- if(getNextUrgentTime() < now) {
+ if(mustSendNotificationsNow(now)) {
mustSend = true;
}
// Any packets to resend? If so, resend ONE packet and then
return.
@@ -4087,104 +4112,136 @@
}
}
- //
- }
-
- public void maybeSendSomething(long now, Vector rpiTemp, int[]
rpiIntTemp) {
- boolean mustSend = false;
-
- // Any urgent notifications to send?
- long urgentTime = getNextUrgentTime();
- if(urgentTime <= now)
+ int minSize =
getOutgoingMangler().fullHeadersLengthOneMessage(); // includes UDP headers
+ int maxSize = ((PacketSocketHandler)
getSocketHandler()).getPacketSendThreshold();
+
+ // If it's a keepalive, we must add an FNPVoid to ensure it has
a packet number.
+ boolean keepalive = false;
+
+ if(now - lastSentPacketTime() > Node.KEEPALIVE_INTERVAL) {
+ if(logMINOR)
+ Logger.minor(this, "Sending keepalive");
+ keepalive = true;
mustSend = true;
- checkTrackerTimeout();
+ }
- // Any packets to resend?
- for(int j = 0; j < 2; j++) {
- KeyTracker kt;
- if(j == 0)
- kt = getCurrentKeyTracker();
- else if(j == 1)
- kt = getPreviousKeyTracker();
- else
- break; // impossible
- if(kt == null)
- continue;
- int[] tmp = kt.grabResendPackets(rpiTemp, rpiIntTemp);
- if(tmp == null)
- continue;
- rpiIntTemp = tmp;
- for(int k = 0; k < rpiTemp.size(); k++) {
- ResendPacketItem item = (ResendPacketItem)
rpiTemp.get(k);
- if(item == null)
- continue;
- try {
- if(logMINOR)
- Logger.minor(this, "Resending "
+ item.packetNumber + " to " + item.kt);
- getOutgoingMangler().resend(item);
- mustSend = false;
- } catch(KeyChangedException e) {
- Logger.error(this, "Caught " + e + "
resending packets to " + kt);
- requeueResendItems(rpiTemp);
- break;
- } catch(NotConnectedException e) {
- Logger.normal(this, "Caught " + e + "
resending packets to " + kt);
- requeueResendItems(rpiTemp);
- break;
- } catch(PacketSequenceException e) {
- Logger.error(this, "Caught " + e + " -
disconnecting", e);
- // PSE is fairly drastic, something is
broken between us, but maybe we can resync
- forceDisconnect(false);
- } catch(WouldBlockException e) {
- Logger.error(this, "Impossible: " + e,
e);
+ ArrayList<MessageItem> messages = new
ArrayList<MessageItem>(10);
+
+ synchronized(messagesToSendNow) {
+
+ // Any urgent messages to send now?
+
+ if(!mustSend) {
+ for(int i=0;i<messagesToSendNow.length;i++) {
+ if(!messagesToSendNow[i].isEmpty()) {
+ if(((MessageItem)
messagesToSendNow[i].getFirst()).submitted +
+
PacketSender.MAX_COALESCING_DELAY <= now) {
+ mustSend = true;
+ break;
+ }
+ }
}
}
-
- }
-
- // Any messages to send?
- MessageItem[] messages = null;
- messages = grabQueuedMessageItems();
- if((messages != null) && (messages.length > 0)) {
- long l = Long.MAX_VALUE;
- int sz =
getOutgoingMangler().fullHeadersLengthOneMessage(); // includes UDP headers
- for(int j = 0; j < messages.length; j++) {
- if(l > messages[j].submitted)
- l = messages[j].submitted;
- sz += 2 + /* FIXME only 2? */
messages[j].getLength();
+
+ if(!mustSend) {
+ // What about total length?
+ int length = minSize;
+ for(LinkedList items : messagesToSendNow) {
+ for(Object o : items) {
+ MessageItem i = (MessageItem) o;
+ if(length + maxSize > maxSize) {
+ mustSend = true;
+ break;
+ } else length += maxSize;
+ }
+ }
}
- if(node.enablePacketCoalescing && (l +
PacketSender.MAX_COALESCING_DELAY > now) &&
- (sz < ((PacketSocketHandler)
getSocketHandler()).getPacketSendThreshold())) {
- // Don't send immediately
- requeueMessageItems(messages, 0,
messages.length, true, "TrafficCoalescing");
- } else {
- for(int j = 0; j < messages.length; j++)
- if(logMINOR)
- Logger.minor(this, "PS Sending:
" + (messages[j].msg == null ? "(not a Message)" :
messages[j].msg.getSpec().getName())+" to "+this);
- // Send packets, right now, blocking, including
any active notifications
-
getOutgoingMangler().processOutgoingOrRequeue(messages, this, true, false);
- return;
+
+ if(mustSend) {
+ int size = minSize;
+ boolean gotEnough = false;
+ while(!gotEnough) {
+ // Urgent messages first.
+ boolean foundNothingUrgent = true;
+ for(LinkedList items :
messagesToSendNow) {
+ while(!gotEnough) {
+ if(items.isEmpty()) {
+ break;
+ }
+ MessageItem item =
(MessageItem) items.getFirst();
+ if(item.submitted +
PacketSender.MAX_COALESCING_DELAY <= now) {
+
foundNothingUrgent = false;
+ int thisSize =
item.getLength();
+ if(size +
thisSize > maxSize) {
+ if(size
== minSize) {
+
// Send it anyway, nothing else to send.
+
size += thisSize;
+
items.removeFirst();
+
messages.add(item);
+
gotEnough = true;
+
break;
+ }
+
gotEnough = true;
+ break;
// More items won't fit.
+ }
+ size +=
thisSize;
+
items.removeFirst();
+
messages.add(item);
+ } else {
+ break;
+ }
+ }
+ }
+ if(foundNothingUrgent) break;
+ }
+
+ // Now the not-so-urgent messages.
+ while(!gotEnough) {
+ boolean foundNothing = true;
+ for(LinkedList items :
messagesToSendNow) {
+ while(!gotEnough) {
+ if(items.isEmpty()) {
+ break;
+ }
+ MessageItem item =
(MessageItem) items.getFirst();
+ foundNothing = false;
+ int thisSize =
item.getLength();
+ if(size + thisSize >
maxSize) {
+ if(size ==
minSize) {
+ // Send
it anyway, nothing else to send.
+ size +=
thisSize;
+
items.removeFirst();
+
messages.add(item);
+
gotEnough = true;
+ break;
+ }
+ gotEnough =
true;
+ break; // More
items won't fit.
+ }
+ size += thisSize;
+ items.removeFirst();
+ messages.add(item);
+ }
+ }
+ if(foundNothing) break;
+ }
}
+
}
-
- if(mustSend)
- // Send them
-
- try {
- sendAnyUrgentNotifications(false);
- } catch(PacketSequenceException e) {
- Logger.error(this, "Caught " + e + " - while
sending urgent notifications : disconnecting", e);
- forceDisconnect(false);
- }
-
- // Need to send a keepalive packet?
- if(now - lastSentPacketTime() > Node.KEEPALIVE_INTERVAL) {
- if(logMINOR)
- Logger.minor(this, "Sending keepalive");
+
+ if(messages.isEmpty() && keepalive) {
// Force packet to have a sequence number.
Message m = DMT.createFNPVoid();
addToLocalNodeSentMessagesToStatistic(m);
- getOutgoingMangler().processOutgoingOrRequeue(new
MessageItem[]{new MessageItem(m, null, 0, null, this)}, this, true, true);
+ messages.add(new MessageItem(m, null, 0, null, this));
}
+
+ if(messages.isEmpty()) return;
+
+ // Send packets, right now, blocking, including any active
notifications
+ // Note that processOutgoingOrRequeue will drop messages from
the end
+ // if necessary to fit the messages into a single packet.
+
getOutgoingMangler().processOutgoingOrRequeue(messages.toArray(new
MessageItem[messages.size()]), this, true, false, true);
+
}
}