Author: toad
Date: 2005-11-16 19:29:35 +0000 (Wed, 16 Nov 2005)
New Revision: 7546
Modified:
trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
trunk/freenet/src/freenet/client/InsertSegment.java
trunk/freenet/src/freenet/client/InserterContext.java
trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
trunk/freenet/src/freenet/node/FNPPacketMangler.java
trunk/freenet/src/freenet/node/Version.java
trunk/freenet/src/freenet/support/BucketTools.java
trunk/freenet/src/freenet/support/DoublyLinkedListImpl.java
Log:
Mandatory 183:
Don't allow several simultaneous FNPPacketMangler.process*() calls.
This eliminates all low-level packet sequence issues.
Obviously the high-level overload issues remain. Hopefully these can be dealt
with by load management later.
There are some splitfile bugs to be tackled.
Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
2005-11-16 15:23:44 UTC (rev 7545)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
2005-11-16 19:29:35 UTC (rev 7546)
@@ -46,7 +46,6 @@
// going by memory usage only; 4kB per stripe
static final int MAX_SPLITFILE_BLOCKS_PER_SEGMENT = 1024;
static final int MAX_SPLITFILE_CHECK_BLOCKS_PER_SEGMENT = 1536;
- // ~ 70kB/sec encode, 16MB segments
static final int SPLITFILE_BLOCKS_PER_SEGMENT = 128;
static final int SPLITFILE_CHECK_BLOCKS_PER_SEGMENT = 192;
Modified: trunk/freenet/src/freenet/client/InsertSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/InsertSegment.java 2005-11-16 15:23:44 UTC
(rev 7545)
+++ trunk/freenet/src/freenet/client/InsertSegment.java 2005-11-16 19:29:35 UTC
(rev 7546)
@@ -24,7 +24,10 @@
public InsertSegment(short splitfileAlgo, SplitfileBlock[]
origDataBlocks, int blockLength, BucketFactory bf, boolean getCHKOnly, int
segNo) {
this.origDataBlocks = origDataBlocks;
codec = FECCodec.getCodec(splitfileAlgo, origDataBlocks.length);
- checkBlocks = new SplitfileBlock[codec.countCheckBlocks()];
+ if(codec != null)
+ checkBlocks = new
SplitfileBlock[codec.countCheckBlocks()];
+ else
+ checkBlocks = new SplitfileBlock[0];
this.blockLength = blockLength;
this.bf = bf;
this.getCHKOnly = getCHKOnly;
Modified: trunk/freenet/src/freenet/client/InserterContext.java
===================================================================
--- trunk/freenet/src/freenet/client/InserterContext.java 2005-11-16
15:23:44 UTC (rev 7545)
+++ trunk/freenet/src/freenet/client/InserterContext.java 2005-11-16
19:29:35 UTC (rev 7546)
@@ -27,7 +27,8 @@
this.bf = bf;
this.random = random;
dontCompress = false;
- splitfileAlgorithm = Metadata.SPLITFILE_ONION_STANDARD;
+ //splitfileAlgorithm = Metadata.SPLITFILE_ONION_STANDARD;
+ splitfileAlgorithm = Metadata.SPLITFILE_NONREDUNDANT;
this.maxInsertBlockRetries = maxRetries;
this.maxSplitInsertThreads = maxThreads;
this.eventProducer = eventProducer;
Modified: trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/UdpSocketManager.java 2005-11-16
15:23:44 UTC (rev 7545)
+++ trunk/freenet/src/freenet/io/comm/UdpSocketManager.java 2005-11-16
19:29:35 UTC (rev 7546)
@@ -22,6 +22,7 @@
import java.net.*;
import java.util.*;
+import freenet.node.PeerNode;
import freenet.support.Logger;
public class UdpSocketManager extends Thread {
@@ -47,6 +48,7 @@
}
public void start() {
+ setPriority(Thread.MAX_PRIORITY);
super.start();
Thread checker = new Thread(new USMChecker());
checker.setDaemon(true);
@@ -388,19 +390,20 @@
Logger.minor(this, "" + (System.currentTimeMillis() %
60000) + " " + _sock.getPort() + " -> " + destination
+ " : " + m);
}
- byte[] blockToSend = m.encodeToPacket(lowLevelFilter,
destination);
- if(lowLevelFilter != null) {
- try {
- lowLevelFilter.processOutgoing(blockToSend, 0,
blockToSend.length, destination);
- return;
- } catch (LowLevelFilterException t) {
- Logger.error(this, "Caught "+t+" sending "+m+"
to "+destination, t);
- destination.forceDisconnect();
- throw new NotConnectedException("Error
"+t.toString()+" forced disconnect");
- }
- } else {
- sendPacket(blockToSend, destination.getPeer());
- }
+// byte[] blockToSend = m.encodeToPacket(lowLevelFilter,
destination);
+// if(lowLevelFilter != null) {
+// try {
+// lowLevelFilter.processOutgoing(blockToSend, 0,
blockToSend.length, destination);
+// return;
+// } catch (LowLevelFilterException t) {
+// Logger.error(this, "Caught "+t+" sending "+m+"
to "+destination, t);
+// destination.forceDisconnect();
+// throw new NotConnectedException("Error
"+t.toString()+" forced disconnect");
+// }
+// } else {
+// sendPacket(blockToSend, destination.getPeer());
+// }
+ ((PeerNode)destination).sendAsync(m, null);
}
/**
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2005-11-16
15:23:44 UTC (rev 7545)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2005-11-16
19:29:35 UTC (rev 7546)
@@ -28,6 +28,7 @@
import freenet.io.comm.NotConnectedException;
import freenet.io.comm.PeerContext;
import freenet.io.comm.UdpSocketManager;
+import freenet.node.PeerNode;
import freenet.support.BitArray;
import freenet.support.Logger;
@@ -84,23 +85,29 @@
_senderThread.wait(x);
}
}
- while (_unsent.size()
== 0) {
+ while (true) {
+
synchronized(_unsent) {
+
if(_unsent.size() != 0) break;
+ }
if(_sendComplete) return;
synchronized
(_senderThread) {
_senderThread.wait();
}
}
} catch (InterruptedException
e) { }
- int packetNo = ((Integer)
_unsent.removeFirst()).intValue();
+ int packetNo;
+ synchronized(_unsent) {
+ packetNo = ((Integer)
_unsent.removeFirst()).intValue();
+ }
_sentPackets.setBit(packetNo,
true);
try {
-
_usm.send(BlockTransmitter.this._destination, DMT.createPacketTransmit(_uid,
packetNo, _sentPackets, _prb.getPacket(packetNo)));
-
+
((PeerNode)_destination).sendAsync(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)), null);
// We accelerate the ping rate
during the transfer to keep a closer eye on round-trip-time
sentSinceLastPing++;
if (sentSinceLastPing >=
PING_EVERY) {
sentSinceLastPing = 0;
-
_usm.send(BlockTransmitter.this._destination, DMT.createPing());
+
//_usm.send(BlockTransmitter.this._destination, DMT.createPing());
+
((PeerNode)_destination).sendAsync(DMT.createPing(), null);
}
} catch (NotConnectedException
e) {
Logger.normal(this,
"Terminating send: "+e);
@@ -126,7 +133,7 @@
public void receiveAborted(int reason, String
description) {
try {
- _usm.send(_destination, DMT.createSendAborted(reason,
description));
+
((PeerNode)_destination).sendAsync(DMT.createSendAborted(reason, description),
null);
} catch (NotConnectedException e) {
Logger.minor(this, "Receive aborted and receiver is not
connected");
}
@@ -159,7 +166,9 @@
for (Iterator i = missing.iterator();
i.hasNext();) {
Integer packetNo = (Integer) i.next();
if
(_prb.isReceived(packetNo.intValue())) {
- _unsent.addFirst(packetNo);
+ synchronized(_unsent) {
+
_unsent.addFirst(packetNo);
+ }
_sentPackets.setBit(packetNo.intValue(), false);
synchronized(_senderThread) {
_senderThread.notify();
Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java 2005-11-16
15:23:44 UTC (rev 7545)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java 2005-11-16
19:29:35 UTC (rev 7546)
@@ -1050,7 +1050,7 @@
* @throws KeyChangedException If the primary key changes while we are
trying to send this packet.
* @throws PacketSequenceException
*/
- public void processOutgoingPreformatted(byte[] buf, int offset, int
length, KeyTracker tracker, int packetNumber, AsyncMessageCallback[] callbacks)
throws KeyChangedException, NotConnectedException, PacketSequenceException {
+ public synchronized void processOutgoingPreformatted(byte[] buf, int
offset, int length, KeyTracker tracker, int packetNumber,
AsyncMessageCallback[] callbacks) throws KeyChangedException,
NotConnectedException, PacketSequenceException {
if(Logger.shouldLog(Logger.MINOR, this)) {
String log =
"processOutgoingPreformatted("+Fields.hashCode(buf)+",
"+offset+","+length+","+tracker+","+packetNumber+",";
if(callbacks == null) log += "null";
@@ -1093,12 +1093,15 @@
* sent after this packet was originally sent (it may be a resend) */
int realSeqNumber;
+ int otherSideSeqNumber;
+
synchronized(tracker) {
acks = tracker.grabAcks();
resendRequests = tracker.grabResendRequests();
ackRequests = tracker.grabAckRequests();
realSeqNumber = tracker.getLastOutgoingSeqNumber();
-
+ otherSideSeqNumber = tracker.highestReceivedIncomingSeqNumber();
+ Logger.minor(this, "otherSideSeqNumber: "+otherSideSeqNumber);
}
int packetLength = acks.length + resendRequests.length +
ackRequests.length + 4 + 1 + length + 4 + 4 + RANDOM_BYTES_LENGTH;
@@ -1132,9 +1135,6 @@
plaintext[ptr++] = (byte)(realSeqNumber - seqNumber);
}
- int otherSideSeqNumber = tracker.highestReceivedIncomingSeqNumber();
- Logger.minor(this, "otherSideSeqNumber: "+otherSideSeqNumber);
-
plaintext[ptr++] = (byte)(otherSideSeqNumber >> 24);
plaintext[ptr++] = (byte)(otherSideSeqNumber >> 16);
plaintext[ptr++] = (byte)(otherSideSeqNumber >> 8);
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2005-11-16 15:23:44 UTC (rev
7545)
+++ trunk/freenet/src/freenet/node/Version.java 2005-11-16 19:29:35 UTC (rev
7546)
@@ -20,10 +20,10 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- public static final int buildNumber = 182;
+ public static final int buildNumber = 183;
/** Oldest build of Fred we will talk to */
- public static final int lastGoodBuild = 181;
+ public static final int lastGoodBuild = 183;
/** The highest reported build of fred */
public static int highestSeenBuild = buildNumber;
Modified: trunk/freenet/src/freenet/support/BucketTools.java
===================================================================
--- trunk/freenet/src/freenet/support/BucketTools.java 2005-11-16 15:23:44 UTC
(rev 7545)
+++ trunk/freenet/src/freenet/support/BucketTools.java 2005-11-16 19:29:35 UTC
(rev 7546)
@@ -321,6 +321,20 @@
return data;
}
+ /**
+  * Copy the contents of a bucket into a caller-supplied array.
+  * @param bucket The bucket to read from.
+  * @param output The array to fill, starting at index 0. Must be at least
+  * bucket.size() bytes long.
+  * @return The number of bytes copied. This is bucket.size() unless the
+  * stream reports end-of-file early, in which case it is the number of
+  * bytes read up to that point.
+  * @throws IllegalArgumentException If the bucket is larger than output.
+  * @throws IOException If the bucket's stream cannot be opened, read or
+  * closed.
+  */
+ public static int toByteArray(Bucket bucket, byte[] output) throws IOException {
+     long size = bucket.size();
+     if(size > output.length)
+         throw new IllegalArgumentException("Data does not fit in provided buffer");
+     InputStream is = bucket.getInputStream();
+     try {
+         int moved = 0;
+         while(true) {
+             if(moved == size) return moved;
+             // size <= output.length, so the remaining count fits in an int.
+             int x = is.read(output, moved, (int)(size - moved));
+             if(x == -1) return moved;
+             moved += x;
+         }
+     } finally {
+         // Always release the stream: on success, early EOF, or a read error.
+         is.close();
+     }
+ }
+
public static Bucket makeImmutableBucket(BucketFactory bucketFactory,
byte[] data) throws IOException {
Bucket bucket = bucketFactory.makeBucket(data.length);
OutputStream os = bucket.getOutputStream();
Modified: trunk/freenet/src/freenet/support/DoublyLinkedListImpl.java
===================================================================
--- trunk/freenet/src/freenet/support/DoublyLinkedListImpl.java 2005-11-16
15:23:44 UTC (rev 7545)
+++ trunk/freenet/src/freenet/support/DoublyLinkedListImpl.java 2005-11-16
19:29:35 UTC (rev 7546)
@@ -195,6 +195,7 @@
j.setNext(newtailptr);
newtailptr.setPrev(j);
+ newtailptr.setParent(this);
i.setPrev(newheadptr);
newheadptr.setNext(i);
@@ -268,7 +269,7 @@
if (i.getParent() != this)
throw new PromiscuousItemException(i, i.getParent());
if (j.getParent() != null)
- throw new PromiscuousItemException(j, i.getParent());
+ throw new PromiscuousItemException(j, j.getParent());
if (j.getNext() != null || j.getPrev() != null)
throw new PromiscuousItemException(j);
DoublyLinkedList.Item prev = i.getPrev();