Update of /cvsroot/freenet/freenet/src/freenet/node/states/data
In directory sc8-pr-cvs1:/tmp/cvs-serv17951/src/freenet/node/states/data
Modified Files:
DataState.java ReceiveData.java SendData.java
Added Files:
TrailerWriteCallbackMessage.java
Log Message:
6195: (mostly) asynchronous trailer writing, lots of logging improvements (most code
supports selective logging now), bugfixes
--- NEW FILE: TrailerWriteCallbackMessage.java ---
package freenet.node.states.data;
import freenet.*;
import freenet.node.*;
/**
* Message form of the TrailerWriteCallback. Used by states that need to do
* expensive stuff e.g. blocking reads after the write succeeds.
*/
class TrailerWriteCallbackMessage extends EventMessageObject
    implements TrailerWriteCallback {

    /** True once either callback (closed() or written()) has fired. */
    boolean finished;

    /** True only if the callback that fired was written(). */
    boolean success;

    /** Node on which this message schedules itself when a callback fires. */
    Node n;

    /**
     * Create a callback message in the "not finished, not successful" state.
     *
     * @param id the id handed to the EventMessageObject superclass
     * @param n  the node used to schedule this message from the callbacks
     */
    public TrailerWriteCallbackMessage(long id, Node n) {
        super(id, false); // Data states are internal, see DataStateInitiator
        this.n = n;
        reset();
    }

    /**
     * Clear both flags so the same instance can be handed to another write.
     */
    public void reset() {
        finished = false;
        success = false;
    }

    /**
     * Callback for a failed write: records failure and schedules this
     * message on the node.
     */
    public void closed() {
        complete(false);
    }

    /**
     * Callback for a completed write: records success and schedules this
     * message on the node.
     */
    public void written() {
        complete(true);
    }

    /**
     * Shared tail of both callbacks. The outcome is stored before the
     * finished flag is raised, and only then is the message scheduled.
     *
     * @param ok the value to store in {@link #success}
     */
    private void complete(boolean ok) {
        success = ok;
        finished = true;
        n.schedule(this);
    }

    /**
     * @return the class name followed by the finished and success flags,
     *         separated by colons
     */
    public String toString() {
        String className = getClass().getName();
        return className + ":" + finished + ":" + success;
    }
}
Index: DataState.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/node/states/data/DataState.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- DataState.java 19 Jun 2003 13:31:20 -0000 1.2
+++ DataState.java 18 Sep 2003 17:48:11 -0000 1.3
@@ -48,7 +48,7 @@
}
public String toString() {
- return getClass().getName()+": "+Long.toHexString(id)+"/"+
+ return super.toString()+": "+Long.toHexString(id)+"/"+
Long.toHexString(parent);
}
}
Index: ReceiveData.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/node/states/data/ReceiveData.java,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -r1.21 -r1.22
--- ReceiveData.java 1 Jul 2003 23:50:02 -0000 1.21
+++ ReceiveData.java 18 Sep 2003 17:48:11 -0000 1.22
@@ -59,7 +59,7 @@
}
public final void commit() throws IOException, KeyCollisionException {
- if(Core.logger.shouldLog(Logger.DEBUG))
+ if(Core.logger.shouldLog(Logger.DEBUG,this))
Core.logger.log(this, "Committing "+this, Core.logger.DEBUG);
if (out != null)
out.commit();
@@ -103,7 +103,7 @@
byte[] buffer = new byte[Core.blockSize];
long moved = 0;
- boolean logDEBUG = n.logger.shouldLog(Logger.DEBUG);
+ boolean logDEBUG = n.logger.shouldLog(Logger.DEBUG,this);
try {
while (moved < length) {
Index: SendData.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/node/states/data/SendData.java,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -r1.16 -r1.17
--- SendData.java 20 Aug 2003 17:11:33 -0000 1.16
+++ SendData.java 18 Sep 2003 17:48:11 -0000 1.17
@@ -7,6 +7,7 @@
import freenet.support.*;
import freenet.support.io.*;
import java.io.*;
+import java.util.*;
/**
@@ -23,22 +24,48 @@
public class SendData extends DataState {
- private final OutputStream send;
+ private final TrailerWriter send;
private final KeyInputStream in;
+ private final TrailerWriteCallbackMessage myTWCM; // Use a message because of
blocking I/O from store
private boolean closedSend = false, closedIn = false;
private final long length, partSize;
private Exception abortedException = null;
private volatile int result = -1;
private boolean silent = false;
+ private boolean inPaddingMode = false;
+ private long paddingLength = 0;
+ private long sentPadding = 0;
+ private int lastPacketLength = 0;
+ private Node n;
+ boolean logDEBUG = true;
+ boolean inWrite = false;
+ long moved = 0;
+ byte[] buffer = null;
+ int bufferEndPtr = 0;
+ int m = 0;
+ private boolean hadDSI = false;
+ private boolean waitingForWriteNotify = false;
- public SendData(long id, long parent, OutputStream send,
- KeyInputStream in, long length, long partSize) {
+ public SendData(long id, long parent, TrailerWriter send,
+ KeyInputStream in, long length, long partSize,
+ Node n) {
super(id, parent);
this.send = send;
this.in = in;
+ if(in == null) throw new IllegalArgumentException("null in");
this.length = length;
this.partSize = partSize;
+ this.n = n;
+ myTWCM = new TrailerWriteCallbackMessage(id, n);
+ logDEBUG = n.logger.shouldLog(Logger.DEBUG,this);
+ if(logDEBUG)
+ n.logger.log(this, "Creating SendData("+this+")", Logger.DEBUG);
+ }
+
+ public String toString() {
+ return super.toString()+": send="+send+", in="+in+", moved="+moved+"/"+
+ length+", partSize="+partSize;
}
public final long length() {
@@ -55,7 +82,7 @@
public final void abort(int cb) {
silent = true;
result = cb;
- if(Core.logger.shouldLog(Logger.DEBUG)) {
+ if(Core.logger.shouldLog(Logger.DEBUG,this)) {
abortedException = new Exception("debug");
Core.logger.log(this, "Aborted send for "+
Long.toHexString(parent)+" with cb="+
@@ -72,9 +99,7 @@
try {
if(!closedIn) in.close();
} catch (IOException e) {};
- try {
- if(!closedSend) send.close();
- } catch (IOException e) {};
+ if(send != null && !closedSend) send.close();
}
/** Sheesh! We're too overworked to even try to write CB_ABORTED.
@@ -83,165 +108,381 @@
try {
in.close();
closedIn = true;
- }
- catch (IOException e) {
+ } catch (IOException e) {
n.logger.log(this, "I/O error closing KeyInputStream",
e, Logger.ERROR);
}
- try {
- send.close();
- closedSend = true;
- }
- catch (IOException e) {
- n.logger.log(this, "I/O error closing data send stream",
- e, Logger.MINOR);
- }
+ if(send != null) send.close();
+ closedSend = true;
}
- boolean inWrite = false;
- long moved = 0;
- byte[] buffer = null;
- int m = 0;
-
+
+ /* Async sending
+ *
+ * received(n, DataStateInitiator) ->
+ * Reset buffer pointer
+ * Read into buffer
+ * Start write
+ *
+ * closed() ->
+ * Set CB to CB_SEND_CONN_DIED
+ * Run the block that used to be finally in received(..) to finish properly
+ *
+ * written() ->
+ * Reset buffer pointer
+ * Read into buffer
+ * Start write
+ */
public State received(Node n, MessageObject mo) throws BadStateException {
- if (!(mo instanceof DataStateInitiator))
- throw new BadStateException("expecting DataStateInitiator");
-
- // if there is an IOException, this says
- // whether it was while writing to the store
- inWrite = false;
- moved = 0;
- m = 0;
+ boolean isDSI = (mo instanceof DataStateInitiator);
+ if(hadDSI) isDSI = false;
+ boolean isTWCM = (mo == myTWCM);
+ if(!(isDSI || isTWCM))
+ throw new BadStateException("expecting DataStateInitiator");
- boolean logDEBUG = n.logger.shouldLog(Logger.DEBUG);
+ if(isDSI) {
+ hadDSI = true;
+ moved = 0;
+ m = 0;
+ }
- try {
- buffer = new byte[Core.blockSize];
- while (moved < length) {
- inWrite = false;
- if (result != -1) throw new CancelledIOException();
- int m = in.read(buffer, 0, (int) Math.min(length - moved,
buffer.length));
- if (m == -1) {
- throw new IOException("Stopped short of full transfer");
- }
- inWrite = true;
- if(logDEBUG)
- n.logger.log(this, "Read "+(moved+m)+" of "+length+
- " bytes for "+Long.toHexString(parent)+" ("+in+")",
- Logger.DEBUG);
- send.write(buffer, 0, m);
- moved += m;
- if(logDEBUG)
- n.logger.log(this, "Moved "+moved+" of "+length+" bytes for "+
- Long.toHexString(parent)+" ("+in+")",
- Logger.DEBUG);
- }
- send.close();
- closedSend = true;
- result = Presentation.CB_OK;
- }
- catch (CancelledIOException e) {
+ logDEBUG = n.logger.shouldLog(Logger.DEBUG,this);
+
+ if(isTWCM) {
+ waitingForWriteNotify = false;
if(logDEBUG)
- n.logger.log(this, "Cancelled IO: "+abortedException+" for "+
- Long.toHexString(parent), abortedException,
+ n.logger.log(this, "Got "+myTWCM+" for "+this,
Logger.DEBUG);
- } // result already set
- //catch (BadDataException e) {
- // result = e.getFailureCode();
- //}
- catch (IOException e) {
- // if it was aborted we can expect the aborter to set the failure
- // code.
- result = (inWrite ? Presentation.CB_SEND_CONN_DIED
- : in.getFailureCode());
- if (result == -1) // it broke some time between writing and reading
- {
+ if(!myTWCM.finished) {
+ n.logger.log(this, "Got a TWCM that was not finished!: "+
+ myTWCM+" for "+this, Logger.ERROR);
+ return this;
+ }
+ if(!myTWCM.success) {
+ result = Presentation.CB_SEND_CONN_DIED;
+ if(handleThrowable(null, true) == null)
+ return null;
+ } else {
+ moved += lastPacketLength;
+ }
+ }
+
+ if(isDSI)
+ buffer = new byte[Core.blockSize];
+
+ if(!inPaddingMode) {
+ try {
+ bufferEndPtr = doRead();
+ if(bufferEndPtr == -1) throw new IOException("duh!");
+ } catch (Throwable t) {
+ return handleThrowable(t, false); // will do termination and failure
code
+ }
+ }
+ try {
+ if(inPaddingMode)
+ startWritePadding();
+ else
+ startWrite(bufferEndPtr);
+ } catch (Throwable t) {
+ return handleThrowable(t, true);
+ }
+ return finish(); // Must check whether we are actually finished!
+ }
+
+ /**
+ * Read bytes from the store into the buffer, starting at the beginning.
+ * Return the number read.
+ * @throws IOException if something breaks
+ */
+ protected int doRead() throws IOException {
+ return in.read(buffer, 0, (int) Math.min(length - moved, buffer.length));
+ }
+
+ /**
+ * Start to write some bytes to the connection
+ */
+ protected void startWrite(int bytes) throws UnknownTrailerSendIDException,
TrailerSendFinishedException, AlreadySendingTrailerChunkException, IOException {
+ if(bytes <= 0) return;
+ myTWCM.reset();
+ lastPacketLength = bytes;
+ waitingForWriteNotify = true;
+ if(send != null) send.writeTrailing(buffer, 0, bytes, myTWCM);
+ }
+
+ /**
+ * Handle a throwable thrown during I/O. We are expected to close the connection,
+ * etc.
+ * @param t the throwable causing the failure, can be null.
+ * @param inWrite whether we were writing at the time. false means we were
reading.
+ */
+ protected State handleThrowable(Throwable t, boolean inWrite) {
+ if(logDEBUG) {
+ if(t == null)
+ n.logger.log(this, "SendData.handleThrowable(null,"+inWrite+
+ ") on "+this, Logger.DEBUG);
+ else
+ n.logger.log(this, "SendData.handleThrowable("+t+","+inWrite+
+ ") on "+this, t, Logger.DEBUG);
+ }
+ if(t == null) {
+ if(result == -1)
+ n.logger.log(this, "handleThrowable caller must set result if passing
null Throwable!", new Exception("grrr"), Logger.ERROR);
+ } else if(t instanceof IOException) {
+ if(inWrite) result = Presentation.CB_SEND_CONN_DIED;
+ else {
+ int ifc = in.getFailureCode();
+ if(ifc == -1) {
if(logDEBUG) n.logger.log(this, "Cache failed between writing "+
- "and reading for "+Long.toHexString(id),
- e, Logger.DEBUG);
+ "and reading for
"+Long.toHexString(id)+": "+
+ t, t, Logger.DEBUG);
result = Presentation.CB_CACHE_FAILED;
+ } else result = ifc;
+ if (result == Presentation.CB_CACHE_FAILED) {
+ Core.logger.log(this, "Cache failed signalled after exception " +
+ "after " + moved + " of " + length
+ + " bytes: "+t+" for "+Long.toHexString(id)+
+ " ("+Long.toHexString(parent)+".", t,
+ Logger.ERROR);
}
- if (result == Presentation.CB_CACHE_FAILED) {
- Core.logger.log(this,
- "Cache failed signalled after exception " +
- "after " + moved + " of " + length
- + " bytes: "+e+" for "+Long.toHexString(id)+
- " ("+Long.toHexString(parent)+".", e ,
- Logger.ERROR);
- }
- } finally {
-
- n.diagnostics.occurrenceBinomial("sentData", 1,
- result == Presentation.CB_OK ?
- 1 : 0);
-
- if(result != Presentation.CB_OK) {
- if(abortedException != null) {
- Core.logger.log(this, "Send aborted for "+Long.toHexString(id)+
- " ("+Long.toHexString(parent)+" - result="+
- Long.toHexString(result),
- abortedException, Logger.MINOR);
- abortedException = null;
- } else {
- Core.logger.log(this, "Send failed for "+Long.toHexString(id)+
- " ("+Long.toHexString(parent)+" - result="+
- Long.toHexString(result), Logger.MINOR);
+ }
+ } else {
+ Core.logger.log(this, "Unexpected exception "+t+" in SendData "+
+ this+" (inWrite="+inWrite+")", t, Logger.ERROR);
+ result = Presentation.CB_CACHE_FAILED; // well, sorta
+ }
+
+ n.diagnostics.occurrenceBinomial("sentData", 1, 0);
+ try {
+ in.close();
+ closedIn = true;
+ } catch (IOException e) {
+ n.logger.log(this, "I/O error closing KeyInputStream",
+ e, Logger.ERROR);
+ }
+ n.logger.log(this, "Send failed for "+Long.toHexString(id)+
+ " ("+Long.toHexString(parent)+" - result="+
+ Long.toHexString(result)+", cause: "+t, Logger.MINOR);
+ if(inWrite) {
+ if(send != null) send.close();
+ closedSend = true;
+ } else if(!inPaddingMode) {
+ if(moved == length)
+ n.logger.log(this, "WTF? moved = length in handleThrowable "+
+ "for "+this, new Exception("debug"),
+ Logger.NORMAL);
+ else {
+ try {
+ startWritePadding();
+ return this;
+ // don't send the DataSent until the padding has finished writing
+ } catch (IOException e) {
+ // Failed
+ t = e;
+ } catch (TrailerException e) {
+ Core.logger.log(this, "Got "+e+" starting writing padding for
"+this,
+ e, Logger.ERROR);
}
}
+ // Failed or already at end
+ if(send != null) send.close();
+ closedSend = true;
+ } else {
+ // Padding failed
+ if(send != null) send.close();
+ closedSend = true;
+ }
+
+ buffer = null; // early GC
+
+ if(!silent) n.schedule(new DataSent(this));
+ return null;
+ }
+
+ protected void startWritePadding() throws UnknownTrailerSendIDException,
TrailerSendFinishedException, AlreadySendingTrailerChunkException, IOException {
+ // Pad until end of part
+ inPaddingMode = true;
+ int controlLength = Key.getControlLength();
+ long tmpLen = partSize + controlLength;
+ paddingLength = Math.min(tmpLen - moved % tmpLen, length - moved) - 1;
+
+ if(!waitingForWriteNotify)
+ sendWritePadding();
+ }
+
+ protected void sendWritePadding() throws UnknownTrailerSendIDException,
TrailerSendFinishedException, AlreadySendingTrailerChunkException, IOException {
+ byte[] stuffToSend;
+ long remainingPadding = paddingLength - sentPadding;
+ if(remainingPadding < (Core.blockSize - /*Key.getControlLength()*/ 1)) {
+ // Last chunk, yay
+ stuffToSend = new byte[/*Key.getControlLength()*/ 1 +
(int)remainingPadding];
+ Random r = new Random(Core.randSource.nextLong());
+ r.nextBytes(stuffToSend); // is this necessary? it used to be 0 padded
+ // FIXME: assumes getControlLength() == 1
+ stuffToSend[stuffToSend.length-1] = (byte)
+ ((result == Presentation.CB_ABORTED) ?
+ Presentation.CB_ABORTED : Presentation.CB_RESTARTED);
+ sentPadding = paddingLength;
+ } else {
+ // Just another chunk
+ stuffToSend = new byte[Core.blockSize];
+ Random r = new Random(Core.randSource.nextLong());
+ r.nextBytes(stuffToSend); // is this necessary? it used to be 0 padded
+ sentPadding += Core.blockSize;
+ }
+ myTWCM.reset();
+ waitingForWriteNotify = true;
+ if(send != null) send.writeTrailing(stuffToSend, 0,
+ stuffToSend.length, myTWCM);
+ }
+
+ protected State finish() {
+ if(!(moved == length || (inPaddingMode && sentPadding == paddingLength) ||
+ (result != -1))) {
+ n.logger.log(this, "Not finishing because moved="+moved+"/"+length+
+ ", inPaddingMode="+inPaddingMode+", sentPadding="+sentPadding+
+ "/"+paddingLength+" ("+this+")", Logger.DEBUG);
+ return this;
+ }
+ if(result != -1) {
+ // We were aborted
+ return handleThrowable(null, false);
+ }
+ if(send != null) send.close();
+ closedSend = true;
+ try {
+ in.close();
+ closedIn = true;
+ } catch (IOException e) {
+ Core.logger.log(this, "Caught "+e+" closing input (successful): "+this,
+ Logger.NORMAL);
+ }
+ if(moved == length) result = Presentation.CB_OK;
+ n.diagnostics.occurrenceBinomial("sentData", 1,
+ result == Presentation.CB_OK ?
+ 1 : 0);
+ if(!silent) n.schedule(new DataSent(this));
+ return null;
+ };
+
+// try {
+// while (moved < length) {
+// inWrite = false;
+// if (result != -1) throw new CancelledIOException();
+// int m = in.read(buffer, 0, (int) Math.min(length - moved,
buffer.length));
+// if (m == -1) {
+// throw new IOException("Stopped short of full transfer");
+// }
+// inWrite = true;
+// if(logDEBUG)
+// n.logger.log(this, "Read "+(moved+m)+" of "+length+
+// " bytes for "+Long.toHexString(parent)+" ("+in+")",
+// Logger.DEBUG);
+// send.write(buffer, 0, m);
+// moved += m;
+// if(logDEBUG)
+// n.logger.log(this, "Moved "+moved+" of "+length+" bytes for "+
+// Long.toHexString(parent)+" ("+in+")",
+// Logger.DEBUG);
+// }
+// send.close();
+// closedSend = true;
+// result = Presentation.CB_OK;
+// } catch (CancelledIOException e) {
+// if(logDEBUG)
+// n.logger.log(this, "Cancelled IO: "+abortedException+" for "+
+// Long.toHexString(parent), abortedException,
+// Logger.DEBUG);
+// } catch (IOException e) {
+// // if it was aborted we can expect the aborter to set the failure
+// // code.
+// result = (inWrite ? Presentation.CB_SEND_CONN_DIED
+// : in.getFailureCode());
+// if (result == -1) // it broke some time between writing and reading
+// {
+// if(logDEBUG) n.logger.log(this, "Cache failed between writing "+
+// "and reading for "+Long.toHexString(id),
+// e, Logger.DEBUG);
+// result = Presentation.CB_CACHE_FAILED;
+// }
+// if (result == Presentation.CB_CACHE_FAILED) {
+// Core.logger.log(this,
+// "Cache failed signalled after exception " +
+// "after " + moved + " of " + length
+// + " bytes: "+e+" for "+Long.toHexString(id)+
+// " ("+Long.toHexString(parent)+".", e ,
+// Logger.ERROR);
+// }
+// } finally {
- buffer = null;
-
- try {
- in.close();
- closedIn = true;
- }
- catch (IOException e) {
- n.logger.log(this, "I/O error closing KeyInputStream",
- e, Logger.ERROR);
- }
-
- if (moved < length && !inWrite) {
- try {
- // pad until end of part
- long tmpLen = partSize + Key.getControlLength();
- tmpLen = Math.min(tmpLen - moved % tmpLen, length - moved) - 1;
+// n.diagnostics.occurrenceBinomial("sentData", 1,
+// result == Presentation.CB_OK ?
+// 1 : 0);
+
+// if(result != Presentation.CB_OK) {
+// if(abortedException != null) {
+// Core.logger.log(this, "Send aborted for "+Long.toHexString(id)+
+// " ("+Long.toHexString(parent)+" - result="+
+// Long.toHexString(result),
+// abortedException, Logger.MINOR);
+// abortedException = null;
+// } else {
+// Core.logger.log(this, "Send failed for "+Long.toHexString(id)+
+// " ("+Long.toHexString(parent)+" - result="+
+// Long.toHexString(result), Logger.MINOR);
+// }
+// }
+
+// buffer = null;
+
+// try {
+// in.close();
+// closedIn = true;
+// } catch (IOException e) {
+// n.logger.log(this, "I/O error closing KeyInputStream",
+// e, Logger.ERROR);
+// }
+
+// if (moved < length && !inWrite) {
+// try {
+// // pad until end of part
+// long tmpLen = partSize + Key.getControlLength();
+// tmpLen = Math.min(tmpLen - moved % tmpLen, length - moved) - 1;
- byte[] zeroes = new byte[Core.blockSize];
- while (tmpLen > 0) {
- int m = (int) Math.min(tmpLen, zeroes.length);
- send.write(zeroes, 0, m);
- tmpLen -= m;
- }
- send.write(result == Presentation.CB_ABORTED ?
Presentation.CB_ABORTED
- :
Presentation.CB_RESTARTED);
- }
- catch (IOException e) {
- // this is important b/c it lets the parent chain know that
- // it shouldn't try to get back in touch with the upstream
- result = Presentation.CB_SEND_CONN_DIED;
- }
- }
+// byte[] zeroes = new byte[Core.blockSize];
+// while (tmpLen > 0) {
+// int m = (int) Math.min(tmpLen, zeroes.length);
+// send.write(zeroes, 0, m);
+// tmpLen -= m;
+// }
+// send.write(result == Presentation.CB_ABORTED ?
Presentation.CB_ABORTED
+// : Presentation.CB_RESTARTED);
+// } catch (IOException e) {
+// // this is important b/c it lets the parent chain know that
+// // it shouldn't try to get back in touch with the upstream
+// result = Presentation.CB_SEND_CONN_DIED;
+// }
+// }
- if (result != Presentation.CB_OK) {
- try {
- send.close();
- closedSend = true;
- }
- catch (IOException e) {
- n.logger.log(this, "I/O error closing data send stream",
- e, Logger.MINOR);
- }
- }
+// if (result != Presentation.CB_OK) {
+// try {
+// send.close();
+// closedSend = true;
+// }
+// catch (IOException e) {
+// n.logger.log(this, "I/O error closing data send stream",
+// e, Logger.MINOR);
+// }
+// }
- // we could be exiting with an uncaught exception or something..
- if (result == -1) result = Presentation.CB_SEND_CONN_DIED;
+// // we could be exiting with an uncaught exception or something..
+// if (result == -1) result = Presentation.CB_SEND_CONN_DIED;
- // had to wait around to see if we'd get a CB_SEND_CONN_DIED
- // when padding the write
- if (!silent) n.schedule(new DataSent(this));
- }
+// // had to wait around to see if we'd get a CB_SEND_CONN_DIED
+// // when padding the write
+// if (!silent) n.schedule(new DataSent(this));
+// }
- return null;
- }
+// return null;
+// }
}
_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs