Author: toad
Date: 2008-04-25 19:04:57 +0000 (Fri, 25 Apr 2008)
New Revision: 19560

Added:
   trunk/freenet/src/freenet/node/SyncSendWaitedTooLongException.java
Modified:
   trunk/freenet/src/freenet/io/comm/PeerContext.java
   trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
   trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
   trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
   trunk/freenet/src/freenet/node/FailureTable.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/RequestHandler.java
   trunk/freenet/src/freenet/node/SSKInsertHandler.java
   trunk/freenet/src/freenet/node/SSKInsertSender.java
Log:
sendThrottledMessage() now has the option to send synchronously (with a 
timeout, and an exception)
Use this to make sure we've sent the packet and got the callback before calling 
applyByteCounts().
Not as big a change as the last commit in terms of increasing the SSK request 
average bytes...

Modified: trunk/freenet/src/freenet/io/comm/PeerContext.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PeerContext.java  2008-04-25 18:44:50 UTC 
(rev 19559)
+++ trunk/freenet/src/freenet/io/comm/PeerContext.java  2008-04-25 19:04:57 UTC 
(rev 19560)
@@ -8,6 +8,7 @@
 import freenet.io.xfer.PacketThrottle;
 import freenet.io.xfer.WaitedTooLongException;
 import freenet.node.OutgoingPacketMangler;
+import freenet.node.SyncSendWaitedTooLongException;

 /**
  * @author amphibian
@@ -35,8 +36,9 @@
        /** Send a message to the node */
        public void sendAsync(Message msg, AsyncMessageCallback cb, int 
alreadyReportedBytes, ByteCounter ctr) throws NotConnectedException;

-       /** Send a throttled message to the node (may block for a long time). */
-       public void sendThrottledMessage(Message msg, int packetSize, 
ByteCounter ctr, int timeout) throws NotConnectedException, 
WaitedTooLongException;
+       /** Send a throttled message to the node (may block for a long time). 
+        * @throws SyncSendWaitedTooLongException */
+       public void sendThrottledMessage(Message msg, int packetSize, 
ByteCounter ctr, int timeout, boolean waitForSent) throws 
NotConnectedException, WaitedTooLongException, SyncSendWaitedTooLongException;

        /** Get the current boot ID. This is a random number that changes every 
time the node starts up. */
        public long getBootID();

Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2008-04-25 
18:44:50 UTC (rev 19559)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2008-04-25 
19:04:57 UTC (rev 19560)
@@ -31,6 +31,7 @@
 import freenet.io.comm.PeerContext;
 import freenet.io.comm.RetrievalException;
 import freenet.node.PrioRunnable;
+import freenet.node.SyncSendWaitedTooLongException;
 import freenet.support.BitArray;
 import freenet.support.Executor;
 import freenet.support.Logger;
@@ -98,7 +99,7 @@
                                        }
                                        int totalPackets;
                                        try {
-                                               
_destination.sendThrottledMessage(DMT.createPacketTransmit(_uid, packetNo, 
_sentPackets, _prb.getPacket(packetNo)), _prb._packetSize, _ctr, SEND_TIMEOUT);
+                                               
_destination.sendThrottledMessage(DMT.createPacketTransmit(_uid, packetNo, 
_sentPackets, _prb.getPacket(packetNo)), _prb._packetSize, _ctr, SEND_TIMEOUT, 
false);
                                                
totalPackets=_prb.getNumPackets();
                                        } catch (NotConnectedException e) {
                                                Logger.normal(this, 
"Terminating send: "+e);
@@ -114,6 +115,10 @@
                                                        _sendComplete = true;
                                                }
                                                return;
+                                       } catch (SyncSendWaitedTooLongException 
e) {
+                                               // Impossible
+                                               Logger.error(this, "Impossible: 
Caught "+e, e);
+                                               return;
                                        }
                                        synchronized (_senderThread) {
                                                _sentPackets.setBit(packetNo, 
true);

Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2008-04-25 
18:44:50 UTC (rev 19559)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2008-04-25 
19:04:57 UTC (rev 19560)
@@ -11,6 +11,7 @@
 import freenet.io.comm.MessageFilter;
 import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.PeerContext;
+import freenet.node.SyncSendWaitedTooLongException;
 import freenet.support.BitArray;
 import freenet.support.Logger;

@@ -244,7 +245,7 @@

                        // Congestion control and bandwidth limiting
                        try {
-                               
peer.sendThrottledMessage(DMT.createFNPBulkPacketSend(uid, blockNo, buf), 
buf.length, ctr, BulkReceiver.TIMEOUT);
+                               
peer.sendThrottledMessage(DMT.createFNPBulkPacketSend(uid, blockNo, buf), 
buf.length, ctr, BulkReceiver.TIMEOUT, false);
                                synchronized(this) {
                                        blocksNotSentButPresent.setBit(blockNo, 
false);
                                }
@@ -257,6 +258,10 @@
                        } catch (WaitedTooLongException e) {
                                Logger.error(this, "Failed to send bulk packet 
"+blockNo+" for "+this);
                                return false;
+                       } catch (SyncSendWaitedTooLongException e) {
+                               // Impossible
+                               Logger.error(this, "Impossible: Caught "+e, e);
+                               return false;
                        }
                }
        }

Modified: trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PacketThrottle.java       2008-04-25 
18:44:50 UTC (rev 19559)
+++ trunk/freenet/src/freenet/io/xfer/PacketThrottle.java       2008-04-25 
19:04:57 UTC (rev 19560)
@@ -25,6 +25,7 @@
 import freenet.io.comm.Peer;
 import freenet.io.comm.PeerContext;
 import freenet.node.PeerNode;
+import freenet.node.SyncSendWaitedTooLongException;
 import freenet.support.DoubleTokenBucket;
 import freenet.support.Logger;

@@ -146,7 +147,7 @@
                return ((PACKET_SIZE * 1000.0 / getDelay()));
        }

-       public void sendThrottledMessage(Message msg, PeerContext peer, 
DoubleTokenBucket overallThrottle, int packetSize, ByteCounter ctr, long 
deadline) throws NotConnectedException, ThrottleDeprecatedException, 
WaitedTooLongException {
+       public void sendThrottledMessage(Message msg, PeerContext peer, 
DoubleTokenBucket overallThrottle, int packetSize, ByteCounter ctr, long 
deadline, boolean blockForSend) throws NotConnectedException, 
ThrottleDeprecatedException, WaitedTooLongException, 
SyncSendWaitedTooLongException {
                long start = System.currentTimeMillis();
                long bootID = peer.getBootID();
                synchronized(this) {
@@ -238,6 +239,23 @@
                                Logger.minor(this, "Not throttling 
"+peer.shortToString()+" for "+this);
                        peer.sendAsync(msg, callback, packetSize, ctr);
                        ctr.sentPayload(packetSize);
+                       if(blockForSend) {
+                               synchronized(callback) {
+                                       long timeout = 
System.currentTimeMillis() + 60*1000;
+                                       long now;
+                                       while((now = 
System.currentTimeMillis()) < timeout && !callback.finished) {
+                                               try {
+                                                       
callback.wait((int)(timeout - now));
+                                               } catch (InterruptedException 
e) {
+                                                       // Ignore
+                                               }
+                                       }
+                                       if(!callback.finished) {
+                                               throw new 
SyncSendWaitedTooLongException();
+                                       }
+                               }
+                       }
+                       
                } catch (RuntimeException e) {
                        callback.fatalError();
                        throw e;

Modified: trunk/freenet/src/freenet/node/FailureTable.java
===================================================================
--- trunk/freenet/src/freenet/node/FailureTable.java    2008-04-25 18:44:50 UTC 
(rev 19559)
+++ trunk/freenet/src/freenet/node/FailureTable.java    2008-04-25 19:04:57 UTC 
(rev 19560)
@@ -421,12 +421,14 @@

                                public void run() {
                                        try {
-                                               
source.sendThrottledMessage(data, dataLength, senderCounter, 60*1000);
+                                               
source.sendThrottledMessage(data, dataLength, senderCounter, 60*1000, false);
                                        } catch (NotConnectedException e) {
                                                // :(
                                        } catch (WaitedTooLongException e) {
                                                // :<
                                                Logger.error(this, "Waited too 
long sending SSK data");
+                                       } catch (SyncSendWaitedTooLongException 
e) {
+                                               // Impossible
                                        } finally {
                                                node.unlockUID(uid, isSSK, 
false, false, true, false);
                                        }

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2008-04-25 18:44:50 UTC 
(rev 19559)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2008-04-25 19:04:57 UTC 
(rev 19560)
@@ -3874,12 +3874,12 @@
                return resendBytesSent;
        }

-       public void sendThrottledMessage(Message msg, int packetSize, 
ByteCounter ctr, int timeout) throws NotConnectedException, 
WaitedTooLongException {
+       public void sendThrottledMessage(Message msg, int packetSize, 
ByteCounter ctr, int timeout, boolean blockForSend) throws 
NotConnectedException, WaitedTooLongException, SyncSendWaitedTooLongException {
                long deadline = System.currentTimeMillis() + timeout;
                if(logMINOR) Logger.minor(this, "Sending throttled message with 
timeout "+timeout+" packet size "+packetSize+" to "+shortToString());
                for(int i=0;i<100;i++) {
                        try {
-                               getThrottle().sendThrottledMessage(msg, this, 
node.outputThrottle, packetSize, ctr, deadline);
+                               getThrottle().sendThrottledMessage(msg, this, 
node.outputThrottle, packetSize, ctr, deadline, blockForSend);
                                return;
                        } catch (ThrottleDeprecatedException e) {
                                // Try with the new throttle. We don't need it, 
we'll get it from getThrottle().

Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java  2008-04-25 18:44:50 UTC 
(rev 19559)
+++ trunk/freenet/src/freenet/node/RequestHandler.java  2008-04-25 19:04:57 UTC 
(rev 19560)
@@ -350,13 +350,15 @@

                        public void run() {
                                try {
-                                       source.sendThrottledMessage(dataMsg, 
data.length, RequestHandler.this, 60*1000);
+                                       source.sendThrottledMessage(dataMsg, 
data.length, RequestHandler.this, 60*1000, true);
                                        applyByteCounts();
                                } catch (NotConnectedException e) {
                                        // Okay
                                } catch (WaitedTooLongException e) {
                                        // Grrrr
-                                       Logger.error(this, "Waited too long to 
send SSK data on "+RequestHandler.this);
+                                       Logger.error(this, "Waited too long to 
send SSK data on "+RequestHandler.this+" because of bwlimiting");
+                               } catch (SyncSendWaitedTooLongException e) {
+                                       Logger.error(this, "Waited too long to 
send SSK data on "+RequestHandler.this+" because of peer");
                                } finally {
                                        unregisterRequestHandlerWithNode();
                                }
@@ -382,7 +384,12 @@
                Message headersMsg = DMT.createFNPSSKDataFoundHeaders(uid, 
headers);
                source.sendAsync(headersMsg, null, 0, ctr);
                final Message dataMsg = DMT.createFNPSSKDataFoundData(uid, 
data);
-               source.sendThrottledMessage(dataMsg, data.length, ctr, 60*1000);
+               try {
+                       source.sendThrottledMessage(dataMsg, data.length, ctr, 
60*1000, false);
+               } catch (SyncSendWaitedTooLongException e) {
+                       // Impossible
+                       throw new Error(e);
+               }

                if(SEND_OLD_FORMAT_SSK) {
                        Message df = DMT.createFNPSSKDataFound(uid, headers, 
data);

Modified: trunk/freenet/src/freenet/node/SSKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertHandler.java        2008-04-25 
18:44:50 UTC (rev 19559)
+++ trunk/freenet/src/freenet/node/SSKInsertHandler.java        2008-04-25 
19:04:57 UTC (rev 19560)
@@ -184,7 +184,7 @@
                                if(logMINOR) Logger.minor(this, "Lost 
connection to source on "+uid);
                                return;
                        } catch (WaitedTooLongException e1) {
-                               Logger.error(this, "Took too long to send ssk 
datareply to "+uid);
+                               Logger.error(this, "Took too long to send ssk 
datareply to "+uid+" (because of throttling)");
                                return;
                        }
                        block = storedBlock;
@@ -247,7 +247,7 @@
                                        if(logMINOR) Logger.minor(this, "Lost 
connection to source on "+uid);
                                        return;
                                } catch (WaitedTooLongException e1) {
-                                       Logger.error(this, "Took too long to 
send ssk datareply to "+uid);
+                                       Logger.error(this, "Took too long to 
send ssk datareply to "+uid+" because of bwlimiting");
                                        return;
                                }
             }

Modified: trunk/freenet/src/freenet/node/SSKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertSender.java 2008-04-25 18:44:50 UTC 
(rev 19559)
+++ trunk/freenet/src/freenet/node/SSKInsertSender.java 2008-04-25 19:04:57 UTC 
(rev 19560)
@@ -249,13 +249,15 @@

             try {
                                next.sendAsync(headersMsg, null, 0, this);
-                               next.sendThrottledMessage(dataMsg, data.length, 
this, SSKInsertHandler.DATA_INSERT_TIMEOUT);
+                               next.sendThrottledMessage(dataMsg, data.length, 
this, SSKInsertHandler.DATA_INSERT_TIMEOUT, false);
                        } catch (NotConnectedException e1) {
                                if(logMINOR) Logger.minor(this, "Not connected 
to "+next);
                                continue;
                        } catch (WaitedTooLongException e) {
                                Logger.error(this, "Waited too long to send 
"+dataMsg+" to "+next+" on "+this);
                                continue;
+                       } catch (SyncSendWaitedTooLongException e) {
+                               // Impossible
                        }

             // Do we need to send them the pubkey?

Added: trunk/freenet/src/freenet/node/SyncSendWaitedTooLongException.java
===================================================================
--- trunk/freenet/src/freenet/node/SyncSendWaitedTooLongException.java          
                (rev 0)
+++ trunk/freenet/src/freenet/node/SyncSendWaitedTooLongException.java  
2008-04-25 19:04:57 UTC (rev 19560)
@@ -0,0 +1,11 @@
+package freenet.node;
+
+/**
+ * This exception is thrown when we try to do a blocking send of a message, 
and it takes too long
+ * so we time out. Compare to WaitedTooLongException, which is thrown when we 
are waiting for clearance
+ * from bandwidth limiting and don't get it within a timeout period.
+ * @author toad
+ */
+public class SyncSendWaitedTooLongException extends Exception {
+
+}


Reply via email to