Author: robert
Date: 2008-02-08 18:31:02 +0000 (Fri, 08 Feb 2008)
New Revision: 17704
Modified:
trunk/freenet/src/freenet/node/NetworkIDManager.java
Log:
initial SecretPinger implementation (untested / disabled)
Modified: trunk/freenet/src/freenet/node/NetworkIDManager.java
===================================================================
--- trunk/freenet/src/freenet/node/NetworkIDManager.java	2008-02-08 18:03:29 UTC (rev 17703)
+++ trunk/freenet/src/freenet/node/NetworkIDManager.java	2008-02-08 18:31:02 UTC (rev 17704)
@@ -1,8 +1,11 @@
package freenet.node;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import freenet.io.comm.DMT;
import freenet.io.comm.DisconnectedException;
@@ -11,6 +14,8 @@
import freenet.io.comm.NotConnectedException;
import freenet.support.Logger;
+import freenet.support.math.BootstrappingDecayingRunningAverage;
+import freenet.support.math.RunningAverage;
/**
* Handles the processing of challenge/response pings as well as the storage of the secrets pertaining thereto.
@@ -18,21 +23,48 @@
* @author robert
* @created 2008-02-06
*/
-public class NetworkIDManager {
+public class NetworkIDManager implements Runnable {
public static boolean disableSecretPings=true;
+ public static boolean disableSecretPinger=true;
- private static final int SECRETPONG_TIMEOUT=10000;
+ private static final int ACCEPTED_TIMEOUT = 5000;
+ private static final int SECRETPONG_TIMEOUT = 20000;
+
+ //Intervals between connectivity checks and NetworkID reckoning.
+ //Checks for added peers may be delayed up to LONG_PERIOD, so don't make it too long.
+ private static final long BETWEEN_PEERS = 2000;
+ private static final long STARTUP_DELAY = 30000;
+ private static final long LONG_PERIOD = 120000;
+
+ private final short MAX_HTL;
+ private final short MIN_HTL=3;
private final boolean logMINOR;
+ private static final int NO_NETWORKID = 0;
+
+ //The number of pings, etc. beyond which the data is considered sane enough to start experimenting from.
+ private static final int COMFORT_LEVEL=20;
+
//Atomic: Locking for both via secretsByPeer
private final HashMap secretsByPeer=new HashMap();
private final HashMap secretsByUID=new HashMap();
private final Node node;
+ private int startupChecks;
- NetworkIDManager(Node node) {
+ NetworkIDManager(final Node node) {
this.node=node;
+ this.MAX_HTL=node.maxHTL();
this.logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ if (!disableSecretPinger) {
+ node.getTicker().queueTimedJob(new Runnable() {
+ public void run() {
+ checkAllPeers();
+ startupChecks = node.peers.quickCountConnectedPeers();
+ reschedule(0);
+ }
+ }, STARTUP_DELAY);
+ }
}
/**
@@ -229,4 +261,326 @@
return DMT.createFNPSecretPong(uid, counter, secret);
}
}
+
+ private final class PingRecord {
+ PeerNode target;
+ PeerNode via;
+ long lastSuccess=-1;
+ long lastTry=-1;
+ int shortestSuccess=Integer.MAX_VALUE;
+ RunningAverage average=new BootstrappingDecayingRunningAverage(0.0, 0.0, 1.0, 200, null);
+ RunningAverage sHtl=new BootstrappingDecayingRunningAverage(MAX_HTL, 0.0, MAX_HTL, 200, null);
+ RunningAverage fHtl=new BootstrappingDecayingRunningAverage(MAX_HTL, 0.0, MAX_HTL, 200, null);
+ RunningAverage sDawn=new BootstrappingDecayingRunningAverage(0.0, 0.0, MAX_HTL, 200, null);
+ RunningAverage fDawn=new BootstrappingDecayingRunningAverage(0.0, 0.0, MAX_HTL, 200, null);
+ public String toString() {
+ return "percent="+average.currentValue();
+ }
+ public void success(int counter, short htl, short dawn) {
+ long now=System.currentTimeMillis();
+ lastTry=now;
+ lastSuccess=now;
+ average.report(1.0);
+ if (counter < shortestSuccess)
+ shortestSuccess=counter;
+ dawn=(short)(htl-dawn);
+ sHtl.report(htl);
+ sDawn.report(dawn);
+ }
+ public void failure(int counter, short htl, short dawn) {
+ long now=System.currentTimeMillis();
+ lastTry=now;
+ average.report(0.0);
+ dawn=(short)(htl-dawn);
+ fHtl.report(htl);
+ fDawn.report(dawn);
+ }
+ /**
+ * Written to start high and slowly lower the htl while the success rate stays above 80%.
+ */
+ public short getNextHtl() {
+ if (sHtl.countReports()<COMFORT_LEVEL) {
+ return MAX_HTL;
+ } else if (average.currentValue()>0.8) {
+ //looking good, try lower htl
+ short htl=(short)(sHtl.currentValue()-0.5);
+ if (htl<MIN_HTL)
+ htl=MIN_HTL;
+ return htl;
+ } else {
+ //not so good, try higher htl
+ short htl=(short)(sHtl.currentValue()+0.5);
+ if (htl>MAX_HTL)
+ htl=MAX_HTL;
+ return htl;
+ }
+ }
+ /**
+ * Written to start with 2 random hops, and slowly expand if there are too many failures.
+ * Will not use more than 1/2 the hops. For good connections, it should always be 2.
+ */
+ public short getNextDawnHtl(short htl) {
+ //the number of random hops (htl-dawn)
+ short diff;
+ short max=(short)(htl/2-1);
+ if (fDawn.countReports()<COMFORT_LEVEL) {
+ diff=2;
+ } else if (sDawn.countReports()<COMFORT_LEVEL) {
+ //We've had enough failures, not successes
+ diff=(short)(fDawn.currentValue()+0.5);
+ } else {
+ //Just a different algorithm than getNextHtl() so that they both might stabilize...
+ diff=(short)(0.25*fDawn.currentValue()+0.75*sDawn.currentValue());
+ }
+ if (diff>max)
+ diff=max;
+ return (short)(htl-diff);
+ }
+ public boolean equals(Object o) {
+ PeerNode p=(PeerNode)o;
+ return (via.equals(p));
+ }
+ public int hashCode() {
+ return via.hashCode();
+ }
+ }
+
+ //Directional lists of reachability, a "Map of Maps" of peers to pingRecords.
+ //This is asymmetric; so recordMapsByPeer.get(a).get(b) [i.e. a's reachability through peer b] may not
+ //be nearly the same as recordMapsByPeer.get(b).get(a) [i.e. b's reachability through peer a].
+ private HashMap recordMapsByPeer=new HashMap();
+
+ private PingRecord getPingRecord(PeerNode target, PeerNode via) {
+ PingRecord retval;
+ synchronized (recordMapsByPeer) {
+ HashMap peerRecords=(HashMap)recordMapsByPeer.get(target);
+ if (peerRecords==null) {
+ //no record of any pings towards target
+ peerRecords=new HashMap();
+ recordMapsByPeer.put(target, peerRecords);
+ }
+ retval=(PingRecord)peerRecords.get(via);
+ if (retval==null) {
+ //no records via this specific peer
+ retval=new PingRecord();
+ retval.target=target;
+ retval.via=via;
+ peerRecords.put(via, retval);
+ }
+ }
+ return retval;
+ }
+
+ private void forgetPingRecords(PeerNode p) {
+ synchronized (workQueue) {
+ workQueue.remove(p);
+ if (p.equals(processing)) {
+ //don't chase the thread making records, signal a fault.
+ processingRace=true;
+ return;
+ }
+ }
+ synchronized (recordMapsByPeer) {
+ recordMapsByPeer.remove(p);
+ Iterator i=recordMapsByPeer.values().iterator();
+ while (i.hasNext()) {
+ HashMap complement=(HashMap)i.next();
+ //FIXME: NB: Comparing PeerNodes with PingRecords.
+ complement.values().remove(p);
+ }
+ }
+ }
+
+ private List workQueue=new ArrayList();
+ private PeerNode processing;
+ private boolean processingRace;
+
+ private void reschedule(long period) {
+ node.getTicker().queueTimedJob(this, period);
+ }
+
+ public void run() {
+ //pick a target
+ synchronized (workQueue) {
+ if (processing!=null) {
+ Logger.error(this, "possibly *bad* programming
error, only one thread should use secretpings");
+ return;
+ }
+ if (!workQueue.isEmpty())
+ processing=(PeerNode)workQueue.remove(0);
+ }
+ if (processing!=null) {
+ PeerNode target=processing;
+ double randomTarget=node.random.nextDouble();
+ HashSet nodesRoutedTo = new HashSet();
+ PeerNode next = node.peers.closerPeer(target, nodesRoutedTo, null, randomTarget, true, false, -1, null, null);
+ while (next!=null && target.isRoutable() && !processingRace) {
+ nodesRoutedTo.add(next);
+ //the order is not that important, but for all connected peers try to ping 'target'
+ blockingUpdatePingRecord(target, next);
+ //Since we are causing traffic to 'target'
+ betweenPingSleep(target);
+ next = node.peers.closerPeer(target, nodesRoutedTo, null, randomTarget, true, false, -1, null, null);
+ }
+ }
+ boolean didAnything;
+ synchronized (workQueue) {
+ didAnything=(processing!=null);
+ //clear 'processing' in every case, so the next run is not mistaken for a second thread.
+ PeerNode target=processing;
+ processing=null;
+ //how sad... all that work may be garbage.
+ if (processingRace) {
+ processingRace=false;
+ //'processing' must already be null when we call the forget() function.
+ forgetPingRecords(target);
+ }
+ }
+ if (startupChecks>0)
+ startupChecks--;
+ else
+ doNetworkIDReckoning(didAnything);
+ synchronized (workQueue) {
+ if (workQueue.isEmpty()) {
+ //Add two random peers to check next time, or maybe... all of them?
+ PeerNode p1=node.peers.getRandomPeer();
+ PeerNode p2=node.peers.getRandomPeer(p1);
+ addWorkToLockedQueue(p1);
+ addWorkToLockedQueue(p2);
+ reschedule(LONG_PERIOD);
+ } else {
+ reschedule(BETWEEN_PEERS);
+ }
+ }
+ }
+
+ // Best-effort ping from next to target; if anything out of the ordinary happens, it counts as a failure.
+ private void blockingUpdatePingRecord(PeerNode target, PeerNode next) {
+ //make a secret & uid
+ long uid=node.random.nextLong();
+ long secret=node.random.nextLong();
+ PingRecord record=getPingRecord(target, next);
+ short htl=record.getNextHtl();
+ short dawn=record.getNextDawnHtl(htl);
+
+ boolean success=false;
+ int suppliedCounter=1;
+
+ try {
+ //store secret in target
+ target.sendSync(DMT.createFNPStoreSecret(uid, secret), null);
+
+ //Wait for an accepted or give up
+ MessageFilter mfAccepted = MessageFilter.create().setSource(target).setField(DMT.UID, uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
+ Message msg = node.usm.waitFor(mfAccepted, null);
+
+ if (msg==null || (msg.getSpec() != DMT.FNPAccepted)) {
+ //backoff?
+ Logger.error(this, "peer is unresponsive to
StoreSecret "+target);
+ return;
+ }
+
+ //next... send a secretping through next to target
+ next.sendSync(DMT.createFNPSecretPing(uid, target.getLocation(), htl, dawn, 0), null);
+
+ //wait for a response; SecretPong, RejectLoop, or timeout
+ MessageFilter mfPong = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(SECRETPONG_TIMEOUT).setType(DMT.FNPSecretPong);
+ MessageFilter mfRejectLoop = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(SECRETPONG_TIMEOUT).setType(DMT.FNPRejectedLoop);
+
+ msg = node.usm.waitFor(mfPong.or(mfRejectLoop), null);
+
+ if (msg==null) {
+ Logger.error(this, "fatal timeout in waiting
for secretpong from "+next);
+ } else if (msg.getSpec() == DMT.FNPSecretPong) {
+ suppliedCounter=msg.getInt(DMT.COUNTER);
+ long suppliedSecret=msg.getLong(DMT.SECRET);
+ if (logMINOR) Logger.minor(this, "got secret, counter="+suppliedCounter);
+ success=(secret==suppliedSecret);
+ } else if (msg.getSpec() == DMT.FNPRejectedLoop) {
+ Logger.normal(this, "top level rejectLoop (no
route found): "+next+" -> "+target);
+ }
+ } catch (NotConnectedException e) {
+ Logger.normal(this, "one party left during connectivity
test: "+e);
+ } catch (DisconnectedException e) {
+ Logger.normal(this, "one party left during connectivity
test: "+e);
+ } finally {
+ if (success)
+ record.success(suppliedCounter, htl, dawn);
+ else
+ record.failure(suppliedCounter, htl, dawn);
+ }
+ }
+
+ private void betweenPingSleep(PeerNode target) {
+ //We are currently sending secret pings to target, so sleep for a while to be nice; this could be increased to account for target's backoff.
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ //ignore
+ }
+ }
+
+ private void addWorkToLockedQueue(PeerNode p) {
+ if (p!=null && !workQueue.contains(p))
+ workQueue.add(p);
+ }
+
+ public void checkAllPeers() {
+ double randomTarget=node.random.nextDouble();
+ HashSet connectedPeers = new HashSet();
+ PeerNode next = node.peers.closerPeer(null, connectedPeers, null, randomTarget, true, false, -1, null, null);
+ while (next!=null) {
+ connectedPeers.add(next);
+ next = node.peers.closerPeer(null, connectedPeers, null, randomTarget, true, false, -1, null, null);
+ }
+ Iterator i=connectedPeers.iterator();
+ synchronized (workQueue) {
+ while (i.hasNext()) {
+ addWorkToLockedQueue((PeerNode)i.next());
+ }
+ }
+ }
+
+ /**
+ * Takes all the stored PingRecords, combines them with the network ids advertised by our peers,
+ * and then does the monstrous task of doing something useful with that data. At the end of this
+ * function we must assign and broadcast a network id to each of our peers, or at least the ones
+ * we have ping records for this time around, even if they are just totally made-up identifiers.
+ */
+ private void doNetworkIDReckoning(boolean anyPingChanges) {
+ // [...!!!...] \\
+ }
+
+ public void onPeerNodeChangedNetworkID(PeerNode p) {
+ /*
+ If the network group we assigned to them is (unstable?)... that is, if we would have made a
+ different assignment based on their preference, change the network id for that entire group
+ and readvertise it to the peers.
+
+ This helps the network form a consensus much more quickly by not waiting for the next round
+ of peer-secretpinging and network-id-reckoning. Note that we must still not clobber priorities,
+ so...
+
+ //do nothing on: p.getNetGroup().disallowedIds.contains(p.getNetID());
+ //do nothing on: allAssignedNetGroups.contains(p.getNetID());
+
+ There is a minor race condition here in that between updates we might improperly favor the first
+ peer to notify us of a new network id, but this will be authoritatively clobbered next round.
+ */
+ }
+
+ /**
+ A list of peers that we have assigned a network id to, and some logic as to why.
+ */
+ private static class PeerNetworkGroup {
+ List members;
+ int networkid;
+ boolean ourGroup;
+ int getConsensus() {
+ //mod(peers[].getNetworkID() + ourGroup?ourID:NO_NETWORKID )
+ return NO_NETWORKID;
+ }
+ }
+
+ //or zero if we don't know yet
+ private int ourNetworkId = NO_NETWORKID;
}
\ No newline at end of file
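
For reference, the getConsensus() stub's comment, mod(peers[].getNetworkID() + ourGroup?ourID:NO_NETWORKID), reads most naturally as "the modal (most common) network id advertised by the group's members, counting our own id when this is our group". Below is a minimal, self-contained sketch of that reading; the class name, method signature and the int[]-based parameters are illustrative assumptions, not part of this commit:

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    final class NetworkIDConsensusSketch {

        static final int NO_NETWORKID = 0;

        // Most frequently advertised network id among the group's members,
        // folding in our own id when this is our group; NO_NETWORKID if
        // nobody has expressed an opinion yet.
        static int getConsensus(int[] memberIds, boolean ourGroup, int ourId) {
            HashMap counts = new HashMap(); // Integer -> Integer, raw types as in the rest of the file
            for (int i = 0; i < memberIds.length; i++)
                tally(counts, memberIds[i]);
            if (ourGroup)
                tally(counts, ourId);
            int best = NO_NETWORKID;
            int bestCount = 0;
            Iterator it = counts.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry e = (Map.Entry) it.next();
                int count = ((Integer) e.getValue()).intValue();
                if (count > bestCount) {
                    bestCount = count;
                    best = ((Integer) e.getKey()).intValue();
                }
            }
            return best;
        }

        private static void tally(HashMap counts, int id) {
            if (id == NO_NETWORKID)
                return; // peers with no id yet don't get a vote
            Integer key = new Integer(id);
            Integer old = (Integer) counts.get(key);
            counts.put(key, new Integer(old == null ? 1 : old.intValue() + 1));
        }
    }

Ties are broken arbitrarily by map iteration order here; the real reckoning would presumably also have to respect the disallowedIds / no-clobbering constraints sketched in onPeerNodeChangedNetworkID().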