Author: toad
Date: 2006-02-22 21:12:38 +0000 (Wed, 22 Feb 2006)
New Revision: 8113
Added:
trunk/freenet/src/freenet/node/fcp/ListPersistentRequestsMessage.java
trunk/freenet/src/freenet/node/fcp/PersistentGet.java
trunk/freenet/src/freenet/node/fcp/PersistentPut.java
trunk/freenet/src/freenet/node/fcp/RemovePersistentRequest.java
Modified:
trunk/freenet/src/freenet/node/Version.java
trunk/freenet/src/freenet/node/fcp/AllDataMessage.java
trunk/freenet/src/freenet/node/fcp/ClientGet.java
trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java
trunk/freenet/src/freenet/node/fcp/ClientHelloMessage.java
trunk/freenet/src/freenet/node/fcp/ClientPut.java
trunk/freenet/src/freenet/node/fcp/ClientPutMessage.java
trunk/freenet/src/freenet/node/fcp/ClientRequest.java
trunk/freenet/src/freenet/node/fcp/DataCarryingMessage.java
trunk/freenet/src/freenet/node/fcp/DataFoundMessage.java
trunk/freenet/src/freenet/node/fcp/FCPClient.java
trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
trunk/freenet/src/freenet/node/fcp/FCPConnectionInputHandler.java
trunk/freenet/src/freenet/node/fcp/FCPMessage.java
trunk/freenet/src/freenet/node/fcp/FCPServer.java
trunk/freenet/src/freenet/node/fcp/GetFailedMessage.java
trunk/freenet/src/freenet/node/fcp/ProtocolErrorMessage.java
trunk/freenet/src/freenet/support/LRUQueue.java
Log:
468: Persistent (not across node restarts) requests and inserts implemented and
apparently working.
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-02-22 20:35:17 UTC (rev
8112)
+++ trunk/freenet/src/freenet/node/Version.java 2006-02-22 21:12:38 UTC (rev
8113)
@@ -20,7 +20,7 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- private static final int buildNumber = 467;
+ private static final int buildNumber = 468;
/** Oldest build of Fred we will talk to */
private static final int lastGoodBuild = 403;
Modified: trunk/freenet/src/freenet/node/fcp/AllDataMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/AllDataMessage.java 2006-02-22
20:35:17 UTC (rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/AllDataMessage.java 2006-02-22
21:12:38 UTC (rev 8113)
@@ -14,7 +14,7 @@
final long dataLength;
final String identifier;
- public AllDataMessage(FCPConnectionHandler handler, Bucket bucket,
String identifier) {
+ public AllDataMessage(Bucket bucket, String identifier) {
this.bucket = bucket;
this.dataLength = bucket.size();
this.identifier = identifier;
Modified: trunk/freenet/src/freenet/node/fcp/ClientGet.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientGet.java 2006-02-22 20:35:17 UTC
(rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/ClientGet.java 2006-02-22 21:12:38 UTC
(rev 8113)
@@ -30,24 +30,40 @@
private final FetcherContext fctx;
private final String identifier;
private final int verbosity;
- private final FCPConnectionHandler handler;
+ /** Original FCPConnectionHandler. Null if persistence != connection */
+ private final FCPConnectionHandler origHandler;
+ private final FCPClient client;
private final ClientGetter getter;
private final short priorityClass;
private final short returnType;
+ private final short persistenceType;
+ /** Has the request finished? */
private boolean finished;
private final File targetFile;
private final File tempFile;
+ final String clientToken;
// Verbosity bitmasks
private int VERBOSITY_SPLITFILE_PROGRESS = 1;
+ // Stuff waiting for reconnection
+ private FCPMessage dataFoundOrGetFailedPending;
+ private AllDataMessage allDataPending;
+ private SimpleProgressMessage progressPending;
+
public ClientGet(FCPConnectionHandler handler, ClientGetMessage
message) {
uri = message.uri;
+ clientToken = message.clientToken;
// FIXME
this.priorityClass = message.priorityClass;
+ this.persistenceType = message.persistenceType;
// Create a Fetcher directly in order to get more fine-grained
control,
// since the client may override a few context elements.
- this.handler = handler;
+ if(persistenceType == PERSIST_CONNECTION)
+ this.origHandler = handler;
+ else
+ origHandler = null;
+ this.client = handler.getClient();
fctx = new FetcherContext(handler.defaultFetchContext,
FetcherContext.IDENTICAL_MASK);
fctx.eventProducer.addEventListener(this);
// ignoreDS
@@ -75,25 +91,36 @@
}
}
+ public void onLostConnection() {
+ if(persistenceType == PERSIST_CONNECTION)
+ cancel();
+ // Otherwise ignore
+ }
+
public void cancel() {
getter.cancel();
}
public void onSuccess(FetchResult result, ClientGetter state) {
+ progressPending = null;
finished = true;
- FCPMessage msg = new DataFoundMessage(handler, result,
identifier);
+ FCPMessage msg = new DataFoundMessage(result, identifier);
Bucket data = result.asBucket();
if(returnType == ClientGetMessage.RETURN_TYPE_DIRECT) {
// Send all the data at once
// FIXME there should be other options
- handler.outputHandler.queue(msg);
- msg = new AllDataMessage(handler, data, identifier);
- handler.outputHandler.queue(msg);
- return; // don't delete the bucket yet
+ trySendDataFoundOrGetFailed(msg);
+ AllDataMessage m = new AllDataMessage(data, identifier);
+ if(persistenceType == PERSIST_CONNECTION)
+ m.setFreeOnSent();
+ trySendAllDataMessage(m);
+ finish();
+ return;
} else if(returnType == ClientGetMessage.RETURN_TYPE_NONE) {
// Do nothing
- handler.outputHandler.queue(msg);
+ trySendDataFoundOrGetFailed(msg);
data.free();
+ finish();
return;
} else if(returnType == ClientGetMessage.RETURN_TYPE_DISK) {
// Write to temp file, then rename over filename
@@ -102,21 +129,23 @@
fos = new FileOutputStream(tempFile);
} catch (FileNotFoundException e) {
ProtocolErrorMessage pm = new
ProtocolErrorMessage(ProtocolErrorMessage.COULD_NOT_WRITE_FILE, false, null,
identifier);
- handler.outputHandler.queue(pm);
+ trySendDataFoundOrGetFailed(pm);
data.free();
+ finish();
return;
}
try {
BucketTools.copyTo(data, fos, data.size());
} catch (IOException e) {
ProtocolErrorMessage pm = new
ProtocolErrorMessage(ProtocolErrorMessage.COULD_NOT_WRITE_FILE, false, null,
identifier);
- handler.outputHandler.queue(pm);
+ trySendDataFoundOrGetFailed(pm);
data.free();
try {
fos.close();
} catch (IOException e1) {
// Ignore
}
+ finish();
return;
}
try {
@@ -126,22 +155,77 @@
}
if(!tempFile.renameTo(targetFile)) {
ProtocolErrorMessage pm = new
ProtocolErrorMessage(ProtocolErrorMessage.COULD_NOT_RENAME_FILE, false, null,
identifier);
- handler.outputHandler.queue(pm);
+ trySendDataFoundOrGetFailed(pm);
// Don't delete temp file, user might want it.
}
data.free();
- handler.outputHandler.queue(msg);
+ trySendDataFoundOrGetFailed(msg);
+ finish();
return;
}
}
+ private void trySendDataFoundOrGetFailed(FCPMessage msg) {
+ if(persistenceType != ClientRequest.PERSIST_CONNECTION) {
+ dataFoundOrGetFailedPending = msg;
+ }
+ FCPConnectionHandler conn = client.getConnection();
+ if(conn != null)
+ conn.outputHandler.queue(msg);
+ }
+
+ private void trySendAllDataMessage(AllDataMessage msg) {
+ if(persistenceType != ClientRequest.PERSIST_CONNECTION) {
+ allDataPending = msg;
+ }
+ FCPConnectionHandler conn = client.getConnection();
+ if(conn != null)
+ conn.outputHandler.queue(msg);
+ }
+
+ private void trySendProgress(SimpleProgressMessage msg) {
+ if(persistenceType != ClientRequest.PERSIST_CONNECTION) {
+ progressPending = msg;
+ }
+ FCPConnectionHandler conn = client.getConnection();
+ if(conn != null)
+ conn.outputHandler.queue(msg);
+ }
+
+ public void sendPendingMessages(FCPConnectionOutputHandler handler,
boolean includePersistentRequest) {
+ if(persistenceType == ClientRequest.PERSIST_CONNECTION) {
+ Logger.error(this, "WTF?
persistenceType="+persistenceType, new Exception("error"));
+ return;
+ }
+ if(includePersistentRequest) {
+ FCPMessage msg = new PersistentGet(identifier, uri,
verbosity, priorityClass, returnType, persistenceType, targetFile, tempFile,
clientToken);
+ handler.queue(msg);
+ }
+ if(progressPending != null)
+ handler.queue(progressPending);
+ if(dataFoundOrGetFailedPending != null)
+ handler.queue(dataFoundOrGetFailedPending);
+ if(allDataPending != null)
+ handler.queue(allDataPending);
+ }
+
public void onFailure(FetchException e, ClientGetter state) {
finished = true;
Logger.minor(this, "Caught "+e, e);
- FCPMessage msg = new GetFailedMessage(handler, e, identifier);
- handler.outputHandler.queue(msg);
+ FCPMessage msg = new GetFailedMessage(e, identifier);
+ trySendDataFoundOrGetFailed(msg);
+ finish();
}
+ /** Request completed. But we may have to stick around until we are
acked. */
+ private void finish() {
+ if(persistenceType == ClientRequest.PERSIST_CONNECTION) {
+ origHandler.finishedClientRequest(this);
+ } else {
+ client.finishedClientRequest(this);
+ }
+ }
+
public void onSuccess(BaseClientPutter state) {
// Ignore
}
@@ -161,7 +245,21 @@
return;
SimpleProgressMessage progress =
new SimpleProgressMessage(identifier,
(SplitfileProgressEvent)ce);
- handler.outputHandler.queue(progress);
+ trySendProgress(progress);
}
+ public boolean isPersistent() {
+ return persistenceType != ClientRequest.PERSIST_CONNECTION;
+ }
+
+ public void dropped() {
+ cancel();
+ if(allDataPending != null)
+ allDataPending.bucket.free();
+ }
+
+ public String getIdentifier() {
+ return identifier;
+ }
+
}
Modified: trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java 2006-02-22
20:35:17 UTC (rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java 2006-02-22
21:12:38 UTC (rev 8113)
@@ -8,6 +8,7 @@
import freenet.node.Node;
import freenet.node.RequestStarter;
import freenet.support.Fields;
+import freenet.support.Logger;
import freenet.support.SimpleFieldSet;
/**
@@ -26,6 +27,10 @@
* MaxTempSize=1000 // maximum size of intermediary data
* MaxRetries=100 // automatic retry supported as an option
* PriorityClass=1 // priority class 1 = interactive
+ * Persistence=reboot // continue until node is restarted; report progress
while client is
+ * connected, including if it reconnects after losing connection
+ * ClientToken=hello // returned in PersistentGet, a hint to the client, so
the client
+ * doesn't need to maintain its own state
* EndMessage
*/
public class ClientGetMessage extends FCPMessage {
@@ -37,12 +42,14 @@
final String identifier;
final int verbosity;
final short returnType;
+ final short persistenceType;
final long maxSize;
final long maxTempSize;
final int maxRetries;
final short priorityClass;
final File diskFile;
final File tempFile;
+ final String clientToken;
// FIXME move these to the actual getter process
static final short RETURN_TYPE_DIRECT = 0; // over FCP
@@ -52,6 +59,7 @@
public ClientGetMessage(SimpleFieldSet fs) throws
MessageInvalidException {
short defaultPriority;
+ clientToken = fs.get("ClientToken");
ignoreDS = Fields.stringToBool(fs.get("IgnoreDS"), false);
dsOnly = Fields.stringToBool(fs.get("DSOnly"), false);
identifier = fs.get("Identifier");
@@ -140,6 +148,7 @@
throw new
MessageInvalidException(ProtocolErrorMessage.ERROR_PARSING_NUMBER, "Error
parsing MaxSize field: "+e.getMessage(), identifier);
}
}
+ Logger.minor(this, "max retries="+maxRetries);
String priorityString = fs.get("PriorityClass");
if(priorityString == null) {
// defaults to the one just below fproxy
@@ -153,6 +162,20 @@
throw new
MessageInvalidException(ProtocolErrorMessage.ERROR_PARSING_NUMBER, "Error
parsing PriorityClass field: "+e.getMessage(), identifier);
}
}
+ String persistenceString = fs.get("Persistence");
+ if(persistenceString == null ||
persistenceString.equalsIgnoreCase("connection")) {
+ // Default: persists until connection loss.
+ persistenceType = ClientRequest.PERSIST_CONNECTION;
+ } else if(persistenceString.equalsIgnoreCase("reboot")) {
+ // Reports to client by name; persists over connection
loss.
+ // Not saved to disk, so dies on reboot.
+ persistenceType = ClientRequest.PERSIST_REBOOT;
+ } else if(persistenceString.equalsIgnoreCase("forever")) {
+ // Same as reboot but saved to disk, persists forever.
+ persistenceType = ClientRequest.PERSIST_FOREVER;
+ } else {
+ throw new
MessageInvalidException(ProtocolErrorMessage.ERROR_PARSING_NUMBER, "Error
parsing Persistence field: "+persistenceString, identifier);
+ }
}
public SimpleFieldSet getFieldSet() {
@@ -183,4 +206,19 @@
handler.startClientGet(this);
}
+ public static String returnTypeString(short type) {
+ switch(type) {
+ case RETURN_TYPE_DIRECT:
+ return "direct";
+ case RETURN_TYPE_NONE:
+ return "none";
+ case RETURN_TYPE_DISK:
+ return "disk";
+ case RETURN_TYPE_CHUNKED:
+ return "chunked";
+ default:
+ return Short.toString(type);
+ }
+ }
+
}
Modified: trunk/freenet/src/freenet/node/fcp/ClientHelloMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientHelloMessage.java 2006-02-22
20:35:17 UTC (rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/ClientHelloMessage.java 2006-02-22
21:12:38 UTC (rev 8113)
@@ -38,9 +38,9 @@
public void run(FCPConnectionHandler handler, Node node) {
// We know the Hello is valid.
- handler.setClientName(clientName);
FCPMessage msg = new NodeHelloMessage(node);
handler.outputHandler.queue(msg);
+ handler.setClientName(clientName);
}
}
Modified: trunk/freenet/src/freenet/node/fcp/ClientPut.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientPut.java 2006-02-22 20:35:17 UTC
(rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/ClientPut.java 2006-02-22 21:12:38 UTC
(rev 8113)
@@ -1,5 +1,7 @@
package freenet.node.fcp;
+import java.io.File;
+
import freenet.client.ClientMetadata;
import freenet.client.FetchException;
import freenet.client.FetchResult;
@@ -25,23 +27,47 @@
final ClientPutter inserter;
final InserterContext ctx;
final InsertBlock block;
- final FCPConnectionHandler handler;
+ /** Original FCP connection handler. Null if persistence != connection.
*/
+ final FCPConnectionHandler origHandler;
+ /** Client originating this request */
+ final FCPClient client;
final String identifier;
final boolean getCHKOnly;
final short priorityClass;
+ private final short persistenceType;
final int verbosity;
+ /** Has the request finished? */
private boolean finished;
+ /** Client token - opaque string returned to client in PersistentPut */
+ private final String clientToken;
+ /** Was this from disk? Purely for PersistentPut */
+ private final boolean fromDisk;
+ /** Original filename if from disk, otherwise null. Purely for
PersistentPut. */
+ private final File origFilename;
// Verbosity bitmasks
private int VERBOSITY_SPLITFILE_PROGRESS = 1;
private int VERBOSITY_COMPRESSION_START_END = 512;
+ // Stuff waiting for reconnection
+ private FCPMessage finalMessage;
+ private FCPMessage generatedURIMessage;
+ // This could be a SimpleProgress, or it could be started/finished
compression
+ private FCPMessage progressMessage;
+
public ClientPut(FCPConnectionHandler handler, ClientPutMessage
message) {
this.verbosity = message.verbosity;
- this.handler = handler;
this.identifier = message.identifier;
this.getCHKOnly = message.getCHKOnly;
this.priorityClass = message.priorityClass;
+ this.persistenceType = message.persistenceType;
+ this.fromDisk = message.fromDisk;
+ this.origFilename = message.origFilename;
+ if(persistenceType == PERSIST_CONNECTION)
+ this.origHandler = handler;
+ else
+ this.origHandler = null;
+ client = handler.getClient();
ctx = new InserterContext(handler.defaultInsertContext, new
SimpleEventProducer());
if(message.dontCompress)
ctx.dontCompress = true;
@@ -50,6 +76,7 @@
// Now go through the fields one at a time
uri = message.uri;
String mimeType = message.contentType;
+ clientToken = message.clientToken;
block = new InsertBlock(message.bucket, new
ClientMetadata(mimeType), uri);
inserter = new ClientPutter(this, message.bucket, uri, new
ClientMetadata(mimeType), ctx, handler.node.putScheduler, priorityClass,
getCHKOnly, false, handler);
}
@@ -62,25 +89,35 @@
}
}
+ public void onLostConnection() {
+ if(persistenceType == PERSIST_CONNECTION)
+ cancel();
+ // otherwise ignore
+ }
+
public void cancel() {
inserter.cancel();
}
public void onSuccess(BaseClientPutter state) {
+ progressMessage = null;
finished = true;
FCPMessage msg = new PutSuccessfulMessage(identifier,
state.getURI());
- handler.outputHandler.queue(msg);
+ trySendFinalMessage(msg);
+ block.getData().free();
+ finish();
}
public void onFailure(InserterException e, BaseClientPutter state) {
finished = true;
FCPMessage msg = new PutFailedMessage(e, identifier);
- handler.outputHandler.queue(msg);
+ trySendFinalMessage(msg);
+ block.getData().free();
+ finish();
}
public void onGeneratedURI(FreenetURI uri, BaseClientPutter state) {
- FCPMessage msg = new URIGeneratedMessage(uri, identifier);
- handler.outputHandler.queue(msg);
+ trySendGeneratedURIMessage(new URIGeneratedMessage(uri,
identifier));
}
public void onSuccess(FetchResult result, ClientGetter state) {
@@ -97,21 +134,84 @@
if((verbosity & VERBOSITY_SPLITFILE_PROGRESS) ==
VERBOSITY_SPLITFILE_PROGRESS) {
SimpleProgressMessage progress =
new SimpleProgressMessage(identifier,
(SplitfileProgressEvent)ce);
- handler.outputHandler.queue(progress);
+ trySendProgressMessage(progress);
}
} else if(ce instanceof StartedCompressionEvent) {
if((verbosity & VERBOSITY_COMPRESSION_START_END) ==
VERBOSITY_COMPRESSION_START_END) {
StartedCompressionMessage msg =
new
StartedCompressionMessage(identifier, ((StartedCompressionEvent)ce).codec);
- handler.outputHandler.queue(msg);
+ trySendProgressMessage(msg);
}
} else if(ce instanceof FinishedCompressionEvent) {
if((verbosity & VERBOSITY_COMPRESSION_START_END) ==
VERBOSITY_COMPRESSION_START_END) {
FinishedCompressionMessage msg =
new
FinishedCompressionMessage(identifier, (FinishedCompressionEvent)ce);
- handler.outputHandler.queue(msg);
+ trySendProgressMessage(msg);
}
}
}
+
+ private void trySendFinalMessage(FCPMessage msg) {
+ if(persistenceType != PERSIST_CONNECTION)
+ finalMessage = msg;
+ FCPConnectionHandler conn = client.getConnection();
+ if(conn != null)
+ conn.outputHandler.queue(msg);
+ }
+ private void trySendGeneratedURIMessage(URIGeneratedMessage msg) {
+ if(persistenceType != PERSIST_CONNECTION)
+ generatedURIMessage = msg;
+ FCPConnectionHandler conn = client.getConnection();
+ if(conn != null)
+ conn.outputHandler.queue(msg);
+ }
+
+ private void trySendProgressMessage(FCPMessage msg) {
+ if(persistenceType != PERSIST_CONNECTION)
+ progressMessage = msg;
+ FCPConnectionHandler conn = client.getConnection();
+ if(conn != null)
+ conn.outputHandler.queue(msg);
+ }
+
+ public void sendPendingMessages(FCPConnectionOutputHandler handler,
boolean includePersistentRequest) {
+ if(persistenceType == PERSIST_CONNECTION) {
+ Logger.error(this, "WTF?
persistenceType="+persistenceType, new Exception("error"));
+ return;
+ }
+ if(includePersistentRequest) {
+ FCPMessage msg = new PersistentPut(identifier, uri,
verbosity, priorityClass, fromDisk, persistenceType, origFilename,
block.clientMetadata.getMIMEType());
+ handler.queue(msg);
+ }
+ if(generatedURIMessage != null)
+ handler.queue(generatedURIMessage);
+ if(progressMessage != null)
+ handler.queue(progressMessage);
+ if(finalMessage != null)
+ handler.queue(finalMessage);
+ }
+
+ /** Request completed. But we may have to stick around until we are
acked. */
+ private void finish() {
+ if(persistenceType == ClientRequest.PERSIST_CONNECTION) {
+ origHandler.finishedClientRequest(this);
+ } else {
+ client.finishedClientRequest(this);
+ }
+ }
+
+ public boolean isPersistent() {
+ return persistenceType != ClientRequest.PERSIST_CONNECTION;
+ }
+
+ public void dropped() {
+ cancel();
+ block.getData().free();
+ }
+
+ public String getIdentifier() {
+ return identifier;
+ }
+
}
Modified: trunk/freenet/src/freenet/node/fcp/ClientPutMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientPutMessage.java 2006-02-22
20:35:17 UTC (rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/ClientPutMessage.java 2006-02-22
21:12:38 UTC (rev 8113)
@@ -42,8 +42,11 @@
final int maxRetries;
final boolean getCHKOnly;
final short priorityClass;
+ final short persistenceType;
final boolean fromDisk;
final boolean dontCompress;
+ final String clientToken;
+ final File origFilename;
public ClientPutMessage(SimpleFieldSet fs) throws
MessageInvalidException {
identifier = fs.get("Identifier");
@@ -105,6 +108,7 @@
dataLength = f.length();
FileBucket fileBucket = new FileBucket(f, true, false,
false);
this.bucket = fileBucket;
+ this.origFilename = f;
} else {
fromDisk = false;
String dataLengthString = fs.get("DataLength");
@@ -115,8 +119,24 @@
} catch (NumberFormatException e) {
throw new
MessageInvalidException(ProtocolErrorMessage.ERROR_PARSING_NUMBER, "Error
parsing DataLength field: "+e.getMessage(), identifier);
}
+ this.origFilename = null;
}
dontCompress = Fields.stringToBool(fs.get("DontCompress"),
false);
+ String persistenceString = fs.get("Persistence");
+ if(persistenceString == null ||
persistenceString.equalsIgnoreCase("connection")) {
+ // Default: persists until connection loss.
+ persistenceType = ClientRequest.PERSIST_CONNECTION;
+ } else if(persistenceString.equalsIgnoreCase("reboot")) {
+ // Reports to client by name; persists over connection
loss.
+ // Not saved to disk, so dies on reboot.
+ persistenceType = ClientRequest.PERSIST_REBOOT;
+ } else if(persistenceString.equalsIgnoreCase("forever")) {
+ // Same as reboot but saved to disk, persists forever.
+ persistenceType = ClientRequest.PERSIST_FOREVER;
+ } else {
+ throw new
MessageInvalidException(ProtocolErrorMessage.ERROR_PARSING_NUMBER, "Error
parsing Persistence field: "+persistenceString, identifier);
+ }
+ clientToken = fs.get("ClientToken");
}
public SimpleFieldSet getFieldSet() {
Modified: trunk/freenet/src/freenet/node/fcp/ClientRequest.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientRequest.java 2006-02-22
20:35:17 UTC (rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/ClientRequest.java 2006-02-22
21:12:38 UTC (rev 8113)
@@ -6,7 +6,38 @@
*/
public abstract class ClientRequest {
- /** Cancel */
- public abstract void cancel();
+ /** Lost connection */
+ public abstract void onLostConnection();
+
+ /** Is the request persistent? False = we can drop the request if we
lose the connection */
+ public abstract boolean isPersistent();
+ /** Completed request dropped off the end without being acknowledged */
+ public abstract void dropped();
+
+ /** Get identifier string for request */
+ public abstract String getIdentifier();
+
+ /** Send any pending messages for a persistent request e.g. after
reconnecting */
+ public abstract void sendPendingMessages(FCPConnectionOutputHandler
handler, boolean includePersistentRequest);
+
+ // Persistence
+
+ static final short PERSIST_CONNECTION = 0;
+ static final short PERSIST_REBOOT = 1;
+ static final short PERSIST_FOREVER = 2;
+
+ public static String persistenceTypeString(short type) {
+ switch(type) {
+ case PERSIST_CONNECTION:
+ return "connection";
+ case PERSIST_REBOOT:
+ return "reboot";
+ case PERSIST_FOREVER:
+ return "forever";
+ default:
+ return Short.toString(type);
+ }
+ }
+
}
Modified: trunk/freenet/src/freenet/node/fcp/DataCarryingMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/DataCarryingMessage.java 2006-02-22
20:35:17 UTC (rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/DataCarryingMessage.java 2006-02-22
21:12:38 UTC (rev 8113)
@@ -14,6 +14,12 @@
protected Bucket bucket;
abstract long dataLength();
+
+ protected boolean freeOnSent;
+
+ void setFreeOnSent() {
+ freeOnSent = true;
+ }
public void readFrom(InputStream is, BucketFactory bf) throws
IOException {
long len = dataLength();
@@ -28,6 +34,7 @@
public void send(OutputStream os) throws IOException {
super.send(os);
BucketTools.copyTo(bucket, os, dataLength());
+ if(freeOnSent) bucket.free();
}
String getEndString() {
Modified: trunk/freenet/src/freenet/node/fcp/DataFoundMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/DataFoundMessage.java 2006-02-22
20:35:17 UTC (rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/DataFoundMessage.java 2006-02-22
21:12:38 UTC (rev 8113)
@@ -10,7 +10,7 @@
final String mimeType;
final long dataLength;
- public DataFoundMessage(FCPConnectionHandler handler, FetchResult fr,
String identifier) {
+ public DataFoundMessage(FetchResult fr, String identifier) {
this.identifier = identifier;
this.mimeType = fr.getMimeType();
this.dataLength = fr.size();
Modified: trunk/freenet/src/freenet/node/fcp/FCPClient.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPClient.java 2006-02-22 20:35:17 UTC
(rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/FCPClient.java 2006-02-22 21:12:38 UTC
(rev 8113)
@@ -1,27 +1,111 @@
package freenet.node.fcp;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import freenet.support.LRUQueue;
+
/**
* An FCP client.
* Identified by its Name which is sent on connection.
*/
public class FCPClient {
+ /** Maximum number of unacknowledged completed requests */
+ private static final int MAX_UNACKED_REQUESTS = 256;
+
public FCPClient(String name2, FCPConnectionHandler handler) {
this.name = name2;
this.currentConnection = handler;
+ this.runningPersistentRequests = new HashSet();
+ this.completedUnackedRequests = new LRUQueue();
+ this.clientRequestsByIdentifier = new HashMap();
}
/** The client's Name sent in the ClientHello message */
final String name;
/** The current connection handler, if any. */
private FCPConnectionHandler currentConnection;
+ /** Currently running persistent requests */
+ private final HashSet runningPersistentRequests;
+ /** Completed unacknowledged persistent requests */
+ private final LRUQueue completedUnackedRequests;
+ /** ClientRequest's by identifier */
+ private final HashMap clientRequestsByIdentifier;
- public FCPConnectionHandler getConnection() {
+ public synchronized FCPConnectionHandler getConnection() {
return currentConnection;
}
-
- public void setConnection(FCPConnectionHandler handler) {
+
+ public synchronized void setConnection(FCPConnectionHandler handler) {
this.currentConnection = handler;
}
+
+ public synchronized void onLostConnection(FCPConnectionHandler handler)
{
+ if(currentConnection == handler)
+ currentConnection = null;
+ }
+
+ /**
+ * Called when a client request has finished, but is persistent. It has
not been
+ * acked yet, so it should be moved to the unacked-completed-requests
set.
+ */
+ public void finishedClientRequest(ClientRequest get) {
+ ClientRequest dropped = null;
+ synchronized(this) {
+ runningPersistentRequests.remove(get);
+ completedUnackedRequests.push(get);
+
+ if(completedUnackedRequests.size() >
MAX_UNACKED_REQUESTS) {
+ dropped = (ClientRequest)
completedUnackedRequests.pop();
+ }
+ }
+ if(dropped != null)
+ dropped.dropped();
+ }
+
+ /**
+ * Queue any and all pending messages from already completed,
unacknowledged, persistent
+ * requests, to be immediately sent. This happens automatically on
startup and hopefully
+ * will encourage clients to acknowledge persistent requests!
+ */
+ public void
queuePendingMessagesOnConnectionRestart(FCPConnectionOutputHandler
outputHandler) {
+ Object[] reqs;
+ synchronized(this) {
+ reqs = completedUnackedRequests.toArray();
+ }
+ for(int i=0;i<reqs.length;i++)
+
((ClientRequest)reqs[i]).sendPendingMessages(outputHandler, true);
+ }
+ /**
+ * Queue any and all pending messages from running requests. Happens on
demand.
+ */
+ public void
queuePendingMessagesFromRunningRequests(FCPConnectionOutputHandler
outputHandler) {
+ Object[] reqs;
+ synchronized(this) {
+ reqs = runningPersistentRequests.toArray();
+ }
+ for(int i=0;i<reqs.length;i++)
+
((ClientRequest)reqs[i]).sendPendingMessages(outputHandler, true);
+ }
+
+ public void register(ClientRequest cg) {
+ synchronized(this) {
+ runningPersistentRequests.add(cg);
+ clientRequestsByIdentifier.put(cg.getIdentifier(), cg);
+ }
+ }
+
+ public void removeByIdentifier(String identifier) throws
MessageInvalidException {
+ synchronized(this) {
+ ClientRequest req = (ClientRequest)
clientRequestsByIdentifier.get(identifier);
+ if(req == null)
+ throw new
MessageInvalidException(ProtocolErrorMessage.NO_SUCH_IDENTIFIER, null,
identifier);
+ if(runningPersistentRequests.remove(req) ||
completedUnackedRequests.remove(req))
+ return;
+ throw new
MessageInvalidException(ProtocolErrorMessage.NO_SUCH_IDENTIFIER, null,
identifier);
+ }
+ }
+
}
Modified: trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
2006-02-22 20:35:17 UTC (rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
2006-02-22 21:12:38 UTC (rev 8113)
@@ -45,13 +45,15 @@
public void close() {
ClientRequest[] requests;
+ if(client != null)
+ client.onLostConnection(this);
synchronized(this) {
isClosed = true;
requests = new
ClientRequest[requestsByIdentifier.size()];
requests = (ClientRequest[])
requestsByIdentifier.values().toArray(requests);
}
for(int i=0;i<requests.length;i++)
- requests[i].cancel();
+ requests[i].onLostConnection();
}
public boolean isClosed() {
@@ -120,6 +122,8 @@
return;
} else {
cg.start();
+ if(cg.isPersistent())
+ client.register(cg);
}
}
@@ -142,7 +146,19 @@
return;
} else {
cp.start();
+ if(cp.isPersistent())
+ client.register(cp);
}
}
+
+ public FCPClient getClient() {
+ return client;
+ }
+
+ public void finishedClientRequest(ClientRequest get) {
+ synchronized(this) {
+ requestsByIdentifier.remove(get.getIdentifier());
+ }
+ }
}
Modified: trunk/freenet/src/freenet/node/fcp/FCPConnectionInputHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPConnectionInputHandler.java
2006-02-22 20:35:17 UTC (rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/FCPConnectionInputHandler.java
2006-02-22 21:12:38 UTC (rev 8113)
@@ -44,7 +44,11 @@
SimpleFieldSet fs;
// Read a message
String messageType = lis.readLine(64, 64);
- if(messageType==null || messageType.equals(""))
continue;
+ if(messageType == null) {
+ is.close();
+ return;
+ }
+ if(messageType.equals("")) continue;
fs = new SimpleFieldSet(lis, 4096, 128, false, false);
FCPMessage msg;
try {
Modified: trunk/freenet/src/freenet/node/fcp/FCPMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPMessage.java 2006-02-22 20:35:17 UTC
(rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/FCPMessage.java 2006-02-22 21:12:38 UTC
(rev 8113)
@@ -33,12 +33,16 @@
return new ClientPutMessage(fs);
if(name.equals(GenerateSSKMessage.name))
return new GenerateSSKMessage(fs);
+ if(name.equals(ListPersistentRequestsMessage.name))
+ return new ListPersistentRequestsMessage(fs);
+ if(name.equals(RemovePersistentRequest.name))
+ return new RemovePersistentRequest(fs);
if(name.equals("Void"))
return null;
+ throw new
MessageInvalidException(ProtocolErrorMessage.INVALID_MESSAGE, "Unknown message
name "+name, null);
// if(name.equals("ClientPut"))
// return new ClientPutFCPMessage(fs);
// TODO Auto-generated method stub
- return null;
}
/** Do whatever it is that we do with this type of message.
Modified: trunk/freenet/src/freenet/node/fcp/FCPServer.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPServer.java 2006-02-22 20:35:17 UTC
(rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/FCPServer.java 2006-02-22 21:12:38 UTC
(rev 8113)
@@ -150,27 +150,32 @@
return fcp;
}
- public synchronized FCPClient registerClient(String name,
FCPConnectionHandler handler) {
- FCPClient oldClient = (FCPClient) clientsByName.get(name);
- if(oldClient == null) {
- // Create new client
- FCPClient client = new FCPClient(name, handler);
- clientsByName.put(name, client);
- return client;
- } else {
- FCPConnectionHandler oldConn =
oldClient.getConnection();
- // Have existing client
- if(oldConn == null) {
- // Easy
- oldClient.setConnection(handler);
+ public FCPClient registerClient(String name, FCPConnectionHandler
handler) {
+ FCPClient oldClient;
+ synchronized(this) {
+ oldClient = (FCPClient) clientsByName.get(name);
+ if(oldClient == null) {
+ // Create new client
+ FCPClient client = new FCPClient(name, handler);
+ clientsByName.put(name, client);
+ return client;
} else {
- // Kill old connection
- oldConn.outputHandler.queue(new
CloseConnectionDuplicateClientNameMessage());
- oldConn.close();
- oldClient.setConnection(handler);
+ FCPConnectionHandler oldConn =
oldClient.getConnection();
+ // Have existing client
+ if(oldConn == null) {
+ // Easy
+ oldClient.setConnection(handler);
+ } else {
+ // Kill old connection
+ oldConn.outputHandler.queue(new
CloseConnectionDuplicateClientNameMessage());
+ oldConn.close();
+ oldClient.setConnection(handler);
+ return oldClient;
+ }
}
- return oldClient;
}
+
oldClient.queuePendingMessagesOnConnectionRestart(handler.outputHandler);
+ return oldClient;
}
}
Modified: trunk/freenet/src/freenet/node/fcp/GetFailedMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/GetFailedMessage.java 2006-02-22
20:35:17 UTC (rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/GetFailedMessage.java 2006-02-22
21:12:38 UTC (rev 8113)
@@ -15,7 +15,7 @@
final boolean isFatal;
final String identifier;
- public GetFailedMessage(FCPConnectionHandler handler, FetchException e,
String identifier) {
+ public GetFailedMessage(FetchException e, String identifier) {
this.tracker = e.errorCodes;
this.code = e.mode;
this.codeDescription = FetchException.getMessage(code);
Added: trunk/freenet/src/freenet/node/fcp/ListPersistentRequestsMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ListPersistentRequestsMessage.java
2006-02-22 20:35:17 UTC (rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/ListPersistentRequestsMessage.java
2006-02-22 21:12:38 UTC (rev 8113)
@@ -0,0 +1,28 @@
+package freenet.node.fcp;
+
+import freenet.node.Node;
+import freenet.support.SimpleFieldSet;
+
+public class ListPersistentRequestsMessage extends FCPMessage {
+
+ static final String name = "ListPersistentRequests";
+
+ public ListPersistentRequestsMessage(SimpleFieldSet fs) {
+ // Do nothing
+ }
+
+ public SimpleFieldSet getFieldSet() {
+ return new SimpleFieldSet(false);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void run(FCPConnectionHandler handler, Node node)
+ throws MessageInvalidException {
+
handler.getClient().queuePendingMessagesOnConnectionRestart(handler.outputHandler);
+
handler.getClient().queuePendingMessagesFromRunningRequests(handler.outputHandler);
+ }
+
+}
Added: trunk/freenet/src/freenet/node/fcp/PersistentGet.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/PersistentGet.java 2006-02-22
20:35:17 UTC (rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/PersistentGet.java 2006-02-22
21:12:38 UTC (rev 8113)
@@ -0,0 +1,67 @@
+package freenet.node.fcp;
+
+import java.io.File;
+
+import freenet.keys.FreenetURI;
+import freenet.node.Node;
+import freenet.support.SimpleFieldSet;
+
+/**
+ * Sent by the node to a client when it asks for a list of current requests.
+ * PersistentGet
+ * End
+ */
+public class PersistentGet extends FCPMessage {
+
+ static final String name = "PersistentGet";
+
+ final String identifier;
+ final FreenetURI uri;
+ final int verbosity;
+ final short priorityClass;
+ final short returnType;
+ final short persistenceType;
+ final File targetFile;
+ final File tempFile;
+ final String clientToken;
+
+ public PersistentGet(String identifier, FreenetURI uri, int verbosity,
+ short priorityClass, short returnType, short
persistenceType,
+ File targetFile, File tempFile, String clientToken) {
+ this.identifier = identifier;
+ this.uri = uri;
+ this.verbosity = verbosity;
+ this.priorityClass = priorityClass;
+ this.returnType = returnType;
+ this.persistenceType = persistenceType;
+ this.targetFile = targetFile;
+ this.tempFile = tempFile;
+ this.clientToken = clientToken;
+ }
+
+ public SimpleFieldSet getFieldSet() {
+ SimpleFieldSet fs = new SimpleFieldSet(false);
+ fs.put("Identifier", identifier);
+ fs.put("URI", uri.toString(false));
+ fs.put("Verbosity", Integer.toString(verbosity));
+ fs.put("ReturnType",
ClientGetMessage.returnTypeString(returnType));
+ fs.put("PersistenceType",
ClientGetMessage.persistenceTypeString(persistenceType));
+ if(returnType == ClientGetMessage.RETURN_TYPE_DISK) {
+ fs.put("Filename", targetFile.getAbsolutePath());
+ fs.put("TempFilename", tempFile.getAbsolutePath());
+ }
+ if(clientToken != null)
+ fs.put("ClientToken", clientToken);
+ return fs;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void run(FCPConnectionHandler handler, Node node)
+ throws MessageInvalidException {
+ throw new
MessageInvalidException(ProtocolErrorMessage.INVALID_MESSAGE, "PersistentGet
goes from server to client not the other way around", identifier);
+ }
+
+}
Added: trunk/freenet/src/freenet/node/fcp/PersistentPut.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/PersistentPut.java 2006-02-22
20:35:17 UTC (rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/PersistentPut.java 2006-02-22
21:12:38 UTC (rev 8113)
@@ -0,0 +1,59 @@
+package freenet.node.fcp;
+
+import java.io.File;
+
+import freenet.keys.FreenetURI;
+import freenet.node.Node;
+import freenet.support.SimpleFieldSet;
+
+public class PersistentPut extends FCPMessage {
+
+ static final String name = "PersistentPut";
+
+ final String identifier;
+ final FreenetURI uri;
+ final int verbosity;
+ final short priorityClass;
+ final boolean fromDisk;
+ final short persistenceType;
+ final File origFilename;
+ final String mimeType;
+
+ public PersistentPut(String identifier, FreenetURI uri, int verbosity,
+ short priorityClass, boolean fromDisk, short
persistenceType,
+ File origFilename, String mimeType) {
+ this.identifier = identifier;
+ this.uri = uri;
+ this.verbosity = verbosity;
+ this.priorityClass = priorityClass;
+ this.fromDisk = fromDisk;
+ this.persistenceType = persistenceType;
+ this.origFilename = origFilename;
+ this.mimeType = mimeType;
+ }
+
+ public SimpleFieldSet getFieldSet() {
+ SimpleFieldSet fs = new SimpleFieldSet(false);
+ fs.put("Identifier", identifier);
+ fs.put("URI", uri.toString(false));
+ fs.put("Verbosity", Integer.toString(verbosity));
+ fs.put("PriorityClass", Short.toString(priorityClass));
+ fs.put("UploadFrom", (fromDisk ? "disk" : "direct"));
+ fs.put("Persistence",
ClientRequest.persistenceTypeString(persistenceType));
+ if(origFilename != null)
+ fs.put("Filename", origFilename.getAbsolutePath());
+ if(mimeType != null)
+ fs.put("Metadata.ContentType", mimeType);
+ return fs;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void run(FCPConnectionHandler handler, Node node)
+ throws MessageInvalidException {
+ throw new
MessageInvalidException(ProtocolErrorMessage.INVALID_MESSAGE, "PersistentPut
goes from server to client not the other way around", identifier);
+ }
+
+}
Modified: trunk/freenet/src/freenet/node/fcp/ProtocolErrorMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ProtocolErrorMessage.java
2006-02-22 20:35:17 UTC (rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/ProtocolErrorMessage.java
2006-02-22 21:12:38 UTC (rev 8113)
@@ -32,6 +32,7 @@
static final int COULD_NOT_CREATE_FILE = 12;
static final int COULD_NOT_WRITE_FILE = 13;
static final int COULD_NOT_RENAME_FILE = 14;
+ static final int NO_SUCH_IDENTIFIER = 15;
final int code;
final String extra;
@@ -68,6 +69,8 @@
return "Could not write file";
case COULD_NOT_RENAME_FILE:
return "Could not rename file";
+ case NO_SUCH_IDENTIFIER:
+ return "No such identifier";
default:
Logger.error(this, "Unknown error code: "+code, new
Exception("debug"));
return "(Unknown)";
Added: trunk/freenet/src/freenet/node/fcp/RemovePersistentRequest.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/RemovePersistentRequest.java
2006-02-22 20:35:17 UTC (rev 8112)
+++ trunk/freenet/src/freenet/node/fcp/RemovePersistentRequest.java
2006-02-22 21:12:38 UTC (rev 8113)
@@ -0,0 +1,36 @@
+package freenet.node.fcp;
+
+import freenet.node.Node;
+import freenet.support.SimpleFieldSet;
+
+/**
+ * Client telling node to remove a (completed or not) persistent request.
+ */
+public class RemovePersistentRequest extends FCPMessage {
+
+ final static String name = "RemovePersistentRequest";
+
+ final String identifier;
+
+ public RemovePersistentRequest(SimpleFieldSet fs) throws
MessageInvalidException {
+ this.identifier = fs.get("Identifier");
+ if(identifier == null)
+ throw new
MessageInvalidException(ProtocolErrorMessage.MISSING_FIELD, "Must have
Identifier", null);
+ }
+
+ public SimpleFieldSet getFieldSet() {
+ SimpleFieldSet fs = new SimpleFieldSet(false);
+ fs.put("Identifier", identifier);
+ return fs;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void run(FCPConnectionHandler handler, Node node)
+ throws MessageInvalidException {
+ handler.getClient().removeByIdentifier(identifier);
+ }
+
+}
Modified: trunk/freenet/src/freenet/support/LRUQueue.java
===================================================================
--- trunk/freenet/src/freenet/support/LRUQueue.java 2006-02-22 20:35:17 UTC
(rev 8112)
+++ trunk/freenet/src/freenet/support/LRUQueue.java 2006-02-22 21:12:38 UTC
(rev 8113)
@@ -90,12 +90,9 @@
this.obj = obj;
}
}
+
+ public synchronized Object[] toArray() {
+ return hash.keySet().toArray();
+ }
}
-
-
-
-
-
-
-