Author: toad
Date: 2005-11-15 21:10:36 +0000 (Tue, 15 Nov 2005)
New Revision: 7543
Added:
trunk/freenet/src/freenet/io/xfer/AbortedException.java
trunk/freenet/src/freenet/support/UpdatableSortedLinkedListKilledException.java
Modified:
trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
trunk/freenet/src/freenet/crypt/DiffieHellmanContext.java
trunk/freenet/src/freenet/crypt/Yarrow.java
trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBlock.java
trunk/freenet/src/freenet/node/FNPPacketMangler.java
trunk/freenet/src/freenet/node/InsertHandler.java
trunk/freenet/src/freenet/node/KeyTracker.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/Version.java
trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java
trunk/freenet/src/freenet/support/UpdatableSortedLinkedListWithForeignIndex.java
Log:
180:
LOTS of bug fixes and bug paranoia.
Inserts may complete now, but they often fail due to timeouts, even locally.
Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
2005-11-12 22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
2005-11-15 21:10:36 UTC (rev 7543)
@@ -47,8 +47,8 @@
static final int MAX_SPLITFILE_BLOCKS_PER_SEGMENT = 1024;
static final int MAX_SPLITFILE_CHECK_BLOCKS_PER_SEGMENT = 1536;
// ~ 70kB/sec encode, 16MB segments
- static final int SPLITFILE_BLOCKS_PER_SEGMENT = 512;
- static final int SPLITFILE_CHECK_BLOCKS_PER_SEGMENT = 768;
+ static final int SPLITFILE_BLOCKS_PER_SEGMENT = 128;
+ static final int SPLITFILE_CHECK_BLOCKS_PER_SEGMENT = 192;
public HighLevelSimpleClientImpl(SimpleLowLevelClient client,
ArchiveManager mgr, BucketFactory bf, RandomSource r) {
Modified: trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2005-11-12
22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2005-11-15
21:10:36 UTC (rev 7543)
@@ -271,7 +271,7 @@
Runtime.getRuntime().gc();
Runtime.getRuntime().runFinalization();
long memUsedAtStart = Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory();
- Logger.minor(this, "Memory in use at start: "+memUsedAtStart);
+ Logger.minor(this, "Memory in use at start: "+memUsedAtStart+"
max="+Runtime.getRuntime().maxMemory());
System.err.println("************* Encoding " +
dataBlockStatus.length
+ " -> " + checkBlockStatus.length + "
*************");
Logger.minor(this, "Doing encode: " + dataBlockStatus.length
Modified: trunk/freenet/src/freenet/crypt/DiffieHellmanContext.java
===================================================================
--- trunk/freenet/src/freenet/crypt/DiffieHellmanContext.java 2005-11-12
22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/crypt/DiffieHellmanContext.java 2005-11-15
21:10:36 UTC (rev 7543)
@@ -88,7 +88,11 @@
public synchronized void setOtherSideExponential(NativeBigInteger a) {
lastUsedTime = System.currentTimeMillis();
- if(peerExponential != null) throw new IllegalStateException("Assigned
other side exponential twice");
+ if(peerExponential != null) {
+ if(!peerExponential.equals(a))
+ throw new IllegalStateException("Assigned other side
exponential twice");
+ else return;
+ }
if(a == null) throw new NullPointerException();
peerExponential = a;
}
Modified: trunk/freenet/src/freenet/crypt/Yarrow.java
===================================================================
--- trunk/freenet/src/freenet/crypt/Yarrow.java 2005-11-12 22:23:39 UTC (rev
7542)
+++ trunk/freenet/src/freenet/crypt/Yarrow.java 2005-11-15 21:10:36 UTC (rev
7543)
@@ -535,6 +535,7 @@
}
private void fast_pool_reseed() {
+ long startTime = System.currentTimeMillis();
byte[] v0 = fast_pool.digest();
byte[] vi = v0;
@@ -550,6 +551,9 @@
rekey(tmp);
Arrays.fill(v0, (byte) 0); // blank out for security
fast_entropy = 0;
+ long endTime = System.currentTimeMillis();
+ if(endTime - startTime > 5000)
+ Logger.normal(this, "Fast pool reseed took
"+(endTime-startTime)+"ms");
}
private void slow_pool_reseed() {
Modified: trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/UdpSocketManager.java 2005-11-12
22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/io/comm/UdpSocketManager.java 2005-11-15
21:10:36 UTC (rev 7543)
@@ -50,6 +50,7 @@
super.start();
Thread checker = new Thread(new USMChecker());
checker.setDaemon(true);
+ checker.setPriority(Thread.MAX_PRIORITY);
checker.start();
}
@@ -88,44 +89,46 @@
public void run() { // Listen for packets
try {
- while (_active) {
- try {
- DatagramPacket packet = getPacket();
- // Check for timedout _filters
- removeTimedOutFilters();
- // Check for matched _filters
- if(packet != null) {
- Peer peer = new Peer(packet.getAddress(),
packet.getPort());
- byte[] data = packet.getData();
- int offset = packet.getOffset();
- int length = packet.getLength();
- if(lowLevelFilter != null) {
- try {
- lowLevelFilter.process(data, offset,
length, peer);
- Logger.minor(this, "Successfully handled
packet length "+length);
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t+" from
"+lowLevelFilter, t);
- }
- }
- else {
- // Create a bogus context since no filter
- Message m = decodePacket(data, offset, length,
new DummyPeerContext(peer));
- if(m != null)
- checkFilters(m);
- }
- } else
- Logger.minor(this, "Null packet");
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t, t);
- }
- }
+ while (/*_active*/true) {
+ try {
+ DatagramPacket packet = getPacket();
+ // Check for timedout _filters
+ removeTimedOutFilters();
+ // Check for matched _filters
+ if (packet != null) {
+ Peer peer = new
Peer(packet.getAddress(), packet.getPort());
+ byte[] data = packet.getData();
+ int offset = packet.getOffset();
+ int length = packet.getLength();
+ if (lowLevelFilter != null) {
+ try {
+
lowLevelFilter.process(data, offset, length, peer);
+
Logger.minor(this,
+
"Successfully handled packet length " + length);
+ } catch (Throwable t) {
+
Logger.error(this, "Caught " + t + " from "
+
+ lowLevelFilter, t);
+ }
+ } else {
+ // Create a bogus
context since no filter
+ Message m =
decodePacket(data, offset, length,
+ new
DummyPeerContext(peer));
+ if (m != null)
+ checkFilters(m);
+ }
+ } else
+ Logger.minor(this, "Null
packet");
+ } catch (Throwable t) {
+ Logger.error(this, "Caught " + t, t);
+ }
+ }
} finally {
- Logger.error(this, "run() exiting");
- synchronized (this) {
- _isDone = true;
- notifyAll();
+ Logger.error(this, "run() exiting");
+ synchronized (this) {
+ _isDone = true;
+ notifyAll();
+ }
}
- }
}
/**
Added: trunk/freenet/src/freenet/io/xfer/AbortedException.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/AbortedException.java 2005-11-12
22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/io/xfer/AbortedException.java 2005-11-15
21:10:36 UTC (rev 7543)
@@ -0,0 +1,13 @@
+package freenet.io.xfer;
+
+/**
+ * Thrown when a transfer is aborted, and caller tries to do something on PRB,
+ * in order to avoid some races.
+ */
+public class AbortedException extends Exception {
+
+ public AbortedException(String msg) {
+ super(msg);
+ }
+
+}
Modified: trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockReceiver.java 2005-11-12
22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/io/xfer/BlockReceiver.java 2005-11-15
21:10:36 UTC (rev 7543)
@@ -135,6 +135,10 @@
return _prb.getBlock();
} catch(NotConnectedException e) {
throw new
RetrievalException(RetrievalException.SENDER_DISCONNECTED);
+ } catch(AbortedException e) {
+ // We didn't cause it?!
+ Logger.error(this, "Caught in receive - probably a bug
as receive sets it: "+e);
+ throw new
RetrievalException(RetrievalException.UNKNOWN);
}
}
}
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2005-11-12
22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2005-11-15
21:10:36 UTC (rev 7543)
@@ -56,7 +56,12 @@
_destination = destination;
_uid = uid;
_prb = source;
- _sentPackets = new BitArray(_prb.getNumPackets());
+ try {
+ _sentPackets = new BitArray(_prb.getNumPackets());
+ } catch (AbortedException e) {
+ Logger.error(this, "Aborted during setup");
+ // Will throw on running
+ }
}
public boolean send() {
@@ -100,11 +105,15 @@
} catch (NotConnectedException
e) {
Logger.normal(this,
"Terminating send: "+e);
_sendComplete = true;
+ } catch (AbortedException e) {
+ Logger.normal(this,
"Terminating send due to abort: "+e);
+ _sendComplete = true;
}
}
}
};
+ try {
_unsent = _prb.addListener(new
PartiallyReceivedBlock.PacketReceivedListener() {;
public void packetReceived(int packetNo) {
@@ -164,7 +173,12 @@
// Terminated abnormally
return false;
}
- }
+ }
+ } catch (AbortedException e) {
+ // Terminate
+ _sendComplete = true;
+ return false;
+ }
}
public int getNumSent() {
Modified: trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBlock.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBlock.java
2005-11-12 22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBlock.java
2005-11-15 21:10:36 UTC (rev 7543)
@@ -61,9 +61,9 @@
_packetSize = packetSize;
}
- public synchronized LinkedList addListener(PacketReceivedListener
listener) {
+ public synchronized LinkedList addListener(PacketReceivedListener
listener) throws AbortedException {
if (_aborted) {
- throw new RuntimeException("Adding listener to aborted
PRB");
+ throw new AbortedException("Adding listener to aborted
PRB");
}
_packetReceivedListeners.add(listener);
LinkedList ret = new LinkedList();
@@ -75,30 +75,30 @@
return ret;
}
- public synchronized boolean isReceived(int packetNo) {
+ public synchronized boolean isReceived(int packetNo) throws
AbortedException {
if (_aborted) {
- throw new RuntimeException("PRB is aborted");
+ throw new AbortedException("PRB is aborted");
}
return _received[packetNo];
}
- public synchronized int getNumPackets() {
+ public synchronized int getNumPackets() throws AbortedException {
if (_aborted) {
- throw new RuntimeException("PRB is aborted");
+ throw new AbortedException("PRB is aborted");
}
return _packets;
}
- public synchronized int getPacketSize() {
+ public synchronized int getPacketSize() throws AbortedException {
if (_aborted) {
- throw new RuntimeException("PRB is aborted");
+ throw new AbortedException("PRB is aborted");
}
return _packetSize;
}
- public synchronized void addPacket(int position, Buffer packet) {
+ public synchronized void addPacket(int position, Buffer packet) throws
AbortedException {
if (_aborted) {
- throw new RuntimeException("PRB is aborted");
+ throw new AbortedException("PRB is aborted");
}
if (packet.getLength() != _packetSize) {
throw new RuntimeException("New packet size
"+packet.getLength()+" but expecting packet of size "+_packetSize);
@@ -114,16 +114,16 @@
}
}
- public boolean allReceived() {
+ public boolean allReceived() throws AbortedException {
if (_aborted) {
- throw new RuntimeException("PRB is aborted");
+ throw new AbortedException("PRB is aborted");
}
return _receivedCount == _packets;
}
- public byte[] getBlock() {
+ public byte[] getBlock() throws AbortedException {
if (_aborted) {
- throw new RuntimeException("PRB is aborted");
+ throw new AbortedException("PRB is aborted");
}
if (!allReceived()) {
throw new RuntimeException("Tried to get block before
all packets received");
@@ -131,9 +131,9 @@
return _data;
}
- public Buffer getPacket(int x) {
+ public Buffer getPacket(int x) throws AbortedException {
if (_aborted) {
- throw new RuntimeException("PRB is aborted");
+ throw new AbortedException("PRB is aborted");
}
return new Buffer(_data, x * _packetSize, _packetSize);
}
Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java 2005-11-12
22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java 2005-11-15
21:10:36 UTC (rev 7543)
@@ -338,7 +338,7 @@
output[1] = (byte) negType;
output[2] = (byte) phase;
System.arraycopy(data, 0, output, 3, data.length);
- Logger.minor(this, "Sending auth packet to "+replyTo+" -
version="+version+" negType="+negType+" phase="+phase+"
data.length="+data.length+" for "+pn);
+ Logger.minor(this, "Sending auth packet ("+version+","+negType+" to
"+replyTo+" - version="+version+" negType="+negType+" phase="+phase+"
data.length="+data.length+" for "+pn);
sendAuthPacket(output, pn, replyTo);
}
@@ -369,8 +369,8 @@
pcfb.blockEncipher(output, 0, output.length);
System.arraycopy(output, 0, data, hash.length+iv.length+2,
output.length);
usm.sendPacket(data, replyTo);
- Logger.minor(this, "Sending auth packet to "+replyTo+" - size
"+data.length+" data length: "+output.length);
- }
+ Logger.minor(this, "Sending auth packet (long) to "+replyTo+" - size
"+data.length+" data length: "+output.length);
+ }
/**
* @param i
@@ -692,18 +692,19 @@
int ackCount = decrypted[ptr++] & 0xff;
Logger.minor(this, "Acks: "+ackCount);
-
+
+ int[] acks = new int[ackCount];
for(int i=0;i<ackCount;i++) {
int offset = decrypted[ptr++] & 0xff;
if(ptr > decrypted.length) {
Logger.error(this, "Packet not long enough at byte "+ptr+" on
"+tracker);
return;
}
- int realSeqNo = referenceSeqNumber - offset;
- Logger.minor(this, "ACK: "+realSeqNo);
- tracker.acknowledgedPacket(realSeqNo);
+ acks[i] = referenceSeqNumber - offset;
}
+ tracker.acknowledgedPackets(acks);
+
int retransmitCount = decrypted[ptr++] & 0xff;
Logger.minor(this, "Retransmit requests: "+retransmitCount);
@@ -1062,6 +1063,10 @@
// We do not support forgotten packets at present
+ int[] acks = tracker.grabAcks();
+ int[] resendRequests = tracker.grabResendRequests();
+ int[] ackRequests = tracker.grabAckRequests();
+
// Allocate a sequence number
int seqNumber;
if(packetNumber > 0)
@@ -1076,10 +1081,6 @@
Logger.minor(this, "Sequence number (sending): "+seqNumber+"
("+packetNumber+") to "+tracker.pn.getPeer());
- int[] acks = tracker.grabAcks();
- int[] resendRequests = tracker.grabResendRequests();
- int[] ackRequests = tracker.grabAckRequests();
-
int packetLength = acks.length + resendRequests.length +
ackRequests.length + 4 + 1 + length + 4 + 4 + RANDOM_BYTES_LENGTH;
if(packetNumber == -1) packetLength += 4;
else packetLength++;
@@ -1096,6 +1097,7 @@
plaintext[ptr++] = (byte)seqNumber;
node.random.nextBytes(randomJunk);
+ Logger.minor(this, "Got random junk");
System.arraycopy(randomJunk, 0, plaintext, ptr, RANDOM_BYTES_LENGTH);
ptr += RANDOM_BYTES_LENGTH;
Modified: trunk/freenet/src/freenet/node/InsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertHandler.java 2005-11-12 22:23:39 UTC
(rev 7542)
+++ trunk/freenet/src/freenet/node/InsertHandler.java 2005-11-15 21:10:36 UTC
(rev 7543)
@@ -6,6 +6,7 @@
import freenet.io.comm.MessageFilter;
import freenet.io.comm.NotConnectedException;
import freenet.io.comm.RetrievalException;
+import freenet.io.xfer.AbortedException;
import freenet.io.xfer.BlockReceiver;
import freenet.io.xfer.PartiallyReceivedBlock;
import freenet.keys.CHKBlock;
@@ -195,14 +196,17 @@
private void finish() {
Message toSend = null;
synchronized(this) { // REDFLAG do not use synch(this) for any other
purpose!
- if(!canCommit) return;
- if(!prb.allReceived()) return;
try {
+ if(!canCommit) return;
+ if(!prb.allReceived()) return;
CHKBlock block = new CHKBlock(prb.getBlock(), headers, key);
node.store(block);
} catch (CHKVerifyException e) {
Logger.error(this, "Verify failed in InsertHandler: "+e+" -
headers: "+HexUtil.bytesToHex(headers), e);
toSend = DMT.createFNPDataInsertRejected(uid,
DMT.DATA_INSERT_REJECTED_VERIFY_FAILED);
+ } catch (AbortedException e) {
+ Logger.error(this, "Receive failed: "+e);
+ // Receiver thread will handle below
}
}
if(toSend != null) {
Modified: trunk/freenet/src/freenet/node/KeyTracker.java
===================================================================
--- trunk/freenet/src/freenet/node/KeyTracker.java 2005-11-12 22:23:39 UTC
(rev 7542)
+++ trunk/freenet/src/freenet/node/KeyTracker.java 2005-11-15 21:10:36 UTC
(rev 7543)
@@ -13,6 +13,7 @@
import freenet.support.LimitedRangeIntByteArrayMapElement;
import freenet.support.Logger;
import freenet.support.UpdatableSortedLinkedListItem;
+import freenet.support.UpdatableSortedLinkedListKilledException;
import freenet.support.UpdatableSortedLinkedListWithForeignIndex;
import freenet.support.WouldBlockException;
import freenet.support.DoublyLinkedList.Item;
@@ -208,7 +209,7 @@
long activeTime;
final Integer packetNumberAsInteger;
- void sent() {
+ void sent() throws UpdatableSortedLinkedListKilledException {
long now = System.currentTimeMillis();
activeTime = now + 500;
urgentTime = activeTime + 200;
@@ -285,7 +286,7 @@
super(packetNumber);
}
- void sent() {
+ void sent() throws UpdatableSortedLinkedListKilledException {
synchronized(resendRequestQueue) {
super.sent();
resendRequestQueue.update(this);
@@ -307,7 +308,7 @@
}
}
- void sent() {
+ void sent() throws UpdatableSortedLinkedListKilledException {
synchronized(ackRequestQueue) {
super.sent();
ackRequestQueue.update(this);
@@ -319,7 +320,8 @@
* Called when we receive a packet.
* @param seqNumber The packet's serial number.
*/
- public synchronized void receivedPacket(int seqNumber) {
+ public void receivedPacket(int seqNumber) {
+ Logger.minor(this, "Received packet "+seqNumber);
try {
pn.receivedPacket();
} catch (NotConnectedException e) {
@@ -327,39 +329,49 @@
return;
}
if(seqNumber == -1) return;
+ // FIXME delete this log statement
+ Logger.minor(this, "Still received packet: "+seqNumber);
+ // Received packet
receivedPacketNumber(seqNumber);
- if(packetNumbersReceived.contains(seqNumber)) {
- // They resent it
- // Lets re-ack it.
- queueAck(seqNumber);
- } else {
- receivedPacketNumber(seqNumber);
- queueAck(seqNumber);
- }
+ // Ack it even if it is a resend
+ queueAck(seqNumber);
}
- protected synchronized void receivedPacketNumber(int seqNumber) {
+ protected void receivedPacketNumber(int seqNumber) {
+ Logger.minor(this, "Handling received packet number "+seqNumber);
queueResendRequests(seqNumber);
packetNumbersReceived.got(seqNumber);
- removeResendRequest(seqNumber);
- highestSeenIncomingSerialNumber =
Math.max(highestSeenIncomingSerialNumber, seqNumber);
+ try {
+ removeResendRequest(seqNumber);
+ } catch (UpdatableSortedLinkedListKilledException e) {
+ // Ignore, not our problem
+ }
+ synchronized(this) {
+ highestSeenIncomingSerialNumber =
Math.max(highestSeenIncomingSerialNumber, seqNumber);
+ }
+ Logger.minor(this, "Handled received packet number "+seqNumber);
}
/**
* Remove a resend request from the queue.
* @param seqNumber
+ * @throws UpdatableSortedLinkedListKilledException
*/
- private void removeResendRequest(int seqNumber) {
- resendRequestQueue.removeByKey(new Integer(seqNumber));
+ private void removeResendRequest(int seqNumber) throws
UpdatableSortedLinkedListKilledException {
+ resendRequestQueue.removeByKey(new Integer(seqNumber));
}
/**
* Add some resend requests if necessary.
* @param seqNumber The number of the packet we just received.
*/
- private synchronized void queueResendRequests(int seqNumber) {
- int max = packetNumbersReceived.highest();
+ private void queueResendRequests(int seqNumber) {
+ int max;
+ synchronized(this) {
+ max = packetNumbersReceived.highest();
+ }
if(seqNumber > max) {
+ try {
if(max != -1 && seqNumber - max > 1) {
Logger.minor(this, "Queueing resends from "+max+" to
"+seqNumber);
// Missed some packets out
@@ -367,6 +379,9 @@
queueResendRequest(i);
}
}
+ } catch (UpdatableSortedLinkedListKilledException e) {
+ // Ignore (we are decoding packet, not sending one)
+ }
}
}
@@ -374,23 +389,27 @@
* Queue a resend request
* @param packetNumber the packet serial number to queue a
* resend request for
+ * @throws UpdatableSortedLinkedListKilledException
*/
- private void queueResendRequest(int packetNumber) {
- if(queuedResendRequest(packetNumber)) {
- Logger.minor(this, "Not queueing resend request for
"+packetNumber+" - already queued");
- return;
- }
- Logger.minor(this, "Queueing resend request for "+packetNumber);
- QueuedResendRequest qrr = new QueuedResendRequest(packetNumber);
- resendRequestQueue.add(qrr);
+ private void queueResendRequest(int packetNumber) throws
UpdatableSortedLinkedListKilledException {
+ synchronized(resendRequestQueue) {
+ if(queuedResendRequest(packetNumber)) {
+ Logger.minor(this, "Not queueing resend request for
"+packetNumber+" - already queued");
+ return;
+ }
+ Logger.minor(this, "Queueing resend request for "+packetNumber);
+ QueuedResendRequest qrr = new QueuedResendRequest(packetNumber);
+ resendRequestQueue.add(qrr);
+ }
}
/**
* Queue an ack request
* @param packetNumber the packet serial number to queue a
* resend request for
+ * @throws UpdatableSortedLinkedListKilledException
*/
- private void queueAckRequest(int packetNumber) {
+ private void queueAckRequest(int packetNumber) throws
UpdatableSortedLinkedListKilledException {
synchronized(ackRequestQueue) {
if(queuedAckRequest(packetNumber)) {
Logger.minor(this, "Not queueing ack request for
"+packetNumber+" - already queued");
@@ -417,26 +436,63 @@
}
/**
+ * Called when we have received several packet acknowledgements.
+ */
+ public void acknowledgedPackets(int[] seqNos) {
+ AsyncMessageCallback[][] callbacks = new
AsyncMessageCallback[seqNos.length][];
+ for(int i=0;i<seqNos.length;i++) {
+ int realSeqNo = seqNos[i];
+ Logger.minor(this, "Acknowledged packet (synced): "+realSeqNo);
+ try {
+ removeAckRequest(realSeqNo);
+ } catch (UpdatableSortedLinkedListKilledException e) {
+ // Ignore, we are processing an incoming packet
+ }
+ Logger.minor(this, "Removed ack request");
+ callbacks[i] = sentPacketsContents.getCallbacks(realSeqNo);
+ sentPacketsContents.remove(realSeqNo);
+ }
+ int cbCount = 0;
+ for(int i=0;i<callbacks.length;i++) {
+ AsyncMessageCallback[] cbs = callbacks[i];
+ if(cbs != null) {
+ for(int j=0;j<cbs.length;j++) {
+ cbs[j].acknowledged();
+ cbCount++;
+ }
+ }
+ }
+ if(cbCount > 0)
+ Logger.minor(this, "Executed "+cbCount+" callbacks");
+ }
+
+ /**
* Called when we have received a packet acknowledgement.
* @param realSeqNo
*/
public void acknowledgedPacket(int realSeqNo) {
AsyncMessageCallback[] callbacks;
- synchronized(this) {
- removeAckRequest(realSeqNo);
- callbacks = sentPacketsContents.getCallbacks(realSeqNo);
- sentPacketsContents.remove(realSeqNo);
- }
+ Logger.minor(this, "Acknowledged packet (synced): "+realSeqNo);
+ try {
+ removeAckRequest(realSeqNo);
+ } catch (UpdatableSortedLinkedListKilledException e) {
+ // Ignore, we are processing an incoming packet
+ }
+ Logger.minor(this, "Removed ack request");
+ callbacks = sentPacketsContents.getCallbacks(realSeqNo);
+ sentPacketsContents.remove(realSeqNo);
if(callbacks != null) {
for(int i=0;i<callbacks.length;i++)
callbacks[i].acknowledged();
+ Logger.minor(this, "Executed "+callbacks.length+" callbacks");
}
}
/**
* Remove an ack request from the queue by packet number.
+ * @throws UpdatableSortedLinkedListKilledException
*/
- private void removeAckRequest(int seqNo) {
+ private void removeAckRequest(int seqNo) throws
UpdatableSortedLinkedListKilledException {
ackRequestQueue.removeByKey(new Integer(seqNo));
}
@@ -453,13 +509,15 @@
}
pn.node.ps.queuedResendPacket();
} else {
- String msg = "Asking me to resend packet "+seqNumber+
- " which we haven't sent yet or which they have already
acked (next="+nextPacketNumber+")";
- // Can have a race condition
- if(seqNumber < nextPacketNumber && seqNumber > nextPacketNumber-64)
- Logger.minor(this, msg);
- else
- Logger.error(this, msg);
+ synchronized(this) {
+ String msg = "Asking me to resend packet "+seqNumber+
+ " which we haven't sent yet or which they have
already acked (next="+nextPacketNumber+")";
+ // Can have a race condition
+ if(seqNumber < nextPacketNumber && seqNumber >
nextPacketNumber-64)
+ Logger.minor(this, msg);
+ else
+ Logger.error(this, msg);
+ }
}
}
@@ -478,7 +536,11 @@
queueAck(packetNumber);
} else {
// We have not received it, so get them to resend it
- queueResendRequest(packetNumber);
+ try {
+ queueResendRequest(packetNumber);
+ } catch
(UpdatableSortedLinkedListKilledException e) {
+ // Ignore, we are decoding, not sending.
+ }
synchronized(this) {
highestSeenIncomingSerialNumber =
Math.max(highestSeenIncomingSerialNumber, packetNumber);
}
@@ -514,7 +576,11 @@
} else {
Logger.error(this, "Destination forgot packet: "+seqNumber);
}
- removeResendRequest(seqNumber);
+ try {
+ removeResendRequest(seqNumber);
+ } catch (UpdatableSortedLinkedListKilledException e) {
+ // Ignore
+ }
}
/**
@@ -584,12 +650,14 @@
/**
* Grab all the currently queued resend requests.
* @return An array of the packet numbers of all the packets we want to be
resent.
+ * @throws NotConnectedException If the peer is no longer connected.
*/
- public int[] grabResendRequests() {
+ public int[] grabResendRequests() throws NotConnectedException {
UpdatableSortedLinkedListItem[] items;
int[] packetNumbers;
int realLength;
long now = System.currentTimeMillis();
+ try {
synchronized(resendRequestQueue) {
items = resendRequestQueue.toArray();
int length = items.length;
@@ -597,6 +665,11 @@
realLength = 0;
for(int i=0;i<length;i++) {
QueuedResendRequest qrr = (QueuedResendRequest)items[i];
+ if(packetNumbersReceived.contains(qrr.packetNumber)) {
+ Logger.minor(this, "Have already seen
"+qrr.packetNumber+", removing from resend list");
+ resendRequestQueue.remove(qrr);
+ continue;
+ }
if(qrr.activeTime <= now) {
packetNumbers[realLength++] = qrr.packetNumber;
Logger.minor(this, "Grabbing resend request:
"+qrr.packetNumber+" from "+this);
@@ -606,40 +679,45 @@
}
}
}
+ } catch (UpdatableSortedLinkedListKilledException e) {
+ throw new NotConnectedException();
+ }
int[] trimmedPacketNumbers = new int[realLength];
System.arraycopy(packetNumbers, 0, trimmedPacketNumbers, 0,
realLength);
return trimmedPacketNumbers;
}
- public int[] grabAckRequests() {
+ public int[] grabAckRequests() throws NotConnectedException {
UpdatableSortedLinkedListItem[] items;
int[] packetNumbers;
int realLength;
long now = System.currentTimeMillis();
- synchronized(this) {
- synchronized(ackRequestQueue) {
- items = ackRequestQueue.toArray();
- int length = items.length;
- packetNumbers = new int[length];
- realLength = 0;
- for(int i=0;i<length;i++) {
- QueuedAckRequest qrr = (QueuedAckRequest)items[i];
- int packetNumber = qrr.packetNumber;
- if(qrr.activeTime <= now) {
- if(sentPacketsContents.get(packetNumber) == null) {
- Logger.minor(this, "Asking to ack packet which has
already been acked: "+packetNumber+" on "+this+".grabAckRequests");
- ackRequestQueue.remove(qrr);
- continue;
- }
- packetNumbers[realLength++] = packetNumber;
- Logger.minor(this, "Grabbing ack request
"+packetNumber+" from "+this);
- qrr.sent();
- } else {
- Logger.minor(this, "Ignoring ack request
"+packetNumber+" - will become active in "+(qrr.activeTime-now)+" ms on "+this);
+ try {
+ synchronized(ackRequestQueue) {
+ items = ackRequestQueue.toArray();
+ int length = items.length;
+ packetNumbers = new int[length];
+ realLength = 0;
+ for(int i=0;i<length;i++) {
+ QueuedAckRequest qr = (QueuedAckRequest)items[i];
+ int packetNumber = qr.packetNumber;
+ if(qr.activeTime <= now) {
+ if(sentPacketsContents.get(packetNumber) == null) {
+ Logger.minor(this, "Asking to ack packet which has
already been acked: "+packetNumber+" on "+this+".grabAckRequests");
+ ackRequestQueue.remove(qr);
+ continue;
}
+ packetNumbers[realLength++] = packetNumber;
+ Logger.minor(this, "Grabbing ack request "+packetNumber+"
from "+this);
+ qr.sent();
+ } else {
+ Logger.minor(this, "Ignoring ack request "+packetNumber+"
- will become active in "+(qr.activeTime-now)+" ms on "+this);
}
}
}
+ } catch (UpdatableSortedLinkedListKilledException e) {
+ throw new NotConnectedException();
+ }
int[] trimmedPacketNumbers = new int[realLength];
System.arraycopy(packetNumbers, 0, trimmedPacketNumbers, 0,
realLength);
return trimmedPacketNumbers;
@@ -685,8 +763,9 @@
* Report a packet has been sent
* @param data The data we have just sent (payload only, decrypted).
* @param seqNumber The packet number.
+ * @throws NotConnectedException
*/
- public void sentPacket(byte[] data, int seqNumber, AsyncMessageCallback[]
callbacks) {
+ public void sentPacket(byte[] data, int seqNumber, AsyncMessageCallback[]
callbacks) throws NotConnectedException {
if(callbacks != null) {
for(int i=0;i<callbacks.length;i++) {
if(callbacks[i] == null)
@@ -694,7 +773,11 @@
}
}
sentPacketsContents.add(seqNumber, data, callbacks);
- queueAckRequest(seqNumber);
+ try {
+ queueAckRequest(seqNumber);
+ } catch (UpdatableSortedLinkedListKilledException e) {
+ throw new NotConnectedException();
+ }
}
public void completelyDeprecated(KeyTracker newTracker) {
@@ -741,8 +824,8 @@
synchronized(ackQueue) {
ackQueue.clear();
}
- resendRequestQueue.clear();
- ackRequestQueue.clear();
+ resendRequestQueue.kill();
+ ackRequestQueue.kill();
synchronized(packetsToResend) {
packetsToResend.clear();
}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2005-11-12 22:23:39 UTC (rev
7542)
+++ trunk/freenet/src/freenet/node/Node.java 2005-11-15 21:10:36 UTC (rev
7543)
@@ -37,6 +37,7 @@
import freenet.io.comm.Peer;
import freenet.io.comm.PeerParseException;
import freenet.io.comm.UdpSocketManager;
+import freenet.io.xfer.AbortedException;
import freenet.io.xfer.PartiallyReceivedBlock;
import freenet.keys.CHKBlock;
import freenet.keys.CHKVerifyException;
@@ -377,7 +378,10 @@
} catch (CHKVerifyException e) {
Logger.error(this, "Does not verify: "+e, e);
throw new
LowLevelGetException(LowLevelGetException.DECODE_FAILED);
- }
+ } catch (AbortedException e) {
+ Logger.error(this, "Impossible: "+e, e);
+ throw new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
+ }
} else {
switch(rs.getStatus()) {
case RequestSender.NOT_FINISHED:
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2005-11-12 22:23:39 UTC
(rev 7542)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2005-11-15 21:10:36 UTC
(rev 7543)
@@ -883,28 +883,34 @@
* Requeue ResendPacketItem[]s if they are not sent.
* @param resendItems
*/
- public synchronized void requeueResendItems(ResendPacketItem[]
resendItems) {
+ public void requeueResendItems(ResendPacketItem[] resendItems) {
+ KeyTracker cur, prev, unv;
+ synchronized(this) {
+ cur = currentTracker;
+ prev = previousTracker;
+ unv = unverifiedTracker;
+ }
for(int i=0;i<resendItems.length;i++) {
ResendPacketItem item = resendItems[i];
if(item.pn != this)
throw new IllegalArgumentException("item.pn != this!");
- KeyTracker kt = currentTracker;
+ KeyTracker kt = cur;
if(kt != null && item.kt == kt) {
kt.resendPacket(item.packetNumber);
continue;
}
- kt = previousTracker;
+ kt = prev;
if(kt != null && item.kt == kt) {
kt.resendPacket(item.packetNumber);
continue;
}
- kt = unverifiedTracker;
+ kt = unv;
if(kt != null && item.kt == kt) {
kt.resendPacket(item.packetNumber);
continue;
}
// Doesn't match any of these, need to resend the data
- kt = currentTracker == null ? unverifiedTracker : currentTracker;
+ kt = cur == null ? unv : cur;
if(kt == null) {
Logger.error(this, "No tracker to resend packet
"+item.packetNumber+" on");
continue;
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2005-11-12 22:23:39 UTC (rev
7542)
+++ trunk/freenet/src/freenet/node/Version.java 2005-11-15 21:10:36 UTC (rev
7543)
@@ -20,10 +20,10 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- public static final int buildNumber = 179;
+ public static final int buildNumber = 180;
/** Oldest build of Fred we will talk to */
- public static final int lastGoodBuild = 178;
+ public static final int lastGoodBuild = 180;
/** The highest reported build of fred */
public static int highestSeenBuild = buildNumber;
Modified: trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java
===================================================================
--- trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java
2005-11-12 22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java
2005-11-15 21:10:36 UTC (rev 7543)
@@ -11,13 +11,16 @@
*/
public class UpdatableSortedLinkedList {
+ private boolean killed = false;
+
public UpdatableSortedLinkedList() {
list = new DoublyLinkedListImpl();
}
private final DoublyLinkedList list;
- public synchronized void add(UpdatableSortedLinkedListItem i) {
+ public synchronized void add(UpdatableSortedLinkedListItem i) throws
UpdatableSortedLinkedListKilledException {
+ if(killed) throw new UpdatableSortedLinkedListKilledException();
Logger.minor(this, "Add("+i+") on "+this);
if(list.isEmpty()) {
list.push(i);
@@ -58,34 +61,32 @@
int statedLength = list.size();
int realLength = 0;
sb.setLength(0);
- int x = 0;
for(Enumeration e = list.elements();e.hasMoreElements();) {
UpdatableSortedLinkedListItem i =
(UpdatableSortedLinkedListItem) e.nextElement();
- sb.append(x);
- sb.append("=");
- sb.append(i);
- sb.append('\n');
+ // Sanity check for infinite looping
+ if(realLength > 100*1000)
+ Logger.normal(this, "["+realLength+"] = "+i+"
(prev="+i.getPrev()+")");
realLength++;
}
if(statedLength != realLength) {
String err = "statedLength = "+statedLength+" but realLength =
"+realLength+" on "+this;
Logger.error(this, "Illegal ERROR: "+err, new
Exception("error"));
- Logger.error(this, "Details:\n"+sb.toString());
throw new IllegalStateException(err);
} else {
Logger.minor(this, "checkList() successful: realLength =
statedLength = "+realLength+" on "+this);
- Logger.minor(this, "Details:\n"+sb.toString());
}
}
- public synchronized void remove(UpdatableSortedLinkedListItem i) {
+ public synchronized void remove(UpdatableSortedLinkedListItem i) throws
UpdatableSortedLinkedListKilledException {
+ if(killed) throw new UpdatableSortedLinkedListKilledException();
Logger.minor(this, "Remove("+i+") on "+this);
checkList();
list.remove(i);
checkList();
}
- public synchronized void update(UpdatableSortedLinkedListItem i) {
+ public synchronized void update(UpdatableSortedLinkedListItem i) throws
UpdatableSortedLinkedListKilledException {
+ if(killed) throw new UpdatableSortedLinkedListKilledException();
Logger.minor(this, "Update("+i+") on "+this);
checkList();
if(i.compareTo(list.tail()) > 0) {
@@ -160,8 +161,10 @@
/**
* Dump the current status of the list to the log.
+ * @throws UpdatableSortedLinkedListKilledException
*/
- private synchronized void dump() {
+ private synchronized void dump() throws
UpdatableSortedLinkedListKilledException {
+ if(killed) throw new UpdatableSortedLinkedListKilledException();
for(Enumeration e=list.elements();e.hasMoreElements();) {
UpdatableSortedLinkedListItem item =
(UpdatableSortedLinkedListItem) e.nextElement();
Logger.minor(this, item.toString());
@@ -177,8 +180,10 @@
/**
* @return an array, in order, of the elements in the list
+ * @throws UpdatableSortedLinkedListKilledException
*/
- public synchronized UpdatableSortedLinkedListItem[] toArray() {
+ public synchronized UpdatableSortedLinkedListItem[] toArray() throws
UpdatableSortedLinkedListKilledException {
+ if(killed) throw new UpdatableSortedLinkedListKilledException();
int size = list.size();
if(size < 0)
throw new IllegalStateException("list.size() = "+size+" for
"+this);
@@ -210,4 +215,9 @@
public synchronized void clear() {
list.clear();
}
+
+ public synchronized void kill() {
+ clear();
+ killed = true;
+ }
}
Added:
trunk/freenet/src/freenet/support/UpdatableSortedLinkedListKilledException.java
===================================================================
---
trunk/freenet/src/freenet/support/UpdatableSortedLinkedListKilledException.java
2005-11-12 22:23:39 UTC (rev 7542)
+++
trunk/freenet/src/freenet/support/UpdatableSortedLinkedListKilledException.java
2005-11-15 21:10:36 UTC (rev 7543)
@@ -0,0 +1,5 @@
+package freenet.support;
+
+public class UpdatableSortedLinkedListKilledException extends Exception {
+
+}
Modified:
trunk/freenet/src/freenet/support/UpdatableSortedLinkedListWithForeignIndex.java
===================================================================
---
trunk/freenet/src/freenet/support/UpdatableSortedLinkedListWithForeignIndex.java
2005-11-12 22:23:39 UTC (rev 7542)
+++
trunk/freenet/src/freenet/support/UpdatableSortedLinkedListWithForeignIndex.java
2005-11-15 21:10:36 UTC (rev 7543)
@@ -18,7 +18,7 @@
map = new HashMap();
}
- public synchronized void add(UpdatableSortedLinkedListItem item) {
+ public synchronized void add(UpdatableSortedLinkedListItem item) throws
UpdatableSortedLinkedListKilledException {
if(!(item instanceof IndexableUpdatableSortedLinkedListItem)) {
throw new IllegalArgumentException();
}
@@ -33,10 +33,9 @@
checkList();
}
- public synchronized void remove(UpdatableSortedLinkedListItem item) {
+ public synchronized void remove(UpdatableSortedLinkedListItem item) throws
UpdatableSortedLinkedListKilledException {
super.remove(item);
map.remove(((IndexableUpdatableSortedLinkedListItem)item).indexValue());
- checkList();
}
public synchronized boolean containsKey(Object o) {
@@ -45,8 +44,9 @@
/**
* Remove an element from the list by its key.
+ * @throws UpdatableSortedLinkedListKilledException
*/
- public synchronized void removeByKey(Object key) {
+ public synchronized void removeByKey(Object key) throws
UpdatableSortedLinkedListKilledException {
IndexableUpdatableSortedLinkedListItem item =
(IndexableUpdatableSortedLinkedListItem) map.get(key);
if(item != null) remove(item);