Author: nextgens
Date: 2007-12-01 13:45:01 +0000 (Sat, 01 Dec 2007)
New Revision: 16178
Modified:
trunk/freenet/src/freenet/node/PacketSender.java
Log:
indent
Modified: trunk/freenet/src/freenet/node/PacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PacketSender.java 2007-12-01 13:43:35 UTC (rev 16177)
+++ trunk/freenet/src/freenet/node/PacketSender.java 2007-12-01 13:45:01 UTC (rev 16178)
@@ -29,446 +29,447 @@
private static boolean logMINOR;
private static boolean logDEBUG;
-
/** Maximum time we will queue a message for in milliseconds */
static final int MAX_COALESCING_DELAY = 100;
-
/** If opennet is enabled, and there are fewer than this many connections,
* we MAY attempt to contact old opennet peers (opennet peers we have
* dropped from the routing table but kept around in case we can't connect). */
static final int MIN_CONNECTIONS_TRY_OLD_OPENNET_PEERS = 5;
-
/** We send connect attempts to old-opennet-peers no more than once every
* this many milliseconds. */
- static final int MIN_OLD_OPENNET_CONNECT_DELAY_NO_CONNS = 10*1000;
-
+ static final int MIN_OLD_OPENNET_CONNECT_DELAY_NO_CONNS = 10 * 1000;
/** We send connect attempts to old-opennet-peers no more than once every
* this many milliseconds. */
- static final int MIN_OLD_OPENNET_CONNECT_DELAY = 60*1000;
-
- final LinkedList resendPackets;
- /** ~= Ticker :) */
- private final TreeMap timedJobsByTime;
- final Thread myThread;
- final Node node;
- NodeStats stats;
- long lastClearedOldSwapChains;
- long lastReportedNoPackets;
- long lastReceivedPacketFromAnyNode;
- /** For watchdog. 32-bit to avoid locking. */
- volatile int lastTimeInSeconds;
- private long timeLastSentOldOpennetConnectAttempt;
-
- private Vector rpiTemp;
- private int[] rpiIntTemp;
-
- PacketSender(Node node) {
- resendPackets = new LinkedList();
- timedJobsByTime = new TreeMap();
- this.node = node;
- myThread = new Thread(this, "PacketSender thread for
"+node.getDarknetPortNumber());
- myThread.setDaemon(true);
- myThread.setPriority(Thread.MAX_PRIORITY);
- logMINOR = Logger.shouldLog(Logger.MINOR, this);
- logDEBUG = Logger.shouldLog(Logger.DEBUG, this);
- rpiTemp = new Vector();
- rpiIntTemp = new int[64];
- }
+ static final int MIN_OLD_OPENNET_CONNECT_DELAY = 60 * 1000;
+ final LinkedList resendPackets;
+ /** ~= Ticker :) */
+ private final TreeMap timedJobsByTime;
+ final Thread myThread;
+ final Node node;
+ NodeStats stats;
+ long lastClearedOldSwapChains;
+ long lastReportedNoPackets;
+ long lastReceivedPacketFromAnyNode;
+ /** For watchdog. 32-bit to avoid locking. */
+ volatile int lastTimeInSeconds;
+ private long timeLastSentOldOpennetConnectAttempt;
+ private Vector rpiTemp;
+ private int[] rpiIntTemp;
-
- /**
- * The main purpose of this thread is to detect the lost-lock deadlocks
that happen occasionally
- * on Sun VMs with NPTL enabled, and restart the node.
- *
- * Consequently it MUST NOT LOCK ANYTHING. That further means it must not
use the Logger, and even
- * System.err/System.out if they have been redirected.
- * @author root
- *
- */
- private class Watchdog implements Runnable {
-
- public void run() {
- freenet.support.Logger.OSThread.logPID(this);
- // Do not lock anything, or we may be caught up with a
lost-lock deadlock.
- while(true) {
- try {
+ PacketSender(Node node) {
+ resendPackets = new LinkedList();
+ timedJobsByTime = new TreeMap();
+ this.node = node;
+ myThread = new Thread(this, "PacketSender thread for " +
node.getDarknetPortNumber());
+ myThread.setDaemon(true);
+ myThread.setPriority(Thread.MAX_PRIORITY);
+ logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ logDEBUG = Logger.shouldLog(Logger.DEBUG, this);
+ rpiTemp = new Vector();
+ rpiIntTemp = new int[64];
+ }
+
+ /**
+ * The main purpose of this thread is to detect the lost-lock deadlocks
that happen occasionally
+ * on Sun VMs with NPTL enabled, and restart the node.
+ *
+ * Consequently it MUST NOT LOCK ANYTHING. That further means it must
not use the Logger, and even
+ * System.err/System.out if they have been redirected.
+ * @author root
+ *
+ */
+ private class Watchdog implements Runnable {
+
+ public void run() {
+ freenet.support.Logger.OSThread.logPID(this);
+ // Do not lock anything, or we may be caught up with a
lost-lock deadlock.
+ while(true) {
+ try {
Thread.sleep(5000);
- } catch (InterruptedException e) {
- // Ignore
+ } catch(InterruptedException e) {
+ // Ignore
}
long now = System.currentTimeMillis();
- long recordedTime = ((long)lastTimeInSeconds) *
1000;
+ long recordedTime = ((long) lastTimeInSeconds)
* 1000;
long diff = now - recordedTime;
- if((diff > 3*60*1000) && node.isHasStarted()) {
+ if((diff > 3 * 60 * 1000) &&
node.isHasStarted()) {
FileLoggerHook flh =
Node.logConfigHandler.getFileLoggerHook();
boolean redirected = flh != null &&
!flh.hasRedirectedStdOutErrNoLock();
if(!redirected)
- System.err.println("Restarting
node: PacketSender froze for 3 minutes! ("+diff+ ')');
-
+ System.err.println("Restarting
node: PacketSender froze for 3 minutes! (" + diff + ')');
+
try {
- if(node.isUsingWrapper()){
+ if(node.isUsingWrapper()) {
WrapperManager.requestThreadDump();
WrapperManager.restart();
- }else{
+ } else {
if(!redirected)
System.err.println("Exiting on deadlock, but not running in the wrapper! Please
restart the node manually.");
-
+
// No wrapper : we
don't want to let it harm the network!
node.exit("PacketSender
deadlock");
}
- } catch (Throwable t) {
+ } catch(Throwable t) {
if(!Node.logConfigHandler.getFileLoggerHook().hasRedirectedStdOutErrNoLock()) {
System.err.println("Error : can't restart the node : consider installing the
wrapper. PLEASE REPORT THAT ERROR TO devl at freenetproject.org");
t.printStackTrace();
}
node.exit("PacketSender
deadlock and error");
}
-
+
}
-
- }
- }
- }
-
- void start(NodeStats stats) {
- this.stats = stats;
- Logger.normal(this, "Starting PacketSender");
- System.out.println("Starting PacketSender");
- long now = System.currentTimeMillis();
- long transition = Version.transitionTime;
- if(now < transition) {
- queueTimedJob(new Runnable() {
- public void run() {
-
freenet.support.Logger.OSThread.logPID(this);
- PeerNode[] nodes = node.peers.myPeers;
- for(int i=0;i<nodes.length;i++) {
- PeerNode pn = nodes[i];
- pn.updateShouldDisconnectNow();
- }
- }
- }, transition - now);
- }
- lastTimeInSeconds = (int) (now / 1000);
- if(!node.disableHangCheckers) {
- // Necessary because of sun JVM bugs when NPTL is enabled.
Write once, debug everywhere!
- Thread t1 = new Thread(new Watchdog(), "PacketSender watchdog");
- t1.setDaemon(true);
- t1.start();
- }
- myThread.start();
- }
-
- public void run() {
- freenet.support.Logger.OSThread.logPID(this);
- while(true) {
- lastReceivedPacketFromAnyNode = lastReportedNoPackets;
- try {
- logMINOR = Logger.shouldLog(Logger.MINOR, this);
- realRun();
- } catch (OutOfMemoryError e) {
+
+ }
+ }
+ }
+
+ void start(NodeStats stats) {
+ this.stats = stats;
+ Logger.normal(this, "Starting PacketSender");
+ System.out.println("Starting PacketSender");
+ long now = System.currentTimeMillis();
+ long transition = Version.transitionTime;
+ if(now < transition)
+ queueTimedJob(new Runnable() {
+
+ public void run() {
+
freenet.support.Logger.OSThread.logPID(this);
+ PeerNode[] nodes =
node.peers.myPeers;
+ for(int i = 0; i <
nodes.length; i++) {
+ PeerNode pn = nodes[i];
+
pn.updateShouldDisconnectNow();
+ }
+ }
+ }, transition - now);
+ lastTimeInSeconds = (int) (now / 1000);
+ if(!node.disableHangCheckers) {
+ // Necessary because of sun JVM bugs when NPTL is
enabled. Write once, debug everywhere!
+ Thread t1 = new Thread(new Watchdog(), "PacketSender
watchdog");
+ t1.setDaemon(true);
+ t1.start();
+ }
+ myThread.start();
+ }
+
+ public void run() {
+ freenet.support.Logger.OSThread.logPID(this);
+ while(true) {
+ lastReceivedPacketFromAnyNode = lastReportedNoPackets;
+ try {
+ logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ realRun();
+ } catch(OutOfMemoryError e) {
OOMHandler.handleOOM(e);
System.err.println("Will retry above failed
operation...");
- } catch (Throwable t) {
- Logger.error(this, "Caught in PacketSender: "+t, t);
- System.err.println("Caught in PacketSender: "+t);
- t.printStackTrace();
- }
- }
- }
+ } catch(Throwable t) {
+ Logger.error(this, "Caught in PacketSender: " +
t, t);
+ System.err.println("Caught in PacketSender: " +
t);
+ t.printStackTrace();
+ }
+ }
+ }
- private void realRun() {
- long now = System.currentTimeMillis();
- lastTimeInSeconds = (int) (now / 1000);
- PeerManager pm = node.peers;
- PeerNode[] nodes = pm.myPeers;
- // Run the time sensitive status updater separately
- for(int i = 0; i < nodes.length; i++) {
- PeerNode pn = nodes[i];
- // Only routing backed off nodes should need status updating since
everything else
- // should get updated immediately when it's changed
- if(pn.getPeerNodeStatus() ==
PeerManager.PEER_NODE_STATUS_ROUTING_BACKED_OFF) {
- pn.setPeerNodeStatus(now);
- }
- }
- pm.maybeLogPeerNodeStatusSummary(now);
- pm.maybeUpdateOldestNeverConnectedPeerAge(now);
- stats.maybeUpdatePeerManagerUserAlertStats(now);
- stats.maybeUpdateNodeIOStats(now);
- pm.maybeUpdatePeerNodeRoutableConnectionStats(now);
- long nextActionTime = Long.MAX_VALUE;
- long oldTempNow = now;
- for(int i=0;i<nodes.length;i++) {
- PeerNode pn = nodes[i];
- lastReceivedPacketFromAnyNode =
- Math.max(pn.lastReceivedPacketTime(),
lastReceivedPacketFromAnyNode);
- pn.maybeOnConnect();
- if(pn.isConnected()) {
- // Is the node dead?
- if(now - pn.lastReceivedPacketTime() >
pn.maxTimeBetweenReceivedPackets()) {
- Logger.normal(this, "Disconnecting from "+pn+" -
haven't received packets recently");
- pn.disconnected();
- continue;
- } else if(pn.isRoutable() && pn.noLongerRoutable()) {
- // we don't disconnect but we mark it incompatible
- pn.invalidate();
- pn.setPeerNodeStatus(now);
- Logger.normal(this, "shouldDisconnectNow has returned
true : marking the peer as incompatible");
- continue;
- }
-
- boolean mustSend = false;
-
- // Any urgent notifications to send?
- long urgentTime = pn.getNextUrgentTime();
- // Should spam the logs, unless there is a deadlock
- if(urgentTime < Long.MAX_VALUE && logMINOR)
- Logger.minor(this, "Next urgent time: "+urgentTime+"
for "+pn.getPeer());
- if(urgentTime <= now) {
- mustSend = true;
- } else {
- nextActionTime = Math.min(nextActionTime, urgentTime);
- }
-
- // Any packets to resend?
- for(int j=0;j<2;j++) {
- KeyTracker kt;
- if(j == 0) kt = pn.getCurrentKeyTracker();
- else if(j == 1) kt = pn.getPreviousKeyTracker();
- else break; // impossible
- if(kt == null) continue;
- int[] tmp = kt.grabResendPackets(rpiTemp, rpiIntTemp);
- if(tmp == null) continue;
- rpiIntTemp = tmp;
- for(int k=0;k<rpiTemp.size();k++) {
- ResendPacketItem item = (ResendPacketItem)
rpiTemp.get(k);
- if(item == null) continue;
- try {
- if(logMINOR) Logger.minor(this, "Resending
"+item.packetNumber+" to "+item.kt);
- pn.getOutgoingMangler().resend(item);
- mustSend = false;
- } catch (KeyChangedException e) {
- Logger.error(this, "Caught "+e+" resending packets
to "+kt);
- pn.requeueResendItems(rpiTemp);
- break;
- } catch (NotConnectedException e) {
- Logger.normal(this, "Caught "+e+" resending
packets to "+kt);
- pn.requeueResendItems(rpiTemp);
- break;
- } catch (PacketSequenceException e) {
- Logger.error(this, "Caught "+e+" -
disconnecting", e);
- pn.forceDisconnect();
- } catch (WouldBlockException e)
{
- Logger.error(this,
"Impossible: "+e, e);
+ private void realRun() {
+ long now = System.currentTimeMillis();
+ lastTimeInSeconds = (int) (now / 1000);
+ PeerManager pm = node.peers;
+ PeerNode[] nodes = pm.myPeers;
+ // Run the time sensitive status updater separately
+ for(int i = 0; i < nodes.length; i++) {
+ PeerNode pn = nodes[i];
+ // Only routing backed off nodes should need status
updating since everything else
+ // should get updated immediately when it's changed
+ if(pn.getPeerNodeStatus() ==
PeerManager.PEER_NODE_STATUS_ROUTING_BACKED_OFF)
+ pn.setPeerNodeStatus(now);
+ }
+ pm.maybeLogPeerNodeStatusSummary(now);
+ pm.maybeUpdateOldestNeverConnectedPeerAge(now);
+ stats.maybeUpdatePeerManagerUserAlertStats(now);
+ stats.maybeUpdateNodeIOStats(now);
+ pm.maybeUpdatePeerNodeRoutableConnectionStats(now);
+ long nextActionTime = Long.MAX_VALUE;
+ long oldTempNow = now;
+ for(int i = 0; i < nodes.length; i++) {
+ PeerNode pn = nodes[i];
+ lastReceivedPacketFromAnyNode =
+ Math.max(pn.lastReceivedPacketTime(),
lastReceivedPacketFromAnyNode);
+ pn.maybeOnConnect();
+ if(pn.isConnected()) {
+ // Is the node dead?
+ if(now - pn.lastReceivedPacketTime() >
pn.maxTimeBetweenReceivedPackets()) {
+ Logger.normal(this, "Disconnecting from
" + pn + " - haven't received packets recently");
+ pn.disconnected();
+ continue;
+ } else if(pn.isRoutable() &&
pn.noLongerRoutable()) {
+ // we don't disconnect but we mark it
incompatible
+ pn.invalidate();
+ pn.setPeerNodeStatus(now);
+ Logger.normal(this,
"shouldDisconnectNow has returned true : marking the peer as incompatible");
+ continue;
+ }
+
+ boolean mustSend = false;
+
+ // Any urgent notifications to send?
+ long urgentTime = pn.getNextUrgentTime();
+ // Should spam the logs, unless there is a
deadlock
+ if(urgentTime < Long.MAX_VALUE && logMINOR)
+ Logger.minor(this, "Next urgent time: "
+ urgentTime + " for " + pn.getPeer());
+ if(urgentTime <= now)
+ mustSend = true;
+ else
+ nextActionTime =
Math.min(nextActionTime, urgentTime);
+
+ // Any packets to resend?
+ for(int j = 0; j < 2; j++) {
+ KeyTracker kt;
+ if(j == 0)
+ kt = pn.getCurrentKeyTracker();
+ else if(j == 1)
+ kt = pn.getPreviousKeyTracker();
+ else
+ break; // impossible
+ if(kt == null)
+ continue;
+ int[] tmp =
kt.grabResendPackets(rpiTemp, rpiIntTemp);
+ if(tmp == null)
+ continue;
+ rpiIntTemp = tmp;
+ for(int k = 0; k < rpiTemp.size(); k++)
{
+ ResendPacketItem item =
(ResendPacketItem) rpiTemp.get(k);
+ if(item == null)
+ continue;
+ try {
+ if(logMINOR)
+
Logger.minor(this, "Resending " + item.packetNumber + " to " + item.kt);
+
pn.getOutgoingMangler().resend(item);
+ mustSend = false;
+ } catch(KeyChangedException e) {
+ Logger.error(this,
"Caught " + e + " resending packets to " + kt);
+
pn.requeueResendItems(rpiTemp);
+ break;
+ } catch(NotConnectedException
e) {
+ Logger.normal(this,
"Caught " + e + " resending packets to " + kt);
+
pn.requeueResendItems(rpiTemp);
+ break;
+ } catch(PacketSequenceException
e) {
+ Logger.error(this,
"Caught " + e + " - disconnecting", e);
+ pn.forceDisconnect();
+ } catch(WouldBlockException e) {
+ Logger.error(this,
"Impossible: " + e, e);
}
- }
-
- }
+ }
- // Any messages to send?
- MessageItem[] messages = null;
- messages = pn.grabQueuedMessageItems();
- if((messages != null) && (messages.length > 0)) {
- long l = Long.MAX_VALUE;
- int sz =
pn.getOutgoingMangler().fullHeadersLengthOneMessage(); // includes UDP headers
- for(int j=0;j<messages.length;j++) {
- if(l > messages[j].submitted) l =
messages[j].submitted;
- sz += 2 + /* FIXME only 2? */
messages[j].getData(pn).length;
- }
- if((l + MAX_COALESCING_DELAY > now) &&
- (sz < ((PacketSocketHandler)
pn.getSocketHandler()).getPacketSendThreshold())) {
- // Don't send immediately
- if(nextActionTime > (l+MAX_COALESCING_DELAY))
- nextActionTime = l+MAX_COALESCING_DELAY;
- pn.requeueMessageItems(messages, 0,
messages.length, true, "TrafficCoalescing");
- } else {
- for(int j=0;j<messages.length;j++) {
- if(logMINOR) Logger.minor(this, "PS
Sending: "+(messages[j].msg == null ? "(not a Message)" :
messages[j].msg.getSpec().getName()));
- }
- // Send packets, right now, blocking, including
any active notifications
-
pn.getOutgoingMangler().processOutgoingOrRequeue(messages, pn, true, false);
- continue;
- }
- }
-
- if(mustSend) {
- // Send them
- try {
+ }
+
+ // Any messages to send?
+ MessageItem[] messages = null;
+ messages = pn.grabQueuedMessageItems();
+ if((messages != null) && (messages.length > 0))
{
+ long l = Long.MAX_VALUE;
+ int sz =
pn.getOutgoingMangler().fullHeadersLengthOneMessage(); // includes UDP headers
+ for(int j = 0; j < messages.length;
j++) {
+ if(l > messages[j].submitted)
+ l =
messages[j].submitted;
+ sz += 2 + /* FIXME only 2? */
messages[j].getData(pn).length;
+ }
+ if((l + MAX_COALESCING_DELAY > now) &&
+ (sz < ((PacketSocketHandler)
pn.getSocketHandler()).getPacketSendThreshold())) {
+ // Don't send immediately
+ if(nextActionTime > (l +
MAX_COALESCING_DELAY))
+ nextActionTime = l +
MAX_COALESCING_DELAY;
+
pn.requeueMessageItems(messages, 0, messages.length, true, "TrafficCoalescing");
+ } else {
+ for(int j = 0; j <
messages.length; j++)
+ if(logMINOR)
+
Logger.minor(this, "PS Sending: " + (messages[j].msg == null ? "(not a
Message)" : messages[j].msg.getSpec().getName()));
+ // Send packets, right now,
blocking, including any active notifications
+
pn.getOutgoingMangler().processOutgoingOrRequeue(messages, pn, true, false);
+ continue;
+ }
+ }
+
+ if(mustSend)
+ // Send them
+
+ try {
pn.sendAnyUrgentNotifications();
- } catch (PacketSequenceException e) {
- Logger.error(this, "Caught "+e+" - while sending urgent
notifications : disconnecting", e);
- pn.forceDisconnect();
+ } catch(PacketSequenceException e) {
+ Logger.error(this, "Caught " +
e + " - while sending urgent notifications : disconnecting", e);
+ pn.forceDisconnect();
}
- }
-
- // Need to send a keepalive packet?
- if(now - pn.lastSentPacketTime() > Node.KEEPALIVE_INTERVAL) {
- if(logMINOR) Logger.minor(this, "Sending keepalive");
- // Force packet to have a sequence number.
- Message m = DMT.createFNPVoid();
- pn.addToLocalNodeSentMessagesToStatistic(m);
- pn.getOutgoingMangler().processOutgoingOrRequeue(new
MessageItem[] { new MessageItem(m, null, 0, null) }, pn, true, true);
- }
- } else {
- // Not connected
- if(pn.noContactDetails())
- pn.startARKFetcher();
- }
- if(pn.shouldSendHandshake()) {
- // Send handshake if necessary
- long beforeHandshakeTime = System.currentTimeMillis();
- pn.getOutgoingMangler().sendHandshake(pn);
- long afterHandshakeTime = System.currentTimeMillis();
- if((afterHandshakeTime - beforeHandshakeTime) > (2*1000))
- Logger.error(this, "afterHandshakeTime is more than 2
seconds past beforeHandshakeTime ("+(afterHandshakeTime -
beforeHandshakeTime)+") in PacketSender working with "+pn.userToString());
- }
- long tempNow = System.currentTimeMillis();
- if((tempNow - oldTempNow) > (5 * 1000))
- Logger.error(this, "tempNow is more than 5 seconds past
oldTempNow (" + (tempNow - oldTempNow) + ") in PacketSender working with " +
pn.userToString());
- oldTempNow = tempNow;
- }
-
- // Consider sending connect requests to our opennet old-peers.
- // No point if they are NATed, of course... but we don't know whether
they are.
- OpennetManager om = node.getOpennet();
- if(om != null) {
- int connCount = node.peers.quickCountConnectedPeers();
- int minDelay = connCount == 0 ?
MIN_OLD_OPENNET_CONNECT_DELAY_NO_CONNS : MIN_OLD_OPENNET_CONNECT_DELAY;
- if(logDEBUG)
- Logger.debug(this, "Conns "+connCount+" minDelay
"+minDelay+" old opennet peers "+om.countOldOpennetPeers()+" last sent "+(now -
timeLastSentOldOpennetConnectAttempt)+" startup "+(now - node.startupTime));
- if(now - timeLastSentOldOpennetConnectAttempt > minDelay &&
- connCount <=
MIN_CONNECTIONS_TRY_OLD_OPENNET_PEERS &&
- om.countOldOpennetPeers() > 0 &&
- now - node.startupTime >
OpennetManager.DROP_STARTUP_DELAY) {
- PeerNode pn = om.randomOldOpennetNode();
- if(pn != null) {
- if(logMINOR)
- Logger.minor(this, "Sending old-opennet connect
attempt to "+pn);
- pn.getOutgoingMangler().sendHandshake(pn);
- timeLastSentOldOpennetConnectAttempt = now;
- if(pn.noContactDetails() && node.getPeerNodes().length
> 0 && connCount > 0 && node.random.nextBoolean())
- pn.startARKFetcher();
- }
- }
- }
-
- if(now - lastClearedOldSwapChains > 10000) {
- node.lm.clearOldSwapChains();
- lastClearedOldSwapChains = now;
- }
-
- long oldNow = System.currentTimeMillis();
- // Send may have taken some time
- now = System.currentTimeMillis();
- lastTimeInSeconds = (int) (now / 1000);
-
- if((now - oldNow) > (10*1000))
- Logger.error(this, "now is more than 10 seconds past oldNow
("+(now - oldNow)+") in PacketSender");
-
- Vector jobsToRun = null;
-
- synchronized(timedJobsByTime) {
- while(!timedJobsByTime.isEmpty()) {
- Long tRun = (Long) timedJobsByTime.firstKey();
- if(tRun.longValue() <= now) {
- if(jobsToRun == null) jobsToRun = new
Vector();
- Object o = timedJobsByTime.remove(tRun);
- if(o instanceof Runnable[]) {
- Runnable[] r = (Runnable[]) o;
- for(int i=0;i<r.length;i++)
- jobsToRun.add(r[i]);
- } else {
- Runnable r = (Runnable) o;
- jobsToRun.add(r);
- }
- } else {
- // FIXME how accurately do we want
ticker jobs to be scheduled?
+ // Need to send a keepalive packet?
+ if(now - pn.lastSentPacketTime() >
Node.KEEPALIVE_INTERVAL) {
+ if(logMINOR)
+ Logger.minor(this, "Sending
keepalive");
+ // Force packet to have a sequence
number.
+ Message m = DMT.createFNPVoid();
+
pn.addToLocalNodeSentMessagesToStatistic(m);
+
pn.getOutgoingMangler().processOutgoingOrRequeue(new MessageItem[]{new
MessageItem(m, null, 0, null)}, pn, true, true);
+ }
+ } else
+ // Not connected
+
+ if(pn.noContactDetails())
+ pn.startARKFetcher();
+ if(pn.shouldSendHandshake()) {
+ // Send handshake if necessary
+ long beforeHandshakeTime =
System.currentTimeMillis();
+ pn.getOutgoingMangler().sendHandshake(pn);
+ long afterHandshakeTime =
System.currentTimeMillis();
+ if((afterHandshakeTime - beforeHandshakeTime) >
(2 * 1000))
+ Logger.error(this, "afterHandshakeTime
is more than 2 seconds past beforeHandshakeTime (" + (afterHandshakeTime -
beforeHandshakeTime) + ") in PacketSender working with " + pn.userToString());
+ }
+ long tempNow = System.currentTimeMillis();
+ if((tempNow - oldTempNow) > (5 * 1000))
+ Logger.error(this, "tempNow is more than 5
seconds past oldTempNow (" + (tempNow - oldTempNow) + ") in PacketSender
working with " + pn.userToString());
+ oldTempNow = tempNow;
+ }
+
+ // Consider sending connect requests to our opennet old-peers.
+ // No point if they are NATed, of course... but we don't know
whether they are.
+ OpennetManager om = node.getOpennet();
+ if(om != null) {
+ int connCount = node.peers.quickCountConnectedPeers();
+ int minDelay = connCount == 0 ?
MIN_OLD_OPENNET_CONNECT_DELAY_NO_CONNS : MIN_OLD_OPENNET_CONNECT_DELAY;
+ if(logDEBUG)
+ Logger.debug(this, "Conns " + connCount + "
minDelay " + minDelay + " old opennet peers " + om.countOldOpennetPeers() + "
last sent " + (now - timeLastSentOldOpennetConnectAttempt) + " startup " + (now
- node.startupTime));
+ if(now - timeLastSentOldOpennetConnectAttempt >
minDelay &&
+ connCount <=
MIN_CONNECTIONS_TRY_OLD_OPENNET_PEERS &&
+ om.countOldOpennetPeers() > 0 &&
+ now - node.startupTime >
OpennetManager.DROP_STARTUP_DELAY) {
+ PeerNode pn = om.randomOldOpennetNode();
+ if(pn != null) {
+ if(logMINOR)
+ Logger.minor(this, "Sending
old-opennet connect attempt to " + pn);
+
pn.getOutgoingMangler().sendHandshake(pn);
+ timeLastSentOldOpennetConnectAttempt =
now;
+ if(pn.noContactDetails() &&
node.getPeerNodes().length > 0 && connCount > 0 && node.random.nextBoolean())
+ pn.startARKFetcher();
+ }
+ }
+ }
+
+ if(now - lastClearedOldSwapChains > 10000) {
+ node.lm.clearOldSwapChains();
+ lastClearedOldSwapChains = now;
+ }
+
+ long oldNow = System.currentTimeMillis();
+
+ // Send may have taken some time
+ now = System.currentTimeMillis();
+ lastTimeInSeconds = (int) (now / 1000);
+
+ if((now - oldNow) > (10 * 1000))
+ Logger.error(this, "now is more than 10 seconds past
oldNow (" + (now - oldNow) + ") in PacketSender");
+
+ Vector jobsToRun = null;
+
+ synchronized(timedJobsByTime) {
+ while(!timedJobsByTime.isEmpty()) {
+ Long tRun = (Long) timedJobsByTime.firstKey();
+ if(tRun.longValue() <= now) {
+ if(jobsToRun == null)
+ jobsToRun = new Vector();
+ Object o = timedJobsByTime.remove(tRun);
+ if(o instanceof Runnable[]) {
+ Runnable[] r = (Runnable[]) o;
+ for(int i = 0; i < r.length;
i++)
+ jobsToRun.add(r[i]);
+ } else {
+ Runnable r = (Runnable) o;
+ jobsToRun.add(r);
+ }
+ } else
+ // FIXME how accurately do we want
ticker jobs to be scheduled?
// FIXME can they wait the odd 200ms?
- break;
- }
- }
- }
- if(jobsToRun != null) {
- for(int i=0;i<jobsToRun.size();i++) {
- Runnable r = (Runnable) jobsToRun.get(i);
- if(logMINOR) Logger.minor(this, "Running "+r);
- if(r instanceof FastRunnable) {
- // Run in-line
- try {
- r.run();
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t+"
running "+r, t);
- }
- } else {
- try {
- node.executor.execute(r, "Scheduled
job: "+r);
- } catch (OutOfMemoryError e) {
+ break;
+ }
+ }
+
+ if(jobsToRun != null)
+ for(int i = 0; i < jobsToRun.size(); i++) {
+ Runnable r = (Runnable) jobsToRun.get(i);
+ if(logMINOR)
+ Logger.minor(this, "Running " + r);
+ if(r instanceof FastRunnable)
+ // Run in-line
+
+ try {
+ r.run();
+ } catch(Throwable t) {
+ Logger.error(this, "Caught " +
t + " running " + r, t);
+ }
+ else
+ try {
+ node.executor.execute(r,
"Scheduled job: " + r);
+ } catch(OutOfMemoryError e) {
OOMHandler.handleOOM(e);
System.err.println("Will retry
above failed operation...");
queueTimedJob(r, 200);
- } catch (Throwable t) {
- Logger.error(this, "Caught in
PacketSender: "+t, t);
- System.err.println("Caught in
PacketSender: "+t);
+ } catch(Throwable t) {
+ Logger.error(this, "Caught in
PacketSender: " + t, t);
+ System.err.println("Caught in
PacketSender: " + t);
t.printStackTrace();
}
- }
- }
- }
-
- long sleepTime = nextActionTime - now;
- // MAX_COALESCING_DELAYms maximum sleep time - same as the maximum
coalescing delay
- sleepTime = Math.min(sleepTime, MAX_COALESCING_DELAY);
-
- if(now - node.startupTime > 60*1000*5) {
- if(now - lastReceivedPacketFromAnyNode > Node.ALARM_TIME) {
- Logger.error(this, "Have not received any packets from any
node in last "+Node.ALARM_TIME/1000+" seconds");
- lastReportedNoPackets = now;
- }
- }
-
- if(sleepTime > 0) {
- // Update logging only when have time to do so
- logMINOR = Logger.shouldLog(Logger.MINOR, this);
- logDEBUG = Logger.shouldLog(Logger.DEBUG, this);
- try {
- synchronized(this) {
- if(logMINOR) Logger.minor(this, "Sleeping for
"+sleepTime);
- wait(sleepTime);
- }
- } catch (InterruptedException e) {
- // Ignore, just wake up. Probably we got interrupt()ed
- // because a new packet came in.
- }
- }
+ }
+
+ long sleepTime = nextActionTime - now;
+ // MAX_COALESCING_DELAYms maximum sleep time - same as the
maximum coalescing delay
+ sleepTime = Math.min(sleepTime, MAX_COALESCING_DELAY);
+
+ if(now - node.startupTime > 60 * 1000 * 5)
+ if(now - lastReceivedPacketFromAnyNode >
Node.ALARM_TIME) {
+ Logger.error(this, "Have not received any
packets from any node in last " + Node.ALARM_TIME / 1000 + " seconds");
+ lastReportedNoPackets = now;
+ }
+
+ if(sleepTime > 0) {
+ // Update logging only when have time to do so
+ logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ logDEBUG = Logger.shouldLog(Logger.DEBUG, this);
+ try {
+ synchronized(this) {
+ if(logMINOR)
+ Logger.minor(this, "Sleeping
for " + sleepTime);
+ wait(sleepTime);
+ }
+ } catch(InterruptedException e) {
+ // Ignore, just wake up. Probably we got interrupt()ed
+ // because a new packet came in.
+ }
+ }
}
- /** Wake up, and send any queued packets. */
+ /** Wake up, and send any queued packets. */
void wakeUp() {
- // Wake up if needed
- synchronized(this) {
- notifyAll();
- }
- }
+ // Wake up if needed
+ synchronized(this) {
+ notifyAll();
+ }
+ }
public void queueTimedJob(Runnable job, long offset) {
if(offset <= 0) {
- node.executor.execute(job, "Scheduled job: "+job);
+ node.executor.execute(job, "Scheduled job: " + job);
return;
}
long now = System.currentTimeMillis();
Long l = new Long(offset + now);
synchronized(timedJobsByTime) {
Object o = timedJobsByTime.get(l);
- if(o == null) {
+ if(o == null)
timedJobsByTime.put(l, job);
- } else if(o instanceof Runnable) {
- timedJobsByTime.put(l, new Runnable[] {
(Runnable)o, job });
- } else if(o instanceof Runnable[]) {
+ else if(o instanceof Runnable)
+ timedJobsByTime.put(l, new
Runnable[]{(Runnable) o, job});
+ else if(o instanceof Runnable[]) {
Runnable[] r = (Runnable[]) o;
- Runnable[] jobs = new Runnable[r.length+1];
+ Runnable[] jobs = new Runnable[r.length + 1];
System.arraycopy(r, 0, jobs, 0, r.length);
- jobs[jobs.length-1] = job;
+ jobs[jobs.length - 1] = job;
timedJobsByTime.put(l, jobs);
}
}