Author: robert Date: 2008-01-10 22:46:19 +0000 (Thu, 10 Jan 2008) New Revision: 17004
Modified:
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
Log:
- remove unnecessary redundancy
- missed one timeAllSent = -1
- minor just-on-time deadlock
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2008-01-10
22:15:13 UTC (rev 17003)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2008-01-10
22:46:19 UTC (rev 17004)
@@ -41,6 +41,9 @@
/**
* @author ian
+ *
+ * Given a PartiallyReceivedBlock retransmit to another node (to be received
by BlockReceiver).
+ * Since a PRB can be concurrently transmitted to many peers NOWHERE in this
class is prb.abort() to be called.
*/
public class BlockTransmitter {
@@ -102,18 +105,12 @@
totalPackets=_prb.getNumPackets();
} catch (NotConnectedException e) {
Logger.normal(this,
"Terminating send: "+e);
- synchronized(_senderThread) {
- _sendComplete = true;
-
_senderThread.notifyAll();
- return;
- }
+ //the recieve() thread should
notice...
+ return;
} catch (AbortedException e) {
Logger.normal(this,
"Terminating send due to abort: "+e);
- synchronized(_senderThread) {
- _sendComplete = true;
-
_senderThread.notifyAll();
- return;
- }
+ //the recieve() thread should
notice...
+ return;
}
synchronized (_senderThread) {
_sentPackets.setBit(packetNo,
true);
@@ -164,8 +161,8 @@
} catch (InterruptedException
e) {
// Ignore
}
- now =
System.currentTimeMillis();
}
+ now = System.currentTimeMillis();
}
}
};
@@ -176,7 +173,7 @@
}
public boolean send(Executor executor) {
- PartiallyReceivedBlock.PacketReceivedListener myListener;
+ PartiallyReceivedBlock.PacketReceivedListener myListener=null;
try {
synchronized(_prb) {
@@ -192,29 +189,12 @@
}
public void receiveAborted(int reason,
String description) {
- try {
-
_destination.sendAsync(DMT.createSendAborted(_uid, reason, description), null,
0, _ctr);
- } catch (NotConnectedException
e) {
-
if(Logger.shouldLog(Logger.MINOR, this))
-
Logger.minor(this, "Receive aborted and receiver is not connected");
- }
}
});
}
executor.execute(_senderThread, "BlockTransmitter
sender for "+_uid);
while (true) {
- if (_prb.isAborted()) {
- synchronized(_senderThread) {
- _sendComplete = true;
- _senderThread.notifyAll();
- }
- String desc=_prb.getAbortDescription();
- if (desc.indexOf("Downstream")<0)
- desc="Downstream transfer
failed: "+desc;
- sendAborted(_prb.getAbortReason(),
desc);
- return false;
- }
Message msg;
boolean logMINOR =
Logger.shouldLog(Logger.MINOR, this);
try {
@@ -225,28 +205,15 @@
if(logMINOR) Logger.minor(this, "Got
"+msg);
} catch (DisconnectedException e) {
Logger.normal(this, "Terminating send
"+_uid+" to "+_destination+" from "+_destination.getSocketHandler()+" because
node disconnected while waiting");
- synchronized(_senderThread) {
- _sendComplete = true;
- _senderThread.notifyAll();
- }
//They disconnected, can't send an
abort to them then can we?
return false;
}
if(logMINOR) Logger.minor(this, "Got "+msg);
- if(_sendComplete) {
- Logger.normal(this, "send cancelled by
_senderThread");
- //_senderThread will have sent an
aborted message
- return false;
- }
if (msg == null) {
long now = System.currentTimeMillis();
//SEND_TIMEOUT (one minute) after all
packets have been transmitted, terminate the send.
if((timeAllSent > 0) && ((now -
timeAllSent) > SEND_TIMEOUT) &&
(getNumSent() ==
_prb.getNumPackets())) {
- synchronized(_senderThread) {
- _sendComplete = true;
-
_senderThread.notifyAll();
- }
String
timeString=TimeUtil.formatTime((now - timeAllSent), 2, true);
Logger.error(this, "Terminating
send "+_uid+" to "+_destination+" from "+_destination.getSocketHandler()+" as
we haven't heard from receiver in "+timeString+ '.');
sendAborted(RetrievalException.RECEIVER_DIED, "Haven't heard from you
(receiver) in "+timeString);
@@ -262,27 +229,19 @@
if
(_prb.isReceived(packetNo.intValue())) {
synchronized(_senderThread) {
_unsent.addFirst(packetNo);
+ timeAllSent=-1;
_sentPackets.setBit(packetNo.intValue(), false);
_senderThread.notifyAll();
}
}
}
} else if
(msg.getSpec().equals(DMT.allReceived)) {
- synchronized(_senderThread) {
- _sendComplete = true;
- _senderThread.notifyAll();
- }
return true;
} else if
(msg.getSpec().equals(DMT.sendAborted)) {
// Overloaded: receiver no longer wants
the data
// Do NOT abort PRB, it's none of its
business.
// And especially, we don't want a
downstream node to
// be able to abort our sends to all
the others!
- _prb.removeListener(myListener);
- synchronized(_senderThread) {
- _sendComplete = true;
- _senderThread.notifyAll();
- }
//They aborted, don't need to send an
aborted back :)
return false;
} else {
@@ -305,11 +264,13 @@
}
return false;
} finally {
- //Terminate, if we are not listening for control
packets, don't be sending any data
+ //Terminate the sender thread, if we are not listening
for control packets, don't be sending any data
synchronized(_senderThread) {
_sendComplete = true;
_senderThread.notifyAll();
- }
+ }
+ if (myListener!=null)
+ _prb.removeListener(myListener);
}
}
