Update of /cvsroot/freenet/freenet/src/freenet
In directory sc8-pr-cvs1:/tmp/cvs-serv29322/src/freenet
Modified Files:
ConnectionHandler.java PeerHandler.java PeerPacketMessage.java
Version.java
Log Message:
6223: major bugfixes, from local testing. also merge the Yarrow patch. also lots of
logging.
Index: ConnectionHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/ConnectionHandler.java,v
retrieving revision 1.167
retrieving revision 1.168
diff -u -r1.167 -r1.168
--- ConnectionHandler.java 7 Oct 2003 00:47:30 -0000 1.167
+++ ConnectionHandler.java 7 Oct 2003 20:03:39 -0000 1.168
@@ -962,35 +962,14 @@
Core.logger.log(this, "Partial notification: "+size+" bytes written
successfully on "+
this, Logger.DEBUG);
lastSizeDone = size;
- if(sentPacket == null)
- Core.logger.log(this, "sentPacket NULL in jobPartDone! for
"+this,
- Logger.ERROR);
- else
- sentPacket.jobDone(false, size, peer, null);
-// if(sentMessages.size() > 0) {
-// synchronized(sendLock) { // avoid deadlock
-// synchronized(sentMessages) {
-// if(logDEBUG)
-// Core.logger.log(this, "Notifying
"+sentMessages.size()+" messages for "+
-// this,
Logger.DEBUG);
-// // Go through queue
-// // Notify all that succeeded of success, and
ignore the rest
-// int pos = 0;
-// for(Iterator i =
sentMessages.iterator();i.hasNext();) {
-// MessageSend ms =
(MessageSend)(i.next());
-// int l = ms.toSend.length;
-// int opos = pos;
-// pos += l;
-// if(pos <= size) {
-// if(lastSizeDone < pos) // if
<=, we have already processed
-// ms.jobDone(pos - opos,
true);
-// } else break;
-// }
-// sentMessagesCount = 0;
-// sentMessages.clear();
-// }
-// }
-// }
+ if(trailerSendID == -1) {
+ if(sentPacket == null) {
+ if(!finalized.state())
+ Core.logger.log(this, "sentPacket NULL in
jobPartDone! for "+
+ this,
Logger.ERROR);
+ } else
+ sentPacket.jobDone(false, size, peer, null);
+ }
}
boolean checkingSending = false;
@@ -1108,28 +1087,33 @@
sentPacket = null;
if(logDEBUG)
logDEBUG("Set sentPacket to null");
+ // If we have a trailer, must deal with it
immediately
+ // If we unlock then deal with it, another
thread could
+ // start sending a regular packet...
+ if(packet.hasTrailer()) {
+ if(logDEBUG)
+ logDEBUG("packet has trailer");
+ int sendId;
+ synchronized(trailerSendIDCounterLock)
{
+ trailerSendIDCounter++;
+ if(trailerSendIDCounter < 0)
+ trailerSendIDCounter =
0;
+ sendId = trailerSendIDCounter;
+ }
+ synchronized(trailerSendLock) {
+ trailerSendID = sendId;
+ trailerSendLength =
packet.trailerLength();
+ trailerSentBytes = 0;
+ }
+ tw = new MyTrailerWriter(sendId);
+ if(logDEBUG)
+ logDEBUG("Creating "+tw);
+ }
}
}
if(packet != null) {
if(logDEBUG)
logDEBUG("Packet not null");
- if(packet.hasTrailer()) {
- if(logDEBUG)
- logDEBUG("packet has trailer");
- int sendId;
- synchronized(trailerSendIDCounterLock) {
- trailerSendIDCounter++;
- if(trailerSendIDCounter < 0)
- trailerSendIDCounter = 0;
- sendId = trailerSendIDCounter;
- }
- synchronized(trailerSendLock) {
- trailerSendID = sendId;
- trailerSendLength =
packet.trailerLength();
- trailerSentBytes = 0;
- }
- tw = new MyTrailerWriter(sendId);
- }
if(packet != null)
packet.jobDone(true, size, peer, tw);
}
@@ -1138,31 +1122,47 @@
if(sendClosed.state() && !sendingCloseMessage) return;
if(logDEBUG)
logDEBUG("Trying to send a packet...");
+ if(trailerSendID != -1) return;
+ // This is nasty...
+ // Only way to START a trailer send is through this function
+ // sentPacket -> null
+ // sentPacketLock unlocked
+ // other thread starts sending a message with a trailer
+ // other thread hits jobDone
+ // other thread locks sentPacketLock, clears sentPacket
+ // other thread sets trailerSendID, unlocks sentPacketLock
+ // But if we check trailerSendID right after relocking it, we're ok
+ boolean ourSentPacket = false;
synchronized(sentPacketLock) {
- logDEBUG("synchronized...");
- if(sendingCloseMessage) {
- Message cm = p.getCloseMessage();
- if(cm != null) {
+ if(trailerSendID != -1) return;
+ if(sentPacket == null) {
+ ourSentPacket = true;
+ logDEBUG("synchronized...");
+ if(sendingCloseMessage) {
+ Message cm = p.getCloseMessage();
+ if(cm != null) {
+ try {
+ sentPacket =
peerHandler.getPacket(link, p,
+
identity,
+
cm, true);
+ } catch (IOException e) {
+ sentPacket = null;
+ logDEBUG("Caught "+e+
+ " in
getPacket (close message) in jobDone");
+ }
+ }
+ sendingCloseMessage = false;
+ } else {
try {
- sentPacket =
peerHandler.getPacket(link, p, identity,
-
cm, true);
+ sentPacket =
peerHandler.getPacket(link, p);
} catch (IOException e) {
sentPacket = null;
- logDEBUG("Caught "+e+
- " in getPacket (close
message) in jobDone");
+ logDEBUG("Caught "+e+" in getPacket in
jobDone");
}
}
- sendingCloseMessage = false;
- } else {
- try {
- sentPacket = peerHandler.getPacket(link, p);
- } catch (IOException e) {
- sentPacket = null;
- logDEBUG("Caught "+e+" in getPacket in
jobDone");
- }
}
}
- if (sentPacket != null) {
+ if (sentPacket != null && ourSentPacket) {
innerSendPacket(wsl.MESSAGE + 1 - sentPacket.countMessages(),
sentPacket);
int sentPacketLength = sentPacket.getLength();
@@ -1241,6 +1241,10 @@
throws UnknownTrailerSendIDException, TrailerSendFinishedException,
AlreadySendingTrailerChunkException, IOException {
lastActiveTime = System.currentTimeMillis();
synchronized(trailerSendLock) {
+ if(logDEBUG)
+ Core.logger.log(this, "writeTrailing("+id+",byte[],"+
+
offset+","+length+","+cb+" on "+this+
+ " (id =
"+trailerSendID+")", Logger.DEBUG);
if(finalized.state())
throw new IOException("Closed:
"+finalized.state()+":"+this);
if(trailerSendID != id)
@@ -1922,8 +1926,8 @@
private void sendBytes(byte[] toSend, int off, int len, int priority) throws
IOException {
Core.logger.log(this, "Sending "+len+" bytes on "+this,
- Logger.DEBUG);
- if(conn == null)
+ new Exception("debug"), Logger.DEBUG);
+ if(conn == null)
throw new IOException("Connection closed: "+this);
java.net.Socket sock = conn.getSocket();
if(sock == null)
@@ -2140,22 +2144,30 @@
}
}
- public final void sendPacket(PeerPacket packet, int prio) {
+ /**
+ * Send a packet on the ConnectionHandler. Called by PeerHandler.
+ * @return true if we sent the packet, false if a locking conflict
+ * prevented us sending it, or the connection is closed.
+ */
+ public final boolean sendPacket(PeerPacket packet, int prio) {
if(sendClosed.state()) {
Core.logger.log(this, "sendPacket("+packet+","+prio+
") when already closed:
"+this, Logger.NORMAL);
packet.jobDone(true, 0, peer, null);
- return;
+ return false;
}
synchronized(sentPacketLock) {
if(sentPacket != null) {
- throw new
IllegalStateException("sendPacket("+packet+") but "+
-
"already sending "+sentPacket+
-
"!!");
+ return false;
+ }
+ if(trailerSendID != -1) {
+ throw new
IllegalStateException("sendPacket("+packet+","+prio+
+
" called but sending a trailer!");
}
this.sentPacket = packet;
}
innerSendPacket(prio, sentPacket);
+ return true;
}
/**
@@ -2761,9 +2773,7 @@
[EMAIL PROTECTED] whether the connection is currently sending something
*/
public final boolean sending() {
- if (!isFNP) //fcp messages can be behind trailers.
- return false;
- return trailingPresent;
+ return trailerSendID != -1;
}
/**
Index: PeerHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerHandler.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- PeerHandler.java 7 Oct 2003 00:47:30 -0000 1.5
+++ PeerHandler.java 7 Oct 2003 20:03:39 -0000 1.6
@@ -92,10 +92,16 @@
public boolean needsConnection() {
if(Core.logger.shouldLog(Logger.DEBUG, this))
Core.logger.log(this, "needsConnection(): "+
- this+":
"+connectionHandlers.size(),
+ this+":
chcount="+connectionHandlers.size()+
+ ",
messagescount="+messages.size(),
Logger.DEBUG);
if(messages.isEmpty() &&
- (id == null || (!(node.rt.references(id))))) return false;
+ (id == null || (!(node.rt.references(id))))) {
+ if(Core.logger.shouldLog(Logger.DEBUG, this))
+ Core.logger.log(this, "returning false immediately",
+ Logger.DEBUG);
+ return false;
+ }
// FIXME: layering
int closedCount = 0;
@@ -115,6 +121,9 @@
// If it is sending a trailer, it is not available
// if(ch.isSendingPacket()) continue;
// But if it is sending only packets, it *IS* available
+ if(Core.logger.shouldLog(Logger.DEBUG, this))
+ Core.logger.log(this, "Found free conn for
"+this,
+ Logger.DEBUG);
return false;
}
}
@@ -207,6 +216,12 @@
e.remove();
continue;
}
+ if(ch.sending()) {
+ Core.logger.log(this, ch.toString()+
+ " already
sending a trailer for "+
+ pm+" on
"+this, Logger.DEBUG);
+ continue;
+ }
if(ch.isSendingPacket()) {
Core.logger.log(this, ch.toString()+
" already
sending a packet for "+pm+
@@ -219,8 +234,7 @@
": send one packet
"+pm+" for "+this,
Logger.DEBUG);
try {
- sendSinglePacket(ch, pm);
- return;
+ if(sendSinglePacket(ch, pm)) return;
} catch (IOException ex) {
Core.logger.log(this, ch.toString()+
": caught
"+ex+" trying to send packet ("+
@@ -258,14 +272,17 @@
* Send one message on one ConnectionHandler.
* @param ch the connection on which to send the message
* @param pm the single message to send
+ * @returns true if successful, false if we didn't send the
+ * message on the connection because of a locking conflict
*/
- protected void sendSinglePacket(ConnectionHandler ch,
-
PeerPacketMessage pm)
+ protected boolean sendSinglePacket(ConnectionHandler ch,
+
PeerPacketMessage pm)
throws IOException {
PeerPacketMessage[] msgs = new PeerPacketMessage[] { pm };
PeerPacket packet = new PeerPacket(msgs, ch.getLink(),
ch.presentationType());
- ch.sendPacket(packet, freenet.transport.WriteSelectorLoop.MESSAGE);
+ return ch.sendPacket(packet,
+
freenet.transport.WriteSelectorLoop.MESSAGE);
}
public final PeerPacket getPacket(Link link, Presentation p)
Index: PeerPacketMessage.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerPacketMessage.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- PeerPacketMessage.java 7 Oct 2003 00:47:30 -0000 1.3
+++ PeerPacketMessage.java 7 Oct 2003 20:03:39 -0000 1.4
@@ -37,13 +37,14 @@
}
/** Set the message up to send on a connection using a specific
- * Presentation.
+ * Presentation. Also resets message finished. Can be called
+ * multiple times.
* @param p the presentation to use to transform the message into
* a RawMessage and thence to a byte array. Can be null to clear the
* cached message.
*/
- // Can be called multiple times
public void resolve(Presentation p) {
+ finished = false;
if(this.p == p)
return;
this.p = p;
@@ -93,9 +94,11 @@
* Notify the callback that we successfully sent the message.
*/
public void notifySuccess(TrailerWriter tw) {
+ Core.logger.log(this, "notifySuccess("+tw+") for "+this,
+ Logger.DEBUG);
if(finished) {
Core.logger.log(this, "notifySuccess on "+this+" already finished!",
- Logger.ERROR);
+ new Exception("debug"), Logger.ERROR);
return;
}
finished = true;
@@ -114,9 +117,13 @@
* @param detail the excuse exception
*/
public void notifyFailure(SendFailedException detail) {
+ Core.logger.log(this, "notifyFailure("+detail+") for "+this,
+ detail, Logger.DEBUG);
+ Core.logger.log(this, "notifyFailure() for "+this,
+ new Exception("debug"), Logger.DEBUG);
if(finished) {
Core.logger.log(this, "notifyFailure on "+this+" already finished!",
- Logger.ERROR);
+ new Exception("debug"), Logger.ERROR);
return;
}
finished = true;
Index: Version.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/Version.java,v
retrieving revision 1.415
retrieving revision 1.416
diff -u -r1.415 -r1.416
--- Version.java 7 Oct 2003 00:47:30 -0000 1.415
+++ Version.java 7 Oct 2003 20:03:39 -0000 1.416
@@ -18,7 +18,7 @@
public static String protocolVersion = "1.46";
/** The build number of the current revision */
- public static final int buildNumber = 6222;
+ public static final int buildNumber = 6223;
// 6028: may 3; ARK retrieval fix
public static final int ignoreBuildsAfter = 6500;
_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs