Update of /cvsroot/freenet/freenet/src/freenet/node/states/data
In directory sc8-pr-cvs1:/tmp/cvs-serv28750/src/freenet/node/states/data
Modified Files:
SendData.java
Log Message:
6308: Fix bug in SendData w.r.t. padding. If this works, it should reduce connection
churn amongst other things. Also reindent.
Index: SendData.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/node/states/data/SendData.java,v
retrieving revision 1.33
retrieving revision 1.34
diff -u -w -r1.33 -r1.34
--- SendData.java 31 Oct 2003 19:35:40 -0000 1.33
+++ SendData.java 4 Nov 2003 19:08:36 -0000 1.34
@@ -18,7 +18,6 @@
import freenet.node.ds.KeyInputStream;
import freenet.support.Logger;
-
/**
* Sends data from the store. If the reading from the store fails, then
* the sent data will be padded until the end of the next part, where
@@ -35,7 +34,8 @@
private final TrailerWriter send;
private final KeyInputStream in;
- private final TrailerWriteCallbackMessage myTWCM; // Use a message because of blocking I/O from store
+ private final TrailerWriteCallbackMessage myTWCM;
+ // Use a message because of blocking I/O from store
private boolean closedSend = false, closedIn = false;
private final long length, partSize;
private Exception abortedException = null;
@@ -54,29 +54,52 @@
int m = 0;
private boolean hadDSI = false;
private boolean waitingForWriteNotify = false;
+ private boolean lastNonPaddingChunk = false;
-
- public SendData(long id, long parent, TrailerWriter send,
- KeyInputStream in, long length, long partSize,
+ public SendData(
+ long id,
+ long parent,
+ TrailerWriter send,
+ KeyInputStream in,
+ long length,
+ long partSize,
Node n) {
super(id, parent);
this.send = send;
this.in = in;
- if(in == null) throw new IllegalArgumentException("null in");
+ if (in == null)
+ throw new IllegalArgumentException("null in");
this.length = length;
this.partSize = partSize;
this.n = n;
myTWCM = new TrailerWriteCallbackMessage(id, n, this);
logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
if(logDEBUG)
- Core.logger.log(this, "Creating SendData("+this+")", Logger.DEBUG);
+ Core.logger.log(
+ this,
+ "Creating SendData(" + this +")",
+ Logger.DEBUG);
}
public String toString() {
- return super.toString()+": send="+send+", in="+in+", moved="+moved+"/"+
- length+", partSize="+partSize+",result="+result+
- ",lastPacketLength="+lastPacketLength+
- (paddingLength != 0 ? (",sentPadding="+sentPadding+"/"+paddingLength) : "");
+ return super.toString()
+ + ": send="
+ + send
+ + ", in="
+ + in
+ + ", moved="
+ + moved
+ + "/"
+ + length
+ + ", partSize="
+ + partSize
+ + ",result="
+ + result
+ + ",lastPacketLength="
+ + lastPacketLength
+ + (paddingLength != 0
+ ? (",sentPadding=" + sentPadding + "/" + paddingLength)
+ : "");
}
public final long length() {
@@ -95,16 +118,21 @@
}
/** If sending upstream, you want CB_ABORTED.
- * If sending downstream, you want CB_RESTARTED.
+ * If sending downstream, you
+ * want CB_RESTARTED.
*/
public final void abort(int cb) {
silent = true;
result = cb;
if(Core.logger.shouldLog(Logger.DEBUG,this)) {
abortedException = new Exception("debug");
- Core.logger.log(this, "Aborted send for "+
- Long.toHexString(parent)+" with cb="+
- Integer.toHexString(cb), abortedException,
+ Core.logger.log(
+ this,
+ "Aborted send for "
+ + Long.toHexString(parent)
+ + " with cb="
+ + Integer.toHexString(cb),
+ abortedException,
Logger.DEBUG);
}
}
@@ -115,41 +143,43 @@
public void finalize() {
try {
- if(!closedIn) in.close();
- } catch (IOException e) {};
- if(send != null && !closedSend) send.close();
+ if (!closedIn)
+ in.close();
+ } catch (IOException e) {
+ };
+ if (send != null && !closedSend)
+ send.close();
}
- /** Sheesh! We're too overworked to even try to write CB_ABORTED.
+ /**
+ * Sheesh! We're too overworked to even try to write CB_ABORTED.
*/
public final void lost(Node n) {
try {
in.close();
closedIn = true;
} catch (IOException e) {
- Core.logger.log(this, "I/O error closing KeyInputStream",
- e, Logger.ERROR);
+ Core.logger.log(
+ this,
+ "I/O error closing KeyInputStream",
+ e,
+ Logger.ERROR);
}
- if(send != null) send.close();
+ if (send != null)
+ send.close();
closedSend = true;
}
-
- /* Async sending
+ /*
+ * Async sending
*
- * received(n, DataStateInitiator) ->
- * Reset buffer pointer
- * Read into buffer
+ * received(n, DataStateInitiator) -> Reset buffer pointer Read into buffer
* Start write
*
- * closed() ->
- * Set CB to CB_SEND_CONN_DIED
- * Run the block that used to be finally in received(..) to finish properly
+ * closed() -> Set CB to CB_SEND_CONN_DIED Run the block that used to be
+ * finally in received(..) to finish properly
*
- * written() ->
- * Reset buffer pointer
- * Read into buffer
- * Start write
+ * written() -> Reset buffer pointer Read into buffer Start write
*/
public State received(Node n, MessageObject mo) throws BadStateException {
boolean isDSI = (mo instanceof DataStateInitiator);
@@ -169,9 +199,19 @@
if (isTWCM) {
waitingForWriteNotify = false;
- if (logDEBUG) Core.logger.log(this, "Got " + myTWCM + " for " + this, Logger.DEBUG);
+ if (logDEBUG)
+ Core.logger.log(
+ this,
+ "Got " + myTWCM + " for " + this,
+ Logger.DEBUG);
if (!myTWCM.finished) {
- Core.logger.log(this, "Got a TWCM that was not finished!: " + myTWCM + " for " + this, Logger.ERROR);
+ Core.logger.log(
+ this,
+ "Got a TWCM that was not finished!: "
+ + myTWCM
+ + " for "
+ + this,
+ Logger.ERROR);
return this;
}
if (!myTWCM.success) {
@@ -179,10 +219,11 @@
if (handleThrowable(null, true) == null)
return null;
} else {
- if (inPaddingMode)
+ if (inPaddingMode && !lastNonPaddingChunk)
sentPadding += lastPacketLength;
else
moved += lastPacketLength;
+ lastNonPaddingChunk = false;
}
}
@@ -195,7 +236,8 @@
if (bufferEndPtr == -1)
throw new IOException("read failed: " + this);
} catch (Throwable t) {
- return handleThrowable(t, false); // will do termination and failure code
+ return handleThrowable(t, false);
+ // will do termination and failure code
}
}
try {
@@ -212,63 +254,123 @@
/**
* Read bytes from the store into the buffer, starting at the beginning.
* Return the number read.
- * @throws IOException if something breaks
+ *
+ * @throws IOException
+ * if something breaks
*/
protected int doRead() throws IOException {
- return in.read(buffer, 0, (int) Math.min(length - moved, buffer.length));
+ return in.read(
+ buffer,
+ 0,
+ (int) Math.min(length - moved, buffer.length));
}
/**
* Start to write some bytes to the connection
*/
- protected void startWrite(int bytes) throws UnknownTrailerSendIDException, TrailerSendFinishedException, AlreadySendingTrailerChunkException, IOException {
- if(bytes <= 0) return;
+ protected void startWrite(int bytes)
+ throws
+ UnknownTrailerSendIDException,
+ TrailerSendFinishedException,
+ AlreadySendingTrailerChunkException,
+ IOException {
+ if (bytes <= 0)
+ return;
myTWCM.reset();
lastPacketLength = bytes;
waitingForWriteNotify = true;
- if(send != null) send.writeTrailing(buffer, 0, bytes, myTWCM);
+ if (send != null)
+ send.writeTrailing(buffer, 0, bytes, myTWCM);
}
/**
- * Handle a throwable thrown during I/O. We are expected to close the connection,
- * etc.
- * @param t the throwable causing the failure, can be null.
- * @param inWrite whether we were writing at the time. false means we were reading.
+ * Handle a throwable thrown during I/O. We are expected to close the
+ * connection, etc.
+ *
+ * @param t
+ * the throwable causing the failure, can be null.
+ * @param inWrite
+ * whether we were writing at the time. false means we were
+ * reading.
*/
protected State handleThrowable(Throwable t, boolean inWrite) {
if(logDEBUG) {
if(t == null)
- Core.logger.log(this, "SendData.handleThrowable(null,"+inWrite+
- ") on "+this, Logger.DEBUG);
+ Core.logger.log(
+ this,
+ "SendData.handleThrowable(null," + inWrite + ") on " + this,
+ Logger.DEBUG);
else
- Core.logger.log(this, "SendData.handleThrowable("+t+","+inWrite+
- ") on "+this, t, Logger.DEBUG);
+ Core.logger.log(
+ this,
+ "SendData.handleThrowable("
+ + t
+ + ","
+ + inWrite
+ + ") on "
+ + this,
+ t,
+ Logger.DEBUG);
}
if(t == null) {
if(result == -1)
- Core.logger.log(this, "handleThrowable caller must set result if passing null Throwable!", new Exception("grrr"), Logger.ERROR);
+ Core.logger.log(
+ this,
+ "handleThrowable caller must set result if passing null Throwable!",
+ new Exception("grrr"),
+ Logger.ERROR);
} else if(t instanceof IOException) {
- if(inWrite) result = Presentation.CB_SEND_CONN_DIED;
+ if (inWrite)
+ result = Presentation.CB_SEND_CONN_DIED;
else {
int ifc = in.getFailureCode();
if(ifc == -1) {
- if(logDEBUG) Core.logger.log(this, "Cache failed between writing "+
- "and reading for "+Long.toHexString(id)+": "+
- t, t, Logger.DEBUG);
+ if (logDEBUG)
+ Core.logger.log(
+ this,
+ "Cache failed between writing "
+ + "and reading for "
+ + Long.toHexString(id)
+ + ": "
+ + t,
+ t,
+ Logger.DEBUG);
result = Presentation.CB_CACHE_FAILED;
- } else result = ifc;
+ } else
+ result = ifc;
if (result == Presentation.CB_CACHE_FAILED) {
- Core.logger.log(this, "Cache failed signalled after exception " +
- "after " + moved + " of " + length
- + " bytes: "+t+" for "+Long.toHexString(id)+
- " ("+Long.toHexString(parent)+".", t,
+ Core.logger.log(
+ this,
+ "Cache failed signalled after exception "
+ + "after "
+ + moved
+ + " of "
+ + length
+ + " bytes: "
+ + t
+ + " for "
+ + Long.toHexString(id)
+ + " ("
+ + Long.toHexString(parent)
+ + ".",
+ t,
Logger.ERROR);
}
}
} else {
- Core.logger.log(this, "Unexpected exception "+t+" in SendData "+
- this+" (inWrite="+inWrite+")", t, Logger.ERROR);
- result = Presentation.CB_CACHE_FAILED; // well, sorta
+ Core.logger.log(
+ this,
+ "Unexpected exception "
+ + t
+ + " in SendData "
+ + this
+ + " (inWrite="
+ + inWrite
+ + ")",
+ t,
+ Logger.ERROR);
+ result = Presentation.CB_CACHE_FAILED; // well,
+
// sorta
}
Core.diagnostics.occurrenceBinomial("sentData", 1, 0);
@@ -276,100 +378,180 @@
in.close();
closedIn = true;
} catch (IOException e) {
- Core.logger.log(this, "I/O error closing KeyInputStream",
- e, Logger.ERROR);
+ Core.logger.log(
+ this,
+ "I/O error closing KeyInputStream",
+ e,
+ Logger.ERROR);
}
- Core.logger.log(this, "Send failed for "+Long.toHexString(id)+
- " ("+Long.toHexString(parent)+" - result="+
- Long.toHexString(result)+", cause: "+t, Logger.MINOR);
+ Core.logger.log(
+ this,
+ "Send failed for "
+ + Long.toHexString(id)
+ + " ("
+ + Long.toHexString(parent)
+ + " - result="
+ + Long.toHexString(result)
+ + ", cause: "
+ + t,
+ Logger.MINOR);
if(inWrite) {
- if(send != null) send.close();
+ if (send != null)
+ send.close();
closedSend = true;
} else if(!inPaddingMode) {
if(moved == length) {
if(result == -1)
- Core.logger.log(this, "WTF? moved = length in handleThrowable "+
- "for "+this, new Exception("debug"),
+ Core.logger.log(
+ this,
+ "WTF? moved = length in handleThrowable "
+ + "for "
+ + this,
+ new Exception("debug"),
Logger.NORMAL);
} else {
try {
startWritePadding();
return this;
- // don't send the DataSent until the padding has finished writing
+ // don't send the DataSent until the padding has finished
+ // writing
} catch (IOException e) {
// Failed
t = e;
} catch (TrailerException e) {
- Core.logger.log(this, "Got "+e+" starting writing padding for "+this,
- e, Logger.ERROR);
+ Core.logger.log(
+ this,
+ "Got " + e + " starting writing padding for " + this,
+ e,
+ Logger.ERROR);
}
}
// Failed or already at end
- if(send != null) send.close();
+ if (send != null)
+ send.close();
closedSend = true;
} else {
// Padding failed
- if(send != null) send.close();
+ if (send != null)
+ send.close();
closedSend = true;
}
buffer = null; // early GC
- if(!silent) n.schedule(new DataSent(this));
+ if (!silent)
+ n.schedule(new DataSent(this));
return null;
}
- protected void startWritePadding() throws UnknownTrailerSendIDException, TrailerSendFinishedException, AlreadySendingTrailerChunkException, IOException {
+ protected void startWritePadding()
+ throws
+ UnknownTrailerSendIDException,
+ TrailerSendFinishedException,
+ AlreadySendingTrailerChunkException,
+ IOException {
// Pad until end of part
inPaddingMode = true;
int controlLength = Key.getControlLength();
long tmpLen = partSize + controlLength;
- paddingLength = Math.min(tmpLen - moved % tmpLen,
- length - moved);
+ paddingLength = Math.min(tmpLen - moved % tmpLen, length - moved);
// Either way, it includes the padding byte - and the part hash
- if(!waitingForWriteNotify)
+ if (!waitingForWriteNotify) {
+ Core.logger.log(
+ this,
+ "Writing first padding chunk for " + this,
+ Logger.DEBUG);
sendWritePadding();
+ } else {
+ Core.logger.log(this,
+ "Deferring first padding chunk for " + this,
+ Logger.DEBUG);
+ lastNonPaddingChunk = true;
+ }
}
- protected void sendWritePadding() throws UnknownTrailerSendIDException, TrailerSendFinishedException, AlreadySendingTrailerChunkException, IOException {
+ protected void sendWritePadding()
+ throws
+ UnknownTrailerSendIDException,
+ TrailerSendFinishedException,
+ AlreadySendingTrailerChunkException,
+ IOException {
byte[] stuffToSend;
long remainingPadding = paddingLength - sentPadding;
if(logDEBUG)
- Core.logger.log(this, "sendWritePadding(): paddingLength="+
- paddingLength+"/"+sentPadding+" ("+this+")",
+ Core.logger.log(
+ this,
+ "sendWritePadding(): paddingLength="
+ + paddingLength
+ + "/"
+ + sentPadding
+ + " ("
+ + this
+ + ")",
Logger.DEBUG);
- if(remainingPadding <= 0) return; // we will get finished
- if(remainingPadding < (Core.blockSize/*Key.getControlLength()*/)) {
+ if (remainingPadding <= 0)
+ return; // we will get finished
+ if (remainingPadding < (Core.blockSize /* Key.getControlLength() */
+ )) {
// Last chunk, yay
stuffToSend = new byte[(int)remainingPadding];
Random r = new Random(Core.getRandSource().nextLong());
- r.nextBytes(stuffToSend); // is this necessary? it used to be 0 padded
+ r.nextBytes(stuffToSend);
+ // is this necessary? it used to be 0 padded
// FIXME: assumes getControlLength() == 1
- stuffToSend[stuffToSend.length-1] = (byte)
- ((result == Presentation.CB_ABORTED) ?
- Presentation.CB_ABORTED : Presentation.CB_RESTARTED);
+ stuffToSend[stuffToSend.length - 1] =
+ (byte) ((result == Presentation.CB_ABORTED)
+ ? Presentation.CB_ABORTED
+ : Presentation.CB_RESTARTED);
lastPacketLength = stuffToSend.length;
} else {
// Just another chunk
stuffToSend = new byte[Core.blockSize];
Random r = new Random(Core.getRandSource().nextLong());
- r.nextBytes(stuffToSend); // is this necessary? it used to be 0 padded
+ r.nextBytes(stuffToSend);
+ // is this necessary? it used to be 0 padded
lastPacketLength = Core.blockSize;
}
myTWCM.reset();
waitingForWriteNotify = true;
- if(send != null) send.writeTrailing(stuffToSend, 0,
- stuffToSend.length, myTWCM);
+ if (send != null)
+ send.writeTrailing(stuffToSend, 0, stuffToSend.length, myTWCM);
}
protected State finish() {
- if (!(moved == length || (inPaddingMode && sentPadding >= paddingLength) || (result != -1 && !inPaddingMode))) {
- Core.logger.log(this, "Not finishing because moved=" + moved + "/" + length + ", inPaddingMode=" + inPaddingMode + ", sentPadding=" + sentPadding + "/" + paddingLength + " (" + this +")", Logger.DEBUG);
+ if (!(moved == length
+ || (inPaddingMode && sentPadding >= paddingLength)
+ || (result != -1 && !inPaddingMode))) {
+ Core.logger.log(
+ this,
+ "Not finishing because moved="
+ + moved
+ + "/"
+ + length
+ + ", inPaddingMode="
+ + inPaddingMode
+ + ", sentPadding="
+ + sentPadding
+ + "/"
+ + paddingLength
+ + " ("
+ + this
+ + ")",
+ Logger.DEBUG);
return this;
}
if (inPaddingMode && sentPadding > paddingLength)
- Core.logger.log(this, "sentPadding=" + sentPadding + "/" + paddingLength + " (" + this +")", Logger.NORMAL);
+ Core.logger.log(
+ this,
+ "sentPadding="
+ + sentPadding
+ + "/"
+ + paddingLength
+ + " ("
+ + this
+ + ")",
+ Logger.NORMAL);
if (result != -1) {
// We were aborted
return handleThrowable(null, false);
@@ -381,135 +563,19 @@
in.close();
closedIn = true;
} catch (IOException e) {
- Core.logger.log(this, "Caught " + e + " closing input (successful): " + this, Logger.NORMAL);
+ Core.logger.log(
+ this,
+ "Caught " + e + " closing input (successful): " + this,
+ Logger.NORMAL);
}
if (moved == length)
result = Presentation.CB_OK;
- Core.diagnostics.occurrenceBinomial("sentData", 1, result == Presentation.CB_OK ? 1 : 0);
+ Core.diagnostics.occurrenceBinomial(
+ "sentData",
+ 1,
+ result == Presentation.CB_OK ? 1 : 0);
if (!silent)
n.schedule(new DataSent(this));
return null;
};
-
-// try {
-// while (moved < length) {
-// inWrite = false;
-// if (result != -1) throw new CancelledIOException();
-// int m = in.read(buffer, 0, (int) Math.min(length - moved, buffer.length));
-// if (m == -1) {
-// throw new IOException("Stopped short of full transfer");
-// }
-// inWrite = true;
-// if(logDEBUG)
-// Core.logger.log(this, "Read "+(moved+m)+" of "+length+
-// " bytes for "+Long.toHexString(parent)+" ("+in+")",
-// Logger.DEBUG);
-// send.write(buffer, 0, m);
-// moved += m;
-// if(logDEBUG)
-// Core.logger.log(this, "Moved "+moved+" of "+length+" bytes for "+
-// Long.toHexString(parent)+" ("+in+")",
-// Logger.DEBUG);
-// }
-// send.close();
-// closedSend = true;
-// result = Presentation.CB_OK;
-// } catch (CancelledIOException e) {
-// if(logDEBUG)
-// Core.logger.log(this, "Cancelled IO: "+abortedException+" for "+
-// Long.toHexString(parent), abortedException,
-// Logger.DEBUG);
-// } catch (IOException e) {
-// // if it was aborted we can expect the aborter to set the failure
-// // code.
-// result = (inWrite ? Presentation.CB_SEND_CONN_DIED
-// : in.getFailureCode());
-// if (result == -1) // it broke some time between writing and reading
-// {
-// if(logDEBUG) Core.logger.log(this, "Cache failed between writing "+
-// "and reading for "+Long.toHexString(id),
-// e, Logger.DEBUG);
-// result = Presentation.CB_CACHE_FAILED;
-// }
-// if (result == Presentation.CB_CACHE_FAILED) {
-// Core.logger.log(this,
-// "Cache failed signalled after exception " +
-// "after " + moved + " of " + length
-// + " bytes: "+e+" for "+Long.toHexString(id)+
-// " ("+Long.toHexString(parent)+".", e ,
-// Logger.ERROR);
-// }
-// } finally {
-
-// n.diagnostics.occurrenceBinomial("sentData", 1,
-// result == Presentation.CB_OK ?
-// 1 : 0);
-
-// if(result != Presentation.CB_OK) {
-// if(abortedException != null) {
-// Core.logger.log(this, "Send aborted for "+Long.toHexString(id)+
-// " ("+Long.toHexString(parent)+" - result="+
-// Long.toHexString(result),
-// abortedException, Logger.MINOR);
-// abortedException = null;
-// } else {
-// Core.logger.log(this, "Send failed for "+Long.toHexString(id)+
-// " ("+Long.toHexString(parent)+" - result="+
-// Long.toHexString(result), Logger.MINOR);
-// }
-// }
-
-// buffer = null;
-
-// try {
-// in.close();
-// closedIn = true;
-// } catch (IOException e) {
-// Core.logger.log(this, "I/O error closing KeyInputStream",
-// e, Logger.ERROR);
-// }
-
-// if (moved < length && !inWrite) {
-// try {
-// // pad until end of part
-// long tmpLen = partSize + Key.getControlLength();
-// tmpLen = Math.min(tmpLen - moved % tmpLen, length - moved) - 1;
-
-// byte[] zeroes = new byte[Core.blockSize];
-// while (tmpLen > 0) {
-// int m = (int) Math.min(tmpLen, zeroes.length);
-// send.write(zeroes, 0, m);
-// tmpLen -= m;
-// }
-// send.write(result == Presentation.CB_ABORTED ? Presentation.CB_ABORTED
-// : Presentation.CB_RESTARTED);
-// } catch (IOException e) {
-// // this is important b/c it lets the parent chain know that
-// // it shouldn't try to get back in touch with the upstream
-// result = Presentation.CB_SEND_CONN_DIED;
-// }
-// }
-
-// if (result != Presentation.CB_OK) {
-// try {
-// send.close();
-// closedSend = true;
-// }
-// catch (IOException e) {
-// Core.logger.log(this, "I/O error closing data send stream",
-// e, Logger.MINOR);
-// }
-// }
-
-// // we could be exiting with an uncaught exception or something..
-// if (result == -1) result = Presentation.CB_SEND_CONN_DIED;
-
-// // had to wait around to see if we'd get a CB_SEND_CONN_DIED
-// // when padding the write
-// if (!silent) n.schedule(new DataSent(this));
-// }
-
-// return null;
-// }
}
-
_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs