Author: toad
Date: 2008-09-18 21:20:19 +0000 (Thu, 18 Sep 2008)
New Revision: 22696
Modified:
trunk/freenet/src/freenet/node/PacketSender.java
trunk/freenet/src/freenet/node/PeerNode.java
Log:
Factor out PeerNode.maybeSendSomething().
Identical to functionality in PacketSender that was removed.
Well perhaps a tiny timing change.
The next commits will be more interesting...
Modified: trunk/freenet/src/freenet/node/PacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PacketSender.java 2008-09-18 20:37:22 UTC
(rev 22695)
+++ trunk/freenet/src/freenet/node/PacketSender.java 2008-09-18 21:20:19 UTC
(rev 22696)
@@ -230,109 +230,13 @@
continue;
}
- boolean mustSend = false;
-
- // Any urgent notifications to send?
+ pn.maybeSendSomething(now, rpiTemp, rpiIntTemp);
+
long urgentTime = pn.getNextUrgentTime();
// Should spam the logs, unless there is a
deadlock
if(urgentTime < Long.MAX_VALUE && logMINOR)
Logger.minor(this, "Next urgent time: "
+ urgentTime + " for " + pn.getPeer());
- if(urgentTime <= now)
- mustSend = true;
- else
- nextActionTime =
Math.min(nextActionTime, urgentTime);
-
- pn.checkTrackerTimeout();
-
- // Any packets to resend?
- for(int j = 0; j < 2; j++) {
- KeyTracker kt;
- if(j == 0)
- kt = pn.getCurrentKeyTracker();
- else if(j == 1)
- kt = pn.getPreviousKeyTracker();
- else
- break; // impossible
- if(kt == null)
- continue;
- int[] tmp =
kt.grabResendPackets(rpiTemp, rpiIntTemp);
- if(tmp == null)
- continue;
- rpiIntTemp = tmp;
- for(int k = 0; k < rpiTemp.size(); k++)
{
- ResendPacketItem item =
(ResendPacketItem) rpiTemp.get(k);
- if(item == null)
- continue;
- try {
- if(logMINOR)
-
Logger.minor(this, "Resending " + item.packetNumber + " to " + item.kt);
-
pn.getOutgoingMangler().resend(item);
- mustSend = false;
- } catch(KeyChangedException e) {
- Logger.error(this,
"Caught " + e + " resending packets to " + kt);
-
pn.requeueResendItems(rpiTemp);
- break;
- } catch(NotConnectedException
e) {
- Logger.normal(this,
"Caught " + e + " resending packets to " + kt);
-
pn.requeueResendItems(rpiTemp);
- break;
- } catch(PacketSequenceException
e) {
- Logger.error(this,
"Caught " + e + " - disconnecting", e);
- // PSE is fairly
drastic, something is broken between us, but maybe we can resync
-
pn.forceDisconnect(false);
- } catch(WouldBlockException e) {
- Logger.error(this,
"Impossible: " + e, e);
- }
- }
-
- }
-
- // Any messages to send?
- MessageItem[] messages = null;
- messages = pn.grabQueuedMessageItems();
- if((messages != null) && (messages.length > 0))
{
- long l = Long.MAX_VALUE;
- int sz =
pn.getOutgoingMangler().fullHeadersLengthOneMessage(); // includes UDP headers
- for(int j = 0; j < messages.length;
j++) {
- if(l > messages[j].submitted)
- l =
messages[j].submitted;
- sz += 2 + /* FIXME only 2? */
messages[j].getData(pn).length;
- }
- if(node.enablePacketCoalescing && (l +
MAX_COALESCING_DELAY > now) &&
- (sz < ((PacketSocketHandler)
pn.getSocketHandler()).getPacketSendThreshold())) {
- // Don't send immediately
- if(nextActionTime > (l +
MAX_COALESCING_DELAY))
- nextActionTime = l +
MAX_COALESCING_DELAY;
-
pn.requeueMessageItems(messages, 0, messages.length, true, "TrafficCoalescing");
- } else {
- for(int j = 0; j <
messages.length; j++)
- if(logMINOR)
-
Logger.minor(this, "PS Sending: " + (messages[j].msg == null ? "(not a
Message)" : messages[j].msg.getSpec().getName())+" to "+pn);
- // Send packets, right now,
blocking, including any active notifications
-
pn.getOutgoingMangler().processOutgoingOrRequeue(messages, pn, true, false);
- continue;
- }
- }
-
- if(mustSend)
- // Send them
-
- try {
-
pn.sendAnyUrgentNotifications(false);
- } catch(PacketSequenceException e) {
- Logger.error(this, "Caught " +
e + " - while sending urgent notifications : disconnecting", e);
- pn.forceDisconnect(false);
- }
-
- // Need to send a keepalive packet?
- if(now - pn.lastSentPacketTime() >
Node.KEEPALIVE_INTERVAL) {
- if(logMINOR)
- Logger.minor(this, "Sending
keepalive");
- // Force packet to have a sequence
number.
- Message m = DMT.createFNPVoid();
-
pn.addToLocalNodeSentMessagesToStatistic(m);
-
pn.getOutgoingMangler().processOutgoingOrRequeue(new MessageItem[]{new
MessageItem(m, null, 0, null)}, pn, true, true);
- }
+ nextActionTime = Math.min(nextActionTime,
urgentTime);
} else
// Not connected
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2008-09-18 20:37:22 UTC
(rev 22695)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2008-09-18 21:20:19 UTC
(rev 22696)
@@ -46,6 +46,7 @@
import freenet.io.comm.Message;
import freenet.io.comm.MessageFilter;
import freenet.io.comm.NotConnectedException;
+import freenet.io.comm.PacketSocketHandler;
import freenet.io.comm.Peer;
import freenet.io.comm.PeerContext;
import freenet.io.comm.PeerParseException;
@@ -4037,4 +4038,100 @@
public long getMainJarOfferedVersion() {
return offeredMainJarVersion;
}
+
+ public void maybeSendSomething(long now, Vector rpiTemp, int[]
rpiIntTemp) {
+ boolean mustSend = false;
+
+ // Any urgent notifications to send?
+ long urgentTime = getNextUrgentTime();
+ checkTrackerTimeout();
+
+ // Any packets to resend?
+ for(int j = 0; j < 2; j++) {
+ KeyTracker kt;
+ if(j == 0)
+ kt = getCurrentKeyTracker();
+ else if(j == 1)
+ kt = getPreviousKeyTracker();
+ else
+ break; // impossible
+ if(kt == null)
+ continue;
+ int[] tmp = kt.grabResendPackets(rpiTemp, rpiIntTemp);
+ if(tmp == null)
+ continue;
+ rpiIntTemp = tmp;
+ for(int k = 0; k < rpiTemp.size(); k++) {
+ ResendPacketItem item = (ResendPacketItem)
rpiTemp.get(k);
+ if(item == null)
+ continue;
+ try {
+ if(logMINOR)
+ Logger.minor(this, "Resending "
+ item.packetNumber + " to " + item.kt);
+ getOutgoingMangler().resend(item);
+ mustSend = false;
+ } catch(KeyChangedException e) {
+ Logger.error(this, "Caught " + e + "
resending packets to " + kt);
+ requeueResendItems(rpiTemp);
+ break;
+ } catch(NotConnectedException e) {
+ Logger.normal(this, "Caught " + e + "
resending packets to " + kt);
+ requeueResendItems(rpiTemp);
+ break;
+ } catch(PacketSequenceException e) {
+ Logger.error(this, "Caught " + e + " -
disconnecting", e);
+ // PSE is fairly drastic, something is
broken between us, but maybe we can resync
+ forceDisconnect(false);
+ } catch(WouldBlockException e) {
+ Logger.error(this, "Impossible: " + e,
e);
+ }
+ }
+
+ }
+
+ // Any messages to send?
+ MessageItem[] messages = null;
+ messages = grabQueuedMessageItems();
+ if((messages != null) && (messages.length > 0)) {
+ long l = Long.MAX_VALUE;
+ int sz =
getOutgoingMangler().fullHeadersLengthOneMessage(); // includes UDP headers
+ for(int j = 0; j < messages.length; j++) {
+ if(l > messages[j].submitted)
+ l = messages[j].submitted;
+ sz += 2 + /* FIXME only 2? */
messages[j].getData(this).length;
+ }
+ if(node.enablePacketCoalescing && (l +
PacketSender.MAX_COALESCING_DELAY > now) &&
+ (sz < ((PacketSocketHandler)
getSocketHandler()).getPacketSendThreshold())) {
+ // Don't send immediately
+ requeueMessageItems(messages, 0,
messages.length, true, "TrafficCoalescing");
+ } else {
+ for(int j = 0; j < messages.length; j++)
+ if(logMINOR)
+ Logger.minor(this, "PS Sending:
" + (messages[j].msg == null ? "(not a Message)" :
messages[j].msg.getSpec().getName())+" to "+this);
+ // Send packets, right now, blocking, including
any active notifications
+
getOutgoingMangler().processOutgoingOrRequeue(messages, this, true, false);
+ return;
+ }
+ }
+
+ if(mustSend)
+ // Send them
+
+ try {
+ sendAnyUrgentNotifications(false);
+ } catch(PacketSequenceException e) {
+ Logger.error(this, "Caught " + e + " - while
sending urgent notifications : disconnecting", e);
+ forceDisconnect(false);
+ }
+
+ // Need to send a keepalive packet?
+ if(now - lastSentPacketTime() > Node.KEEPALIVE_INTERVAL) {
+ if(logMINOR)
+ Logger.minor(this, "Sending keepalive");
+ // Force packet to have a sequence number.
+ Message m = DMT.createFNPVoid();
+ addToLocalNodeSentMessagesToStatistic(m);
+ getOutgoingMangler().processOutgoingOrRequeue(new
MessageItem[]{new MessageItem(m, null, 0, null)}, this, true, true);
+ }
+ }
}