Update of /cvsroot/freenet/freenet/src/freenet
In directory sc8-pr-cvs1:/tmp/cvs-serv15810/src/freenet
Modified Files:
ConnectionHandler.java OpenConnectionManager.java
PeerHandler.java PeerPacketMessage.java Version.java
Added Files:
RemovingPeerHandlerException.java
Log Message:
6249:
Iakin's fix for Request.java - nasty bug, may have caused requests from recent nodes
to fail.
Try to fix lock contention in RandomAccessFilePool. Code review would be appreciated.
Catch Throwables when closing files.
Reduce outLimitCutoff to 80%. Thus the node will reject all queries if it is using
more than 80% of outbound bandwidth. The previous 90% figure conflicted with the hard
bandwidth limit of 120%, which (because it is deliberately conservative), was
preventing it from going over 90%.
Implement removal of PeerHandlers from OCM, when either
a) No connections and no contact details, and not in routing table, regardless of
whether we have queued messages or
b) No connections, no messages.
- Some tricky concurrency issues, hence the new RemovingPeerHandlerException
Track number and last event time for outbound connection attempts, successes, and
failures, in PeerHandler.
Add PeerHandler.probablyNotConnectable()
Don't queue messages if we are unlikely to be able to open a connection and we have no
current connections
Diagnostics - messageSendTime* especially. Added messageSendTime{Request,NonRequest},
fixed messageSendTimeNoQR.
Make sending a QR because of a loop in RequestDone asynchronous.
Don't add PeerHandlers with null IDs in OCM (probably not used).
Lots of logging, a few toString()'s, indenting
- remove some spurious logging, for example, Accepted before SendFinished is okay
unless it's a lot before it, or the SendFinished is unsuccessful
--- NEW FILE: RemovingPeerHandlerException.java ---
package freenet;
public class RemovingPeerHandlerException extends Exception {
}
Index: ConnectionHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/ConnectionHandler.java,v
retrieving revision 1.183
retrieving revision 1.184
diff -u -w -r1.183 -r1.184
--- ConnectionHandler.java 14 Oct 2003 23:59:01 -0000 1.183
+++ ConnectionHandler.java 15 Oct 2003 21:15:48 -0000 1.184
@@ -115,6 +115,8 @@
private TrailerWriteCallback twcb;
ReceiveInputStream currentInputStream = null;
private PeerHandler peerHandler = null;
+ // NOT final - see end of registerOCM
+
private boolean sendingCloseMessage = false;
private SocketChannel chan;
private Socket sock;
@@ -401,6 +403,7 @@
byte[] toSend;
if(receiveClosed.state() && receivingCount <= 0 &&
sendClosed.state() && trailerSendID == -1) {
+ logDEBUG("Terminating at beginning of registerOCM");
terminate();
return;
}
@@ -418,9 +421,19 @@
innerSendPacket(wsl.MESSAGE - 1 - sentPacket.countMessages(),
sentPacket);
if(receiveClosed.state() && receivingCount <= 0 &&
- sendClosed.state() && trailerSendID == -1)
+ sendClosed.state() && trailerSendID == -1) {
+ logDEBUG("terminating at end of registerOCM");
terminate();
- else peerHandler.registerConnectionHandler(this);
+ } else {
+ try {
+ peerHandler.registerConnectionHandler(this);
+ } catch (RemovingPeerHandlerException e) {
+ logDEBUG("Waiting for PeerHandler to finish removing:
"+
+ peerHandler+": "+e);
+ peerHandler.waitForRemovedFromOCM();
+ peerHandler = ocm.makePeerHandler(identity, otherRef);
+ }
+ }
}
public boolean shouldThrottle() {
@@ -437,7 +450,8 @@
return c.countAsThrottled();
}
- public void setPeerHandler(PeerHandler ph) {
+ public void setPeerHandler(PeerHandler ph)
+ throws RemovingPeerHandlerException {
if(peerHandler == null) {
peerHandler = ph;
ph.registerConnectionHandler(this);
Index: OpenConnectionManager.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/OpenConnectionManager.java,v
retrieving revision 1.124
retrieving revision 1.125
diff -u -w -r1.124 -r1.125
--- OpenConnectionManager.java 15 Oct 2003 09:57:37 -0000 1.124
+++ OpenConnectionManager.java 15 Oct 2003 21:15:48 -0000 1.125
@@ -158,14 +158,15 @@
}
}
- private void removePeerHandler(Peer p) {
+ public void removePeerHandler(Identity i) {
synchronized(peerHandlers) {
- peerHandlers.remove(p);
+ peerHandlers.remove(i);
}
}
public void addPeerHandler(Identity i, PeerHandler ph,
boolean allowedToFail) {
+ if(i == null) return; // FCP peerHandlers are not indexed
synchronized(peerHandlers) {
Object oph = peerHandlers.get(i);
if(oph != null) {
@@ -204,7 +205,17 @@
}
}
// ph != null now
+ while(true) {
+ try {
ch.setPeerHandler(ph); // will register on ph
+ } catch (RemovingPeerHandlerException e) {
+ Core.logger.log(this, "Caught "+e+" setting ph for
"+ch,
+ Logger.MINOR);
+ ph = makePeerHandler(i, ch.targetReference());
+ continue;
+ }
+ break;
+ }
Object syncOb = chs.getSync(i);
if(syncOb == null) {
// FIXME: adequately synchronized? I _think_ so because of
chs's internal synchronization...
Index: PeerHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerHandler.java,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -w -r1.20 -r1.21
--- PeerHandler.java 15 Oct 2003 02:10:10 -0000 1.20
+++ PeerHandler.java 15 Oct 2003 21:15:48 -0000 1.21
@@ -28,11 +28,22 @@
final LinkedList connectionHandlers;
final Node node;
+ boolean removingFromOCM = false;
+ volatile boolean removedFromOCM = true;
+ Object removedFromOCMLock = new Object();
long lastMessageSentTime = -1;
Object rejectOldVersionLock = new Object();
final long initialRejectOldVersionTime = 250;
long rejectOldVersionTime = initialRejectOldVersionTime;
+ long lastRegisterTime = 0;
+ Object outboundDiagnosticsSync = new Object();
+ long lastOutboundAttemptTime = -1;
+ long lastOutboundSuccessTime = -1;
+ long lastOutboundFailureTime = -1;
+ int totalOutboundSuccesses = 0;
+ int totalOutboundAttempts = 0;
+ int totalOutboundFailures = 0;
boolean logDEBUG;
@@ -43,7 +54,9 @@
public static final int NORMAL = 1;
public String toString() {
- return super.toString() + " ("+id+","+ref+")";
+ return super.toString() + " ("+id+","+ref+"): outbound attempts="+
+ totalOutboundSuccesses+":"+totalOutboundFailures+"/"+
+ totalOutboundAttempts;
}
public long queuedBytes() {
@@ -70,7 +83,11 @@
* Register a ConnectionHandler to us. This should be called as soon as
* it has completed negotiations, not waiting for the Identify.
*/
- public void registerConnectionHandler(ConnectionHandler ch) {
+ public void registerConnectionHandler(ConnectionHandler ch)
+ throws RemovingPeerHandlerException {
+ if(removingFromOCM)
+ throw new RemovingPeerHandlerException();
+ lastRegisterTime = System.currentTimeMillis();
if(Core.logger.shouldLog(Logger.MINOR,this))
Core.logger.log(this, "Registering "+ch+" on
"+this,Logger.MINOR);
synchronized(connectionHandlers) {
@@ -83,16 +100,81 @@
* have lost the ability to send and receive messages on the CH.
*/
public void unregisterConnectionHandler(ConnectionHandler ch) {
- if(Core.logger.shouldLog(Logger.MINOR,this)) Core.logger.log(this,
"Unregistering "+ch+" on "+this, Logger.MINOR);
+ if(Core.logger.shouldLog(Logger.MINOR,this))
+ Core.logger.log(this, "Unregistering "+ch+" on "+this,
+ Logger.MINOR);
+ boolean notInRT = id == null || !(node.rt.references(id));
+ boolean notContactable = (ref == null || ref.noPhysical()) &&
+ notInRT;
+ // This is a stronger criterion than probablyNotConnectible
+ // FIXME: we should probably wait a while before removing, to make
sure?
+ boolean noConnections = false;
synchronized(connectionHandlers) {
connectionHandlers.remove(ch);
+ if(connectionHandlers.size() == 0) {
+ noConnections = true;
+ if(notContactable)
+ removingFromOCM = true;
+ }
+ }
+ if(removingFromOCM) {
+ removeFromOCM();
+ } else if(noConnections && notInRT) {
+ synchronized(messages) {
+ if(messages.isEmpty() &&
+ messagesWithTrailers.isEmpty())
+ removingFromOCM = true;
}
- if(connectionHandlers.size() == 0 &&
- ref == null &&
- (!messages.isEmpty()) || (!messagesWithTrailers.isEmpty())) {
+ }
+ if(removingFromOCM)
+ removeFromOCM();
+ }
+
+ public void removeFromOCM() {
+ removingFromOCM = true;
+ Core.logger.log(this, "Removing from OCM... "+this,
+ Logger.DEBUG);
+ SendFailedException sfe =
+ new SendFailedException(null, id, "Removing from OCM",
+ true);
+ synchronized(messages) {
+ if(!messages.isEmpty() || (!messagesWithTrailers.isEmpty()))
Core.logger.log(this, "Lost all connections for "+id+
- ", but "+messages.size()+"
messages and "+messagesWithTrailers.size()+" messages with trailer left",
- Logger.NORMAL);
+ ", but
"+messages.size()+" messages and "+
+
messagesWithTrailers.size()+" messages with "+
+ "trailer left when
removing from OCM",
+ Logger.MINOR);
+ while(!messages.isEmpty()) {
+ PeerPacketMessage m =
+ (PeerPacketMessage)(messages.removeFirst());
+ if(m == null) break;
+ m.notifyFailure(sfe);
+ }
+ while(!messagesWithTrailers.isEmpty()) {
+ PeerPacketMessage m =
+
(PeerPacketMessage)(messagesWithTrailers.removeFirst());
+ if(m == null) break;
+ m.notifyFailure(sfe);
+ }
+ }
+ if(id != null)
+ node.connections.removePeerHandler(id);
+ synchronized(removedFromOCMLock) {
+ removedFromOCM = true;
+ removedFromOCMLock.notifyAll();
+ }
+ Core.logger.log(this, "Removed from OCM: "+this,
+ Logger.DEBUG);
+ }
+
+ public void waitForRemovedFromOCM() {
+ while(!removedFromOCM) {
+ synchronized(removedFromOCMLock) {
+ if(removedFromOCM) return;
+ try {
+ removedFromOCMLock.wait(200);
+ } catch (InterruptedException e) {}
+ }
}
}
@@ -248,7 +330,8 @@
* @param n the Node.
* @param maxPacketSize the maximum length allowable for sending packets.
*/
- public PeerHandler(Identity id, NodeReference ref, Node n, int maxPacketSize) {
+ public PeerHandler(Identity id, NodeReference ref, Node n,
+ int maxPacketSize) {
this.id = id;
this.ref = ref;
this.node = n;
@@ -295,6 +378,14 @@
* 3. No Idle ConnectionHandlers
* Strategy: wait for one to ask us for a packet
*/
+ if(removingFromOCM) {
+ SendFailedException sfe =
+ new SendFailedException(null, id, "Removing from OCM",
true);
+ pm.notifyFailure(sfe);
+ if(logDEBUG)
+ Core.logger.log(this, "Dumped message "+pm+
+ ": removing from OCM
("+this+")", Logger.DEBUG);
+ }
logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
if(logDEBUG) Core.logger.log(this, "Sending "+pm+" on "+this,
Logger.DEBUG);
@@ -343,11 +434,20 @@
// Queue the message
if(logDEBUG) Core.logger.log(this, "Queueing "+pm+" on "+this,
Logger.DEBUG);
- if((id == null || ref == null) && handlersSendingPackets == 0) {
- // Can't open a connection
- pm.notifyFailure(null);
+ if(handlersSendingPackets == 0 &&
+ probablyNotConnectable()) {
+ // If it's FCP, or if it's not got a return address,
+ // and we can't send now or wait for a packet send to finish,
+ // then dump the message
+ SendFailedException sfe =
+ new SendFailedException(null,id,"No open connections "+
+ "and
can't contact node",true);
+ pm.notifyFailure(sfe);
if(logDEBUG)
- Core.logger.log(this, "Failed to send packet, no more
conns, no way to open connection: DISCARDING "+pm+" on "+this, id == null ?
Logger.NORMAL : Logger.MINOR); // id == null => FCP
+ Core.logger.log(this, "Failed to send packet, no more
conns, "+
+ "no way to open
connection: DISCARDING "+pm+
+ " on "+this, id ==
null ? Logger.NORMAL :
+ Logger.MINOR); // id
== null => FCP
return;
}
if(pm.priority == EXPENDABLE) {
@@ -376,6 +476,13 @@
messagesWithTrailers.size());
}
+ public boolean probablyNotConnectable() {
+ return id == null || ref == null || ref.noPhysical() ||
+ (totalOutboundAttempts > 1 &&
+ totalOutboundSuccesses == 0 &&
+ totalOutboundFailures > 1);
+ }
+
/**
* Send one message on one ConnectionHandler.
* @param ch the connection on which to send the message
@@ -516,8 +623,7 @@
if(maxTimeoutAt != -1 &&
System.currentTimeMillis() > maxTimeoutAt) {
Core.logger.log(this, "Took more than 5
minutes to send "+
- pm+"!!", ref
== null ? Logger.MINOR :
- Logger.ERROR);
+ pm+"!!",
Logger.NORMAL);
break;
}
try {
@@ -590,4 +696,26 @@
trailer = tw;
}
}
+
+ // Try to determine contactability
+ public void attemptingOutboundConnection() {
+ synchronized(outboundDiagnosticsSync) {
+ lastOutboundAttemptTime = System.currentTimeMillis();
+ totalOutboundAttempts++;
+ }
+ }
+
+ public void succeededOutboundConnection() {
+ synchronized(outboundDiagnosticsSync) {
+ lastOutboundSuccessTime = System.currentTimeMillis();
+ totalOutboundSuccesses++;
+ }
+ }
+
+ public void failedOutboundConnection() {
+ synchronized(outboundDiagnosticsSync) {
+ lastOutboundFailureTime = System.currentTimeMillis();
+ totalOutboundFailures++;
+ }
+ }
}
Index: PeerPacketMessage.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerPacketMessage.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -w -r1.9 -r1.10
--- PeerPacketMessage.java 12 Oct 2003 15:06:25 -0000 1.9
+++ PeerPacketMessage.java 15 Oct 2003 21:15:48 -0000 1.10
@@ -111,13 +111,33 @@
return;
}
finished = true;
- long sendTime = System.currentTimeMillis() - startTime;
+ long sentTime = System.currentTimeMillis();
+ long sendTime = sentTime - startTime;
+ if(sendTime > 5 * 60 * 1000) {
+ long seconds = sendTime/1000;
+ long secondsSinceConn = (sentTime - ph.lastRegisterTime)/1000;
+ Core.logger.log(this, "Took "+seconds+" seconds to send "+
+ this+"(notifySuccess("+tw+
+ ")! (last connection registered "+
+ secondsSinceConn+" seconds ago on "+ph,
+ ph.probablyNotConnectable() ? Logger.MINOR :
+ Logger.NORMAL);
+ }
Core.diagnostics.occurrenceContinuous("messageSendTime", sendTime);
if(ph.ref != null)
Core.diagnostics.occurrenceContinuous("messageSendTimeContactable",
sendTime);
else
Core.diagnostics.occurrenceContinuous("messageSendTimeNonContactable",
+ sendTime);
+ if(msg instanceof freenet.message.Request)
+ Core.diagnostics.occurrenceContinuous("messageSendTimeRequest",
+ sendTime);
+ else
+ Core.diagnostics.occurrenceContinuous("messageSendTimeNonRequest",
+ sendTime);
+ if(!(msg instanceof freenet.message.QueryRejected))
+ Core.diagnostics.occurrenceContinuous("messageSendTimeNoQR",
sendTime);
if(cb == null) return;
try {
Index: Version.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/Version.java,v
retrieving revision 1.441
retrieving revision 1.442
diff -u -w -r1.441 -r1.442
--- Version.java 15 Oct 2003 02:10:10 -0000 1.441
+++ Version.java 15 Oct 2003 21:15:48 -0000 1.442
@@ -18,7 +18,7 @@
public static String protocolVersion = "1.46";
/** The build number of the current revision */
- public static final int buildNumber = 6248;
+ public static final int buildNumber = 6249;
// 6028: may 3; ARK retrieval fix
public static final int ignoreBuildsAfter = 6500;
_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs