Author: mrogers
Date: 2007-02-25 19:10:41 +0000 (Sun, 25 Feb 2007)
New Revision: 11920
Modified:
trunk/apps/load-balancing-sims/phase7/sim/Node.java
trunk/apps/load-balancing-sims/phase7/sim/Peer.java
Log:
One timer per node instead of one per peer (should lead to fairer bandwidth
sharing between peers when throttled)
Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2007-02-25 18:08:47 UTC
(rev 11919)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2007-02-25 19:10:41 UTC
(rev 11920)
@@ -12,6 +12,9 @@
{
public final static boolean LOG = false;
+ // Retransmission/coalescing timer
+ public final static double TICK = 0.1; // Timer granularity, seconds
+
// Flow control
public static boolean useTokens = false;
public static boolean useBackoff = false;
@@ -40,6 +43,7 @@
private boolean decrementMaxHtl = false;
private boolean decrementMinHtl = false;
public TokenBucket bandwidth; // Bandwidth limiter
+ private boolean timerRunning = false; // Coalescing/retransmission timer
private int spareTokens = FLOW_TOKENS; // Tokens not allocated to a peer
private double delay = 0.0; // Delay caused by congestion or b/w limiter
private LinkedList<Search> searchQueue;
@@ -66,7 +70,7 @@
pubKeyCache = new LruCache<Integer> (16000);
if (Math.random() < 0.5) decrementMaxHtl = true;
if (Math.random() < 0.25) decrementMinHtl = true;
- bandwidth = new TokenBucket (40000, 400000);
+ bandwidth = new TokenBucket (40000, 80000);
searchQueue = new LinkedList<Search>();
if (useTokens) {
// Allocate flow control tokens after a short delay
@@ -501,6 +505,28 @@
return copy;
}
+ // Called by Peer to start the retransmission/coalescing timer
+ public void startTimer()
+ {
+ if (timerRunning) return;
+ timerRunning = true;
+ if (LOG) log ("starting timer");
+ Event.schedule (this, TICK, TIMER, null);
+ }
+
+ // Event callback - check retransmission/coalescing deadlines
+ private void timer()
+ {
+ boolean stopTimer = true;
+ // Check the peers in a random order for fair bandwidth sharing
+ for (Peer p : peers()) if (p.timer()) stopTimer = false;
+ if (stopTimer && timerRunning) {
+ timerRunning = false;
+ if (LOG) log ("stopping timer");
+ }
+ else Event.schedule (this, TICK, TIMER, null);
+ }
+
public void log (String message)
{
Event.log (net.address + " " + message);
@@ -640,6 +666,10 @@
case SEND_SEARCH:
sendSearch();
break;
+
+ case TIMER:
+ timer();
+ break;
}
}
@@ -650,4 +680,5 @@
public final static int SSK_COLLISION = 5;
private final static int ALLOCATE_TOKENS = 6;
private final static int SEND_SEARCH = 7;
+ private final static int TIMER = 8;
}
Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2007-02-25 18:08:47 UTC
(rev 11919)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2007-02-25 19:10:41 UTC
(rev 11920)
@@ -4,7 +4,7 @@
import java.util.Iterator;
import java.util.HashSet;
-public class Peer implements EventTarget
+public class Peer
{
public final static boolean LOG = false;
@@ -18,10 +18,8 @@
public final static double FRTO = 1.5; // Fast retx timeout in RTTs
public final static double RTT_DECAY = 0.9; // Exp moving average
public final static double LINK_IDLE = 8.0; // RTTs without transmitting
+ public final static double MAX_DELAY = 0.1; // Coalescing delay, seconds
- // Retransmission/coalescing timer
- public final static double TICK = 0.1; // Timer granularity, seconds
-
// Backoff
public final static double INITIAL_BACKOFF = 1.0; // Seconds
public final static double BACKOFF_MULTIPLIER = 2.0;
@@ -40,7 +38,6 @@
private CongestionWindow window; // AIMD congestion window
private double lastTransmission = Double.POSITIVE_INFINITY; // Abs. time
private boolean tgif = false; // "Transfers go in first" toggle
- private boolean timerRunning = false; // Retransmission/coalescing timer
// Receiver state
private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
@@ -68,7 +65,7 @@
// Queue a message for transmission
public void sendMessage (Message m)
{
- m.deadline = Event.time() + TICK;
+ m.deadline = Event.time() + MAX_DELAY;
if (m instanceof Block) {
if (LOG) log (m + " added to transfer queue");
transferQueue.add (m);
@@ -78,20 +75,11 @@
searchQueue.add (m);
}
// Start the coalescing timer
- startTimer();
+ node.startTimer();
// Send as many packets as possible
while (send (-1));
}
- // Start the retransmission/coalescing timer
- private void startTimer()
- {
- if (timerRunning) return;
- timerRunning = true;
- if (LOG) log ("starting timer");
- Event.schedule (this, TICK, TIMER, null);
- }
-
// Try to send a packet, return true if a packet was sent
private boolean send (int ack)
{
@@ -164,7 +152,7 @@
if (p.messages != null) {
p.sent = Event.time();
txBuffer.add (p);
- startTimer(); // Start the retransmission timer
+ node.startTimer(); // Start the retransmission timer
window.bytesSent (p.size);
}
return true;
@@ -303,18 +291,14 @@
return tokensIn;
}
- // Event callback: wake up, send packets, go back to sleep
- private void timer()
+ // Called by Node - return true if there are messages outstanding
+ public boolean timer()
{
- // Send as many packets as possible
- while (send (-1));
// Stop the timer if there's nothing to wait for
if (searchQueue.size + transferQueue.size == 0
- && txBuffer.isEmpty()) {
- if (LOG) log ("stopping timer");
- timerRunning = false;
- return;
- }
+ && txBuffer.isEmpty()) return false;
+ // Send as many packets as possible
+ while (send (-1));
// Check the retransmission timeouts
double now = Event.time();
for (Packet p : txBuffer) {
@@ -326,8 +310,7 @@
window.timeout (now);
}
}
- // Schedule the next check
- Event.schedule (this, TICK, TIMER, null);
+ return true;
}
public void log (String message)
@@ -339,12 +322,4 @@
{
return Integer.toString (address);
}
-
- // EventTarget interface
- public void handleEvent (int type, Object data)
- {
- if (type == TIMER) timer();
- }
-
- private final static int TIMER = 1;
}