Author: toad
Date: 2006-03-02 16:13:21 +0000 (Thu, 02 Mar 2006)
New Revision: 8138
Modified:
trunk/freenet/src/freenet/client/async/ClientRequest.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/Version.java
trunk/freenet/src/freenet/node/fcp/ClientGet.java
trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java
trunk/freenet/src/freenet/node/fcp/ClientPut.java
trunk/freenet/src/freenet/node/fcp/ClientRequest.java
trunk/freenet/src/freenet/node/fcp/FCPClient.java
trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
trunk/freenet/src/freenet/node/fcp/FCPConnectionInputHandler.java
trunk/freenet/src/freenet/node/fcp/FCPServer.java
trunk/freenet/src/freenet/support/LRUQueue.java
trunk/freenet/src/freenet/support/SimpleFieldSet.java
Log:
479:
Persistence=forever support.
Very basic; will restart all requests from scratch on startup!
Next step is to remember whether the request is finished, and any messages that
need to be sent.
Modified: trunk/freenet/src/freenet/client/async/ClientRequest.java
===================================================================
--- trunk/freenet/src/freenet/client/async/ClientRequest.java 2006-02-27
23:25:20 UTC (rev 8137)
+++ trunk/freenet/src/freenet/client/async/ClientRequest.java 2006-03-02
16:13:21 UTC (rev 8138)
@@ -56,6 +56,7 @@
if(blockSetFinalized) return;
blockSetFinalized = true;
}
+ Logger.minor(this, "Finalized set of blocks for "+this, new
Exception("debug"));
notifyClients();
}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2006-02-27 23:25:20 UTC (rev
8137)
+++ trunk/freenet/src/freenet/node/Node.java 2006-03-02 16:13:21 UTC (rev
8138)
@@ -889,6 +889,8 @@
fcpServer = FCPServer.maybeCreate(this, config);
} catch (IOException e) {
throw new NodeInitException(EXIT_COULD_NOT_START_FCP,
"Could not start FCP: "+e);
+ } catch (InvalidConfigValueException e) {
+ throw new NodeInitException(EXIT_COULD_NOT_START_FCP,
"Could not start FCP: "+e);
}
// SNMP
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-02-27 23:25:20 UTC (rev
8137)
+++ trunk/freenet/src/freenet/node/Version.java 2006-03-02 16:13:21 UTC (rev
8138)
@@ -20,7 +20,7 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- private static final int buildNumber = 478;
+ private static final int buildNumber = 479;
/** Oldest build of Fred we will talk to */
private static final int lastGoodBuild = 475;
Modified: trunk/freenet/src/freenet/node/fcp/ClientGet.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientGet.java 2006-02-27 23:25:20 UTC
(rev 8137)
+++ trunk/freenet/src/freenet/node/fcp/ClientGet.java 2006-03-02 16:13:21 UTC
(rev 8138)
@@ -1,9 +1,11 @@
package freenet.node.fcp;
+import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.net.MalformedURLException;
import freenet.client.FetchException;
import freenet.client.FetchResult;
@@ -18,7 +20,9 @@
import freenet.keys.FreenetURI;
import freenet.support.Bucket;
import freenet.support.BucketTools;
+import freenet.support.Fields;
import freenet.support.Logger;
+import freenet.support.SimpleFieldSet;
/**
* A simple client fetch. This can of course fetch arbitrarily large
@@ -64,13 +68,13 @@
else
origHandler = null;
this.client = handler.getClient();
- fctx = new FetcherContext(handler.defaultFetchContext,
FetcherContext.IDENTICAL_MASK);
+ fctx = new FetcherContext(client.defaultFetchContext,
FetcherContext.IDENTICAL_MASK);
fctx.eventProducer.addEventListener(this);
// ignoreDS
fctx.localRequestOnly = message.dsOnly;
fctx.ignoreStore = message.ignoreDS;
fctx.maxNonSplitfileRetries = message.maxRetries;
- fctx.maxSplitfileBlockRetries =
Math.max(fctx.maxSplitfileBlockRetries, message.maxRetries);
+ fctx.maxSplitfileBlockRetries = message.maxRetries;
this.identifier = message.identifier;
this.verbosity = message.verbosity;
// FIXME do something with verbosity !!
@@ -80,9 +84,53 @@
fctx.maxTempLength = message.maxTempSize;
this.targetFile = message.diskFile;
this.tempFile = message.tempFile;
- getter = new ClientGetter(this, handler.node.fetchScheduler,
uri, fctx, priorityClass, handler);
+ getter = new ClientGetter(this, client.node.fetchScheduler,
uri, fctx, priorityClass, client);
}
+ /**
+ * Create a ClientGet from a request serialized to a SimpleFieldSet.
+ * Can throw, and does minimal verification, as is dealing with data
+ * supposedly serialized out by the node.
+ * @throws MalformedURLException
+ */
+ public ClientGet(SimpleFieldSet fs, FCPClient client2) throws
MalformedURLException {
+ uri = new FreenetURI(fs.get("URI"));
+ identifier = fs.get("Identifier");
+ verbosity = Integer.parseInt(fs.get("Verbosity"));
+ priorityClass = Short.parseShort(fs.get("PriorityClass"));
+ returnType =
ClientGetMessage.parseValidReturnType(fs.get("ReturnType"));
+ persistenceType =
ClientRequest.parsePersistence(fs.get("Persistence"));
+ if(persistenceType == ClientRequest.PERSIST_CONNECTION)
+ throw new IllegalArgumentException("Reading persistent
get with type CONNECTION !!");
+ if(!(persistenceType == ClientRequest.PERSIST_FOREVER ||
persistenceType == ClientRequest.PERSIST_REBOOT))
+ throw new IllegalArgumentException("Unknown persistence
type "+ClientRequest.persistenceTypeString(persistenceType));
+ this.client = client2;
+ this.origHandler = null;
+ String f = fs.get("Filename");
+ if(f != null)
+ targetFile = new File(f);
+ else
+ targetFile = null;
+ f = fs.get("TempFilename");
+ if(f != null)
+ tempFile = new File(f);
+ else
+ tempFile = null;
+ clientToken = fs.get("ClientToken");
+ finished = Boolean.parseBoolean(fs.get("Finished"));
+ boolean ignoreDS = Fields.stringToBool(fs.get("IgnoreDS"),
false);
+ boolean dsOnly = Fields.stringToBool(fs.get("DSOnly"), false);
+ int maxRetries = Integer.parseInt(fs.get("MaxRetries"));
+ fctx = new FetcherContext(client.defaultFetchContext,
FetcherContext.IDENTICAL_MASK);
+ fctx.eventProducer.addEventListener(this);
+ // ignoreDS
+ fctx.localRequestOnly = dsOnly;
+ fctx.ignoreStore = ignoreDS;
+ fctx.maxNonSplitfileRetries = maxRetries;
+ fctx.maxSplitfileBlockRetries = maxRetries;
+ getter = new ClientGetter(this, client.node.fetchScheduler,
uri, fctx, priorityClass, client);
+ }
+
void start() {
try {
getter.start();
@@ -262,4 +310,51 @@
return identifier;
}
+ public void write(BufferedWriter w) throws IOException {
+ if(persistenceType != ClientRequest.PERSIST_REBOOT) {
+ Logger.error(this, "Not persisting as
persistenceType="+persistenceType);
+ }
+ // Persist the request to disk
+ SimpleFieldSet fs = getFieldSet();
+ fs.writeTo(w);
+ }
+
+ // This is distinct from the ClientGetMessage code, as later on it will
be radically
+ // different (it can store detailed state).
+ public SimpleFieldSet getFieldSet() {
+ SimpleFieldSet fs = new SimpleFieldSet(true); // we will need
multi-level later...
+ fs.put("Type", "GET");
+ fs.put("URI", uri.toString(false));
+ fs.put("Identifier", identifier);
+ fs.put("Verbosity", Integer.toString(verbosity));
+ fs.put("PriorityClass", Short.toString(priorityClass));
+ fs.put("ReturnType",
ClientGetMessage.returnTypeString(returnType));
+ fs.put("Persistence",
ClientRequest.persistenceTypeString(persistenceType));
+ fs.put("ClientName", client.name);
+ if(targetFile != null)
+ fs.put("Filename", targetFile.getPath());
+ if(tempFile != null)
+ fs.put("TempFilename", tempFile.getPath());
+ if(clientToken != null)
+ fs.put("ClientToken", clientToken);
+ if(returnType == ClientGetMessage.RETURN_TYPE_DISK &&
targetFile != null) {
+ // Otherwise we must re-run it anyway as we don't have
the data.
+
+ // finished => persistence of completion state, pending
messages
+ //fs.put("Finished", Boolean.toString(finished));
+ }
+ fs.put("IgnoreDS", Boolean.toString(fctx.ignoreStore));
+ fs.put("DSOnly", Boolean.toString(fctx.localRequestOnly));
+ fs.put("MaxRetries",
Integer.toString(fctx.maxNonSplitfileRetries));
+ return fs;
+ }
+
+ public boolean hasFinished() {
+ return finished;
+ }
+
+ public boolean isPersistentForever() {
+ return persistenceType == ClientRequest.PERSIST_FOREVER;
+ }
+
}
Modified: trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java 2006-02-27
23:25:20 UTC (rev 8137)
+++ trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java 2006-03-02
16:13:21 UTC (rev 8138)
@@ -81,20 +81,18 @@
}
}
String returnTypeString = fs.get("ReturnType");
- if(returnTypeString == null ||
returnTypeString.equalsIgnoreCase("direct")) {
- returnType = RETURN_TYPE_DIRECT;
+ returnType = parseReturnType(returnTypeString);
+ if(returnType == RETURN_TYPE_DIRECT) {
diskFile = null;
tempFile = null;
// default just below fproxy
defaultPriority =
RequestStarter.IMMEDIATE_SPLITFILE_PRIORITY_CLASS;
- } else if(returnTypeString.equalsIgnoreCase("none")) {
+ } else if(returnType == RETURN_TYPE_NONE) {
diskFile = null;
tempFile = null;
- returnType = RETURN_TYPE_NONE;
defaultPriority =
RequestStarter.PREFETCH_PRIORITY_CLASS;
- } else if(returnTypeString.equalsIgnoreCase("disk")) {
+ } else if(returnType == RETURN_TYPE_DISK) {
defaultPriority =
RequestStarter.BULK_SPLITFILE_PRIORITY_CLASS;
- returnType = RETURN_TYPE_DISK;
String filename = fs.get("Filename");
if(filename == null)
throw new
MessageInvalidException(ProtocolErrorMessage.MISSING_FIELD, "Missing Filename",
identifier);
@@ -192,10 +190,7 @@
}
private String getReturnTypeString() {
- if(returnType == RETURN_TYPE_DIRECT)
- return "direct";
- else
- throw new IllegalStateException("Unknown return type:
"+returnType);
+ return returnTypeString(returnType);
}
public String getName() {
@@ -221,4 +216,25 @@
}
}
+ public static short parseReturnType(String string) {
+ if(string == null)
+ return RETURN_TYPE_DIRECT;
+ if(string.equalsIgnoreCase("direct"))
+ return RETURN_TYPE_DIRECT;
+ if(string.equalsIgnoreCase("none"))
+ return RETURN_TYPE_NONE;
+ if(string.equalsIgnoreCase("disk"))
+ return RETURN_TYPE_DISK;
+ if(string.equalsIgnoreCase("chunked"))
+ return RETURN_TYPE_CHUNKED;
+ return Short.parseShort(string);
+ }
+
+ public static short parseValidReturnType(String string) {
+ short s = parseReturnType(string);
+ if(s == RETURN_TYPE_DIRECT || s == RETURN_TYPE_NONE || s ==
RETURN_TYPE_DISK)
+ return s;
+ throw new IllegalArgumentException("Invalid or unsupported
return type: "+returnTypeString(s));
+ }
+
}
Modified: trunk/freenet/src/freenet/node/fcp/ClientPut.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientPut.java 2006-02-27 23:25:20 UTC
(rev 8137)
+++ trunk/freenet/src/freenet/node/fcp/ClientPut.java 2006-03-02 16:13:21 UTC
(rev 8138)
@@ -1,6 +1,9 @@
package freenet.node.fcp;
+import java.io.BufferedWriter;
import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
import freenet.client.ClientMetadata;
import freenet.client.FetchException;
@@ -19,7 +22,10 @@
import freenet.client.events.SplitfileProgressEvent;
import freenet.client.events.StartedCompressionEvent;
import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
import freenet.support.Logger;
+import freenet.support.SimpleFieldSet;
+import freenet.support.io.FileBucket;
public class ClientPut extends ClientRequest implements ClientCallback,
ClientEventListener {
@@ -68,9 +74,8 @@
else
this.origHandler = null;
client = handler.getClient();
- ctx = new InserterContext(handler.defaultInsertContext, new
SimpleEventProducer());
- if(message.dontCompress)
- ctx.dontCompress = true;
+ ctx = new InserterContext(client.defaultInsertContext, new
SimpleEventProducer());
+ ctx.dontCompress = message.dontCompress;
ctx.eventProducer.addEventListener(this);
ctx.maxInsertRetries = message.maxRetries;
// Now go through the fields one at a time
@@ -78,8 +83,42 @@
String mimeType = message.contentType;
clientToken = message.clientToken;
block = new InsertBlock(message.bucket, new
ClientMetadata(mimeType), uri);
- inserter = new ClientPutter(this, message.bucket, uri, new
ClientMetadata(mimeType), ctx, handler.node.putScheduler, priorityClass,
getCHKOnly, false, handler);
+ inserter = new ClientPutter(this, message.bucket, uri, new
ClientMetadata(mimeType), ctx, client.node.putScheduler, priorityClass,
getCHKOnly, false, client);
}
+
+ /**
+ * Create from a persisted SimpleFieldSet.
+ * Not very tolerant of errors, as the input was generated
+ * by the node.
+ * @throws MalformedURLException
+ */
+ public ClientPut(SimpleFieldSet fs, FCPClient client2) throws
MalformedURLException {
+ uri = new FreenetURI(fs.get("URI"));
+ identifier = fs.get("Identifier");
+ verbosity = Integer.parseInt(fs.get("Verbosity"));
+ priorityClass = Short.parseShort(fs.get("PriorityClass"));
+ persistenceType =
ClientRequest.parsePersistence(fs.get("Persistence"));
+ if(persistenceType == ClientRequest.PERSIST_CONNECTION
+ || persistenceType ==
ClientRequest.PERSIST_REBOOT)
+ throw new IllegalArgumentException("Reading in
persistent ClientPut, but persistence type =
"+ClientRequest.persistenceTypeString(persistenceType)+" so shouldn't have been
saved in the first place");
+ this.client = client2;
+ origHandler = null;
+ String mimeType = fs.get("Metadata.ContentType");
+ getCHKOnly = Boolean.parseBoolean(fs.get("GetCHKOnly"));
+ boolean dontCompress =
Boolean.parseBoolean(fs.get("DontCompress"));
+ int maxRetries = Integer.parseInt(fs.get("MaxRetries"));
+ clientToken = fs.get("ClientToken");
+ fromDisk = true;
+ origFilename = new File(fs.get("Filename"));
+ Bucket data = new FileBucket(origFilename, true, false, false);
+ ctx = new InserterContext(client.defaultInsertContext, new
SimpleEventProducer());
+ ctx.dontCompress = dontCompress;
+ ctx.eventProducer.addEventListener(this);
+ ctx.maxInsertRetries = maxRetries;
+ block = new InsertBlock(data, new ClientMetadata(mimeType),
uri);
+ inserter = new ClientPutter(this, data, uri, new
ClientMetadata(mimeType), ctx, client.node.putScheduler, priorityClass,
getCHKOnly, false, client);
+ start();
+ }
void start() {
try {
@@ -214,4 +253,42 @@
return identifier;
}
+ public void write(BufferedWriter w) throws IOException {
+ if(persistenceType != ClientRequest.PERSIST_REBOOT) {
+ Logger.error(this, "Not persisting as
persistenceType="+persistenceType);
+ }
+ // Persist the request to disk
+ SimpleFieldSet fs = getFieldSet();
+ fs.writeTo(w);
+ }
+
+ public SimpleFieldSet getFieldSet() throws IOException {
+ SimpleFieldSet fs = new SimpleFieldSet(true); // we will need
multi-level later...
+ fs.put("Type", "PUT");
+ fs.put("URI", uri.toString(false));
+ fs.put("Identifier", identifier);
+ fs.put("Verbosity", Integer.toString(verbosity));
+ fs.put("PriorityClass", Short.toString(priorityClass));
+ fs.put("Persistence",
ClientRequest.persistenceTypeString(persistenceType));
+ fs.put("ClientName", client.name);
+ fs.put("Metadata.ContentType",
block.clientMetadata.getMIMEType());
+ fs.put("GetCHKOnly", Boolean.toString(getCHKOnly));
+ // finished => persistence of completion state, pending messages
+ //fs.put("Finished", Boolean.toString(finished));
+ fs.put("ClientToken", clientToken);
+ if(!fromDisk) throw new
UnsupportedOperationException("Persistent insert not from disk - NOT
SUPPORTED");
+ fs.put("Filename", origFilename.getPath());
+ fs.put("DontCompress", Boolean.toString(ctx.dontCompress));
+ fs.put("MaxRetries", Integer.toString(ctx.maxInsertRetries));
+ return fs;
+ }
+
+ public boolean hasFinished() {
+ return finished;
+ }
+
+ public boolean isPersistentForever() {
+ return persistenceType == ClientRequest.PERSIST_FOREVER;
+ }
+
}
Modified: trunk/freenet/src/freenet/node/fcp/ClientRequest.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientRequest.java 2006-02-27
23:25:20 UTC (rev 8137)
+++ trunk/freenet/src/freenet/node/fcp/ClientRequest.java 2006-03-02
16:13:21 UTC (rev 8138)
@@ -1,5 +1,12 @@
package freenet.node.fcp;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+
+import freenet.support.Logger;
+import freenet.support.SimpleFieldSet;
+
/**
* A request process carried out by the node for an FCP client.
* Examples: ClientGet, ClientPut, MultiGet.
@@ -40,4 +47,43 @@
}
}
+ public static short parsePersistence(String string) {
+ if(string == null || string.equalsIgnoreCase("connection"))
+ return PERSIST_CONNECTION;
+ if(string.equalsIgnoreCase("reboot"))
+ return PERSIST_REBOOT;
+ if(string.equalsIgnoreCase("forever"))
+ return PERSIST_FOREVER;
+ return Short.parseShort(string);
+ }
+
+ /**
+ * Write a persistent request to disk.
+ * @throws IOException
+ */
+ public abstract void write(BufferedWriter w) throws IOException;
+
+ public static ClientRequest readAndRegister(BufferedReader br,
FCPServer server) throws IOException {
+ SimpleFieldSet fs = new SimpleFieldSet(br, true);
+ String clientName = fs.get("ClientName");
+ FCPClient client = server.registerClient(clientName,
server.node, null);
+ String type = fs.get("Type");
+ if(type.equals("GET")) {
+ ClientGet cg = new ClientGet(fs, client);
+ client.register(cg);
+ return cg;
+ } else if(type.equals("PUT")) {
+ ClientPut cp = new ClientPut(fs, client);
+ client.register(cp);
+ return cp;
+ } else {
+ Logger.error(ClientRequest.class, "Unrecognized type:
"+type);
+ return null;
+ }
+ }
+
+ public abstract boolean hasFinished();
+
+ public abstract boolean isPersistentForever();
+
}
Modified: trunk/freenet/src/freenet/node/fcp/FCPClient.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPClient.java 2006-02-27 23:25:20 UTC
(rev 8137)
+++ trunk/freenet/src/freenet/node/fcp/FCPClient.java 2006-03-02 16:13:21 UTC
(rev 8138)
@@ -2,7 +2,13 @@
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Vector;
+import freenet.client.FetcherContext;
+import freenet.client.HighLevelSimpleClient;
+import freenet.client.InserterContext;
+import freenet.node.Node;
import freenet.support.LRUQueue;
/**
@@ -14,16 +20,23 @@
/** Maximum number of unacknowledged completed requests */
private static final int MAX_UNACKED_REQUESTS = 256;
- public FCPClient(String name2, FCPConnectionHandler handler) {
+ public FCPClient(String name2, FCPServer server, FCPConnectionHandler
handler) {
this.name = name2;
this.currentConnection = handler;
this.runningPersistentRequests = new HashSet();
this.completedUnackedRequests = new LRUQueue();
this.clientRequestsByIdentifier = new HashMap();
+ this.server = server;
+ this.node = server.node;
+ this.client = node.makeClient((short)0);
+ defaultFetchContext = client.getFetcherContext();
+ defaultInsertContext = client.getInserterContext();
}
/** The client's Name sent in the ClientHello message */
final String name;
+ /** The FCPServer */
+ final FCPServer server;
/** The current connection handler, if any. */
private FCPConnectionHandler currentConnection;
/** Currently running persistent requests */
@@ -32,6 +45,11 @@
private final LRUQueue completedUnackedRequests;
/** ClientRequest's by identifier */
private final HashMap clientRequestsByIdentifier;
+ /** Client (one FCPClient = one HighLevelSimpleClient = one round-robin
slot) */
+ private final HighLevelSimpleClient client;
+ public final FetcherContext defaultFetchContext;
+ public final InserterContext defaultInsertContext;
+ public final Node node;
public synchronized FCPConnectionHandler getConnection() {
return currentConnection;
@@ -92,14 +110,18 @@
public void register(ClientRequest cg) {
synchronized(this) {
- runningPersistentRequests.add(cg);
+ if(cg.hasFinished())
+ completedUnackedRequests.push(cg);
+ else
+ runningPersistentRequests.add(cg);
clientRequestsByIdentifier.put(cg.getIdentifier(), cg);
}
}
public void removeByIdentifier(String identifier) throws
MessageInvalidException {
+ ClientRequest req;
synchronized(this) {
- ClientRequest req = (ClientRequest)
clientRequestsByIdentifier.get(identifier);
+ req = (ClientRequest)
clientRequestsByIdentifier.get(identifier);
if(req == null)
throw new
MessageInvalidException(ProtocolErrorMessage.NO_SUCH_IDENTIFIER, null,
identifier);
if(runningPersistentRequests.remove(req) ||
completedUnackedRequests.remove(req))
@@ -107,5 +129,23 @@
throw new
MessageInvalidException(ProtocolErrorMessage.NO_SUCH_IDENTIFIER, null,
identifier);
}
}
+
+ public boolean hasPersistentRequests() {
+ return !(runningPersistentRequests.isEmpty() &&
completedUnackedRequests.isEmpty());
+ }
+
+ public void addPersistentRequests(Vector v) {
+ synchronized(this) {
+ Iterator i = runningPersistentRequests.iterator();
+ while(i.hasNext()) {
+ ClientRequest req = (ClientRequest) i.next();
+ if(req.isPersistentForever())
+ v.add(req);
+ }
+ Object[] unacked = completedUnackedRequests.toArray();
+ for(int j=0;j<unacked.length;j++)
+ v.add(unacked[j]);
+ }
+ }
}
Modified: trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
2006-02-27 23:25:20 UTC (rev 8137)
+++ trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
2006-03-02 16:13:21 UTC (rev 8138)
@@ -17,7 +17,6 @@
final Socket sock;
final FCPConnectionInputHandler inputHandler;
final FCPConnectionOutputHandler outputHandler;
- final Node node;
private boolean isClosed;
private boolean inputClosed;
private boolean outputClosed;
@@ -25,19 +24,13 @@
private FCPClient client;
final BucketFactory bf;
final HashMap requestsByIdentifier;
- final FetcherContext defaultFetchContext;
- public InserterContext defaultInsertContext;
- public FCPConnectionHandler(Socket s, Node node, FCPServer server) {
+ public FCPConnectionHandler(Socket s, FCPServer server) {
this.sock = s;
- this.node = node;
this.server = server;
isClosed = false;
- this.bf = node.tempBucketFactory;
+ this.bf = server.node.tempBucketFactory;
requestsByIdentifier = new HashMap();
- HighLevelSimpleClient client = node.makeClient((short)0);
- defaultFetchContext = client.getFetcherContext();
- defaultInsertContext = client.getInserterContext();
this.inputHandler = new FCPConnectionInputHandler(this);
this.outputHandler = new FCPConnectionOutputHandler(this);
inputHandler.start();
@@ -54,6 +47,8 @@
}
for(int i=0;i<requests.length;i++)
requests[i].onLostConnection();
+ if(client != null && !client.hasPersistentRequests())
+ server.unregisterClient(client);
}
public boolean isClosed() {
@@ -96,7 +91,7 @@
public void setClientName(String name) {
this.clientName = name;
- client = server.registerClient(name, this);
+ client = server.registerClient(name, server.node, this);
}
public String getClientName() {
@@ -122,8 +117,11 @@
return;
} else {
cg.start();
- if(cg.isPersistent())
+ if(cg.isPersistent()) {
client.register(cg);
+ if(cg.isPersistentForever())
+ server.forceStorePersistentRequests();
+ }
}
}
@@ -146,8 +144,11 @@
return;
} else {
cp.start();
- if(cp.isPersistent())
+ if(cp.isPersistent()) {
client.register(cp);
+ if(cp.isPersistentForever())
+ server.forceStorePersistentRequests();
+ }
}
}
Modified: trunk/freenet/src/freenet/node/fcp/FCPConnectionInputHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPConnectionInputHandler.java
2006-02-27 23:25:20 UTC (rev 8137)
+++ trunk/freenet/src/freenet/node/fcp/FCPConnectionInputHandler.java
2006-03-02 16:13:21 UTC (rev 8138)
@@ -74,7 +74,7 @@
continue;
}
try {
- msg.run(handler, handler.node);
+ msg.run(handler, handler.server.node);
} catch (MessageInvalidException e) {
FCPMessage err = new
ProtocolErrorMessage(e.protocolCode, false, e.getMessage(), e.ident);
handler.outputHandler.queue(err);
Modified: trunk/freenet/src/freenet/node/fcp/FCPServer.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPServer.java 2006-02-27 23:25:20 UTC
(rev 8137)
+++ trunk/freenet/src/freenet/node/fcp/FCPServer.java 2006-03-02 16:13:21 UTC
(rev 8138)
@@ -1,15 +1,31 @@
package freenet.node.fcp;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.util.Iterator;
+import java.util.Vector;
import java.util.WeakHashMap;
+import freenet.client.FetcherContext;
+import freenet.client.HighLevelSimpleClient;
+import freenet.client.InserterContext;
import freenet.config.BooleanCallback;
import freenet.config.Config;
import freenet.config.IntCallback;
import freenet.config.InvalidConfigValueException;
+import freenet.config.LongCallback;
import freenet.config.StringCallback;
import freenet.config.SubConfig;
import freenet.node.Node;
@@ -26,29 +42,52 @@
final boolean enabled;
final String bindto;
final WeakHashMap clientsByName;
+ private boolean enablePersistentDownloads;
+ private File persistentDownloadsFile;
+ private File persistentDownloadsTempFile;
+ /** Lock for persistence operations.
+ * MUST ALWAYS BE THE OUTERMOST LOCK.
+ */
+ private final Object persistenceSync = new Object();
+ private FCPServerPersister persister;
+ private boolean haveLoadedPersistentRequests;
+ private long persistenceInterval;
+ final FetcherContext defaultFetchContext;
+ public InserterContext defaultInsertContext;
- public FCPServer(int port, Node node) throws IOException {
- this.port = port;
- this.enabled = true;
- this.bindto = new String("127.0.0.1");
- this.sock = new ServerSocket(port, 0,
InetAddress.getByName(bindto));
- this.node = node;
- clientsByName = new WeakHashMap();
- Thread t = new Thread(this, "FCP server");
+ private void startPersister() {
+ Thread t = new Thread(persister = new FCPServerPersister(),
"FCP request persistence handler");
t.setDaemon(true);
t.start();
}
-
- public FCPServer(String ipToBindTo, int port, Node node) throws
IOException {
+
+ private void killPersister() {
+ persister.kill();
+ persister = null;
+ }
+
+ public FCPServer(String ipToBindTo, int port, Node node, boolean
persistentDownloadsEnabled, String persistentDownloadsDir, long
persistenceInterval) throws IOException, InvalidConfigValueException {
this.bindto = new String(ipToBindTo);
+ this.persistenceInterval = persistenceInterval;
this.port = port;
this.enabled = true;
this.sock = new ServerSocket(port, 0,
InetAddress.getByName(bindto));
this.node = node;
clientsByName = new WeakHashMap();
+ // This one is only used to get the default settings.
Individual FCP conns
+ // will make their own.
+ HighLevelSimpleClient client = node.makeClient((short)0);
+ defaultFetchContext = client.getFetcherContext();
+ defaultInsertContext = client.getInserterContext();
Thread t = new Thread(this, "FCP server");
+ this.enablePersistentDownloads = persistentDownloadsEnabled;
+ setPersistentDownloadsFile(new File(persistentDownloadsDir));
t.setDaemon(true);
t.start();
+ if(enablePersistentDownloads) {
+ loadPersistentRequests();
+ startPersister();
+ }
}
public void run() {
@@ -66,7 +105,7 @@
private void realRun() throws IOException {
// Accept a connection
Socket s = sock.accept();
- FCPConnectionHandler handler = new FCPConnectionHandler(s,
node, this);
+ FCPConnectionHandler handler = new FCPConnectionHandler(s,
this);
}
static class FCPPortNumberCallback implements IntCallback {
@@ -107,6 +146,9 @@
}
}
+ // FIXME: Consider moving everything except enabled into constructor
+ // Actually we could move enabled in too with an exception???
+
static class FCPBindtoCallback implements StringCallback{
final Node node;
@@ -127,34 +169,153 @@
}
}
+ static class PersistentDownloadsEnabledCallback implements
BooleanCallback {
+
+ FCPServer server;
+
+ public boolean get() {
+ return server.persistentDownloadsEnabled();
+ }
+
+ public void set(boolean set) {
+ if(server.persistentDownloadsEnabled() != set)
+ server.setPersistentDownloadsEnabled(set);
+ }
+
+ }
+
+ static class PersistentDownloadsFileCallback implements StringCallback {
+
+ FCPServer server;
+
+ public String get() {
+ return server.persistentDownloadsFile.toString();
+ }
+
+ public void set(String val) throws InvalidConfigValueException {
+ File f = new File(val);
+ if(f.equals(server.persistentDownloadsFile)) return;
+ server.setPersistentDownloadsFile(f);
+ }
+ }
+
+ static class PersistentDownloadsIntervalCallback implements
LongCallback {
+
+ FCPServer server;
+
+ public long get() {
+ return server.persistenceInterval;
+ }
+
+ public void set(long value) {
+ server.persistenceInterval = value;
+ FCPServerPersister p = server.persister;
+ if(p != null) {
+ synchronized(p) {
+ p.notify();
+ }
+ }
+ }
+ }
- public static FCPServer maybeCreate(Node node, Config config) throws
IOException {
+ public static FCPServer maybeCreate(Node node, Config config) throws
IOException, InvalidConfigValueException {
SubConfig fcpConfig = new SubConfig("fcp", config);
fcpConfig.register("enabled", true, 2, true, "Is FCP server
enabled ?", "Is FCP server enabled ?", new FCPEnabledCallback(node));
fcpConfig.register("port", 9481 /* anagram of 1984, and 1000 up
from old number */,
2, true, "FCP port number", "FCP port number",
new FCPPortNumberCallback(node));
fcpConfig.register("bindto", "127.0.0.1", 2, true, "Ip address
to bind to", "Ip address to bind the FCP server to", new
FCPBindtoCallback(node));
+ PersistentDownloadsEnabledCallback cb1;
+ PersistentDownloadsFileCallback cb2;
+ PersistentDownloadsIntervalCallback cb3;
+ fcpConfig.register("persistentDownloadsEnabled", true, 3, true,
"Enable persistent downloads?", "Whether to enable Persistence=forever for FCP
requests. Meaning whether to support requests which persist over node restarts;
they must be written to disk and this may constitute a security risk for some
people.",
+ cb1 = new PersistentDownloadsEnabledCallback());
+ boolean persistentDownloadsEnabled =
fcpConfig.getBoolean("persistentDownloadsEnabled");
+ fcpConfig.register("persistentDownloadsFile", "downloads.dat",
4, true, "Filename to store persistent downloads in", "Filename to store
details of persistent downloads to",
+ cb2 = new PersistentDownloadsFileCallback());
+ String persistentDownloadsDir =
+ fcpConfig.getString("persistentDownloadsFile");
+ fcpConfig.register("persistentDownloadsInterval",
(long)(5*60*1000), 5, true, "Interval between writing persistent downloads to
disk", "Interval between writing persistent downloads to disk",
+ cb3 = new
PersistentDownloadsIntervalCallback());
+
+ long persistentDownloadsInterval =
fcpConfig.getLong("persistentDownloadsInterval");
+
FCPServer fcp;
if(fcpConfig.getBoolean("enabled")){
Logger.normal(node, "Starting FCP server on
"+fcpConfig.getString("bindto")+":"+fcpConfig.getInt("port")+".");
- fcp = new FCPServer(fcpConfig.getString("bindto"),
fcpConfig.getInt("port"), node);
+ fcp = new FCPServer(fcpConfig.getString("bindto"),
fcpConfig.getInt("port"), node, persistentDownloadsEnabled,
persistentDownloadsDir, persistentDownloadsInterval);
node.setFCPServer(fcp);
}else{
Logger.normal(node, "Not starting FCP server as it's
disabled");
fcp = null;
}
+
+ if(fcp != null) {
+ cb1.server = fcp;
+ cb2.server = fcp;
+ cb3.server = fcp;
+ }
+
fcpConfig.finishedInitialization();
return fcp;
}
- public FCPClient registerClient(String name, FCPConnectionHandler
handler) {
+ public void setPersistentDownloadsFile(File f) throws
InvalidConfigValueException {
+ synchronized(persistenceSync) {
+ checkFile(f);
+ File temp = new File(f.getPath()+".tmp");
+ checkFile(temp);
+ // Else is ok
+ persistentDownloadsFile = f;
+ persistentDownloadsTempFile = temp;
+ }
+ }
+
+ private void checkFile(File f) throws InvalidConfigValueException {
+ if(f.isDirectory())
+ throw new InvalidConfigValueException("Invalid filename
for downloads list: is a directory");
+ if(f.isFile() && !(f.canRead() && f.canWrite()))
+ throw new InvalidConfigValueException("File exists but
cannot be read");
+ File parent = f.getParentFile();
+ if(parent != null && !parent.exists())
+ throw new InvalidConfigValueException("Parent directory
does not exist");
+ if(!f.exists()) {
+ try {
+ if(!((f.createNewFile() || f.exists()) &&
(f.canRead() && f.canWrite())))
+ throw new
InvalidConfigValueException("File does not exist, cannot create it and/or
cannot read/write it");
+ } catch (IOException e) {
+ throw new InvalidConfigValueException("File
does not exist and cannot be created");
+ }
+ }
+ }
+
+ public void setPersistentDownloadsEnabled(boolean set) {
+ synchronized(persistenceSync) {
+ if(enablePersistentDownloads == set) return;
+ if(set) {
+ if(!haveLoadedPersistentRequests)
+ loadPersistentRequests();
+ startPersister();
+ } else {
+ killPersister();
+ }
+ enablePersistentDownloads = set;
+ }
+ }
+
+ public boolean persistentDownloadsEnabled() {
+ synchronized(persistenceSync) {
+ return enablePersistentDownloads;
+ }
+ }
+
+ public FCPClient registerClient(String name, Node node,
FCPConnectionHandler handler) {
FCPClient oldClient;
synchronized(this) {
oldClient = (FCPClient) clientsByName.get(name);
if(oldClient == null) {
// Create new client
- FCPClient client = new FCPClient(name, handler);
+ FCPClient client = new FCPClient(name, this,
handler);
clientsByName.put(name, client);
return client;
} else {
@@ -172,8 +333,150 @@
}
}
}
-
oldClient.queuePendingMessagesOnConnectionRestart(handler.outputHandler);
+ if(handler != null)
+
oldClient.queuePendingMessagesOnConnectionRestart(handler.outputHandler);
return oldClient;
}
+ public void unregisterClient(FCPClient client) {
+ synchronized(this) {
+ String name = client.name;
+ clientsByName.remove(name);
+ }
+ }
+
+ class FCPServerPersister implements Runnable {
+
+ private boolean killed;
+ private boolean storeNow;
+
+ public void force() {
+ synchronized(this) {
+ storeNow = true;
+ notifyAll();
+ }
+ }
+
+ void kill() {
+ synchronized(this) {
+ killed = true;
+ notifyAll();
+ }
+ }
+
+ public void run() {
+ while(true) {
+ synchronized(this) {
+ if(killed) return;
+ long startTime =
System.currentTimeMillis();
+ long now;
+ while((now =
System.currentTimeMillis()) < startTime + persistenceInterval && !storeNow) {
+ try {
+ long wait =
Math.max((startTime + persistenceInterval) - now, Integer.MAX_VALUE);
+ if(wait > 0)
+
wait(Math.min(wait, 5000));
+ } catch (InterruptedException
e) {
+ // Ignore
+ }
+ if(killed) return;
+ }
+ }
+ storeNow = false;
+ storePersistentRequests();
+ }
+ }
+
+ }
+
+ public void forceStorePersistentRequests() {
+ Logger.minor(this, "Forcing store persistent requests");
+ if(persister != null) {
+ persister.force();
+ } else {
+ Logger.error(this, "Persister not running, cannot store
persistent requests");
+ }
+ }
+
+ /** Store all persistent requests to disk */
+ public void storePersistentRequests() {
+ Logger.minor(this, "Storing persistent requests");
+ ClientRequest[] persistentRequests = getPersistentRequests();
+ synchronized(persistenceSync) {
+ try {
+ FileOutputStream fos = new
FileOutputStream(persistentDownloadsTempFile);
+ BufferedOutputStream bos = new
BufferedOutputStream(fos);
+ OutputStreamWriter osw = new
OutputStreamWriter(bos);
+ BufferedWriter w = new BufferedWriter(osw);
+
w.write(Integer.toString(persistentRequests.length)+"\n");
+ for(int i=0;i<persistentRequests.length;i++)
+ persistentRequests[i].write(w);
+ w.close();
+
if(!persistentDownloadsTempFile.renameTo(persistentDownloadsFile)) {
+ Logger.minor(this, "Rename failed");
+ persistentDownloadsFile.delete();
+
if(!persistentDownloadsTempFile.renameTo(persistentDownloadsFile)) {
+ Logger.error(this, "Could not
rename persisted requests temp file "+persistentDownloadsTempFile+" to
"+persistentDownloadsFile);
+ }
+ }
+ } catch (IOException e) {
+ Logger.error(this, "Cannot write persistent
requests to disk: "+e);
+ }
+ }
+ }
+
+ private void loadPersistentRequests() {
+ synchronized(persistenceSync) {
+ FileInputStream fis;
+ try {
+ fis = new
FileInputStream(persistentDownloadsFile);
+ } catch (FileNotFoundException e) {
+ Logger.normal(this, "Not reading any persistent
requests from disk because no file exists");
+ return;
+ }
+ BufferedInputStream bis = new BufferedInputStream(fis);
+ InputStreamReader ris = new InputStreamReader(bis);
+ BufferedReader br = new BufferedReader(ris);
+ try {
+ String r = br.readLine();
+ int count;
+ try {
+ count = Integer.parseInt(r);
+ } catch (NumberFormatException e) {
+ Logger.error(this, "Corrupt persistent
downloads file: "+persistentDownloadsFile);
+ try {
+ br.close();
+ } catch (IOException e1) {
+ Logger.error(this, "Error
closing: "+e1, e1);
+ }
+ return;
+ }
+ for(int i=0;i<count;i++) {
+ ClientRequest req =
ClientRequest.readAndRegister(br, this);
+ }
+ br.close();
+ } catch (IOException e) {
+ Logger.error(this, "Error reading persistent
downloads file: "+persistentDownloadsFile+" : "+e, e);
+ try {
+ br.close();
+ } catch (IOException e1) {
+ Logger.error(this, "Error closing:
"+e1, e1);
+ }
+ return;
+ }
+ return;
+ }
+ }
+
+ private ClientRequest[] getPersistentRequests() {
+ Vector v = new Vector();
+ synchronized(this) {
+ Iterator i = clientsByName.values().iterator();
+ while(i.hasNext()) {
+ FCPClient client = (FCPClient) (i.next());
+ client.addPersistentRequests(v);
+ }
+ }
+ return (ClientRequest[]) v.toArray(new ClientRequest[v.size()]);
+ }
+
}
Modified: trunk/freenet/src/freenet/support/LRUQueue.java
===================================================================
--- trunk/freenet/src/freenet/support/LRUQueue.java 2006-02-27 23:25:20 UTC
(rev 8137)
+++ trunk/freenet/src/freenet/support/LRUQueue.java 2006-03-02 16:13:21 UTC
(rev 8138)
@@ -94,5 +94,9 @@
public synchronized Object[] toArray() {
return hash.keySet().toArray();
}
+
+ public synchronized boolean isEmpty() {
+ return hash.isEmpty();
+ }
}
Modified: trunk/freenet/src/freenet/support/SimpleFieldSet.java
===================================================================
--- trunk/freenet/src/freenet/support/SimpleFieldSet.java 2006-02-27
23:25:20 UTC (rev 8137)
+++ trunk/freenet/src/freenet/support/SimpleFieldSet.java 2006-03-02
16:13:21 UTC (rev 8138)
@@ -180,6 +180,7 @@
public void put(String key, String value) {
int idx;
+ if(value == null) return;
if((!multiLevel) || (idx = key.indexOf(MULTI_LEVEL_CHAR)) ==
-1) {
String x = (String) map.get(key);
@@ -220,6 +221,7 @@
w.write(prefix+key+"="+value+"\n");
} else {
SimpleFieldSet sfs = (SimpleFieldSet) v;
+ if(sfs == null) throw new NullPointerException();
sfs.writeTo(w, prefix+key+MULTI_LEVEL_CHAR, true);
}
}