Author: toad
Date: 2006-03-04 20:25:41 +0000 (Sat, 04 Mar 2006)
New Revision: 8161
Added:
trunk/freenet/src/freenet/node/fcp/EndListPersistentRequestsMessage.java
trunk/freenet/src/freenet/node/fcp/WatchGlobal.java
Modified:
trunk/freenet/src/freenet/node/Version.java
trunk/freenet/src/freenet/node/fcp/ClientGet.java
trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java
trunk/freenet/src/freenet/node/fcp/ClientPut.java
trunk/freenet/src/freenet/node/fcp/ClientPutMessage.java
trunk/freenet/src/freenet/node/fcp/ClientRequest.java
trunk/freenet/src/freenet/node/fcp/FCPClient.java
trunk/freenet/src/freenet/node/fcp/FCPMessage.java
trunk/freenet/src/freenet/node/fcp/FCPServer.java
trunk/freenet/src/freenet/node/fcp/ListPersistentRequestsMessage.java
trunk/freenet/src/freenet/node/fcp/PersistentGet.java
trunk/freenet/src/freenet/node/fcp/PersistentPut.java
Log:
494:
Global queue support, apparently working (very lightly tested).
Add EndListPersistentRequests at end of a persistent request list call.
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-03-04 19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/Version.java 2006-03-04 20:25:41 UTC (rev 8161)
@@ -20,7 +20,7 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- private static final int buildNumber = 493;
+ private static final int buildNumber = 494;
/** Oldest build of Fred we will talk to */
private static final int lastGoodBuild = 475;
Modified: trunk/freenet/src/freenet/node/fcp/ClientGet.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientGet.java 2006-03-04 19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/ClientGet.java 2006-03-04 20:25:41 UTC (rev 8161)
@@ -86,7 +86,11 @@
this.origHandler = handler;
else
origHandler = null;
- this.client = handler.getClient();
+ if(message.global) {
+ client = handler.server.globalClient;
+ } else {
+ client = handler.getClient();
+ }
fctx = new FetcherContext(client.defaultFetchContext,
FetcherContext.IDENTICAL_MASK);
fctx.eventProducer.addEventListener(this);
// ignoreDS
@@ -124,6 +128,9 @@
}
returnBucket = ret;
getter = new ClientGetter(this, client.node.fetchScheduler,
uri, fctx, priorityClass, client, returnBucket);
+ if(persistenceType != PERSIST_CONNECTION && handler != null)
+ sendPendingMessages(handler.outputHandler, true);
+
}
/**
@@ -267,15 +274,15 @@
this.succeeded = true;
finished = true;
}
- trySendDataFoundOrGetFailed();
+ trySendDataFoundOrGetFailed(null);
if(adm != null)
- trySendAllDataMessage(adm);
+ trySendAllDataMessage(adm, null);
if(!dontFree)
data.free();
finish();
}
- private void trySendDataFoundOrGetFailed() {
+ private void trySendDataFoundOrGetFailed(FCPConnectionOutputHandler
handler) {
FCPMessage msg;
@@ -284,31 +291,32 @@
} else {
msg = getFailedMessage;
}
-
- FCPConnectionHandler conn = client.getConnection();
- if(conn != null) {
- conn.outputHandler.queue(msg);
- if(postFetchProtocolErrorMessage != null)
- conn.outputHandler.queue(postFetchProtocolErrorMessage);
+
+ if(handler != null)
+ handler.queue(msg);
+ else
+ client.queueClientRequestMessage(msg, 0);
+ if(postFetchProtocolErrorMessage != null) {
+ if(handler != null)
+ handler.queue(postFetchProtocolErrorMessage);
+ else
+ client.queueClientRequestMessage(postFetchProtocolErrorMessage, 0);
}
+
}
- private void trySendAllDataMessage(AllDataMessage msg) {
+ private void trySendAllDataMessage(AllDataMessage msg,
FCPConnectionOutputHandler handler) {
if(persistenceType != ClientRequest.PERSIST_CONNECTION) {
allDataPending = msg;
}
- FCPConnectionHandler conn = client.getConnection();
- if(conn != null)
- conn.outputHandler.queue(msg);
+ client.queueClientRequestMessage(msg, 0);
}
- private void trySendProgress(SimpleProgressMessage msg) {
+ private void trySendProgress(SimpleProgressMessage msg,
FCPConnectionOutputHandler handler) {
if(persistenceType != ClientRequest.PERSIST_CONNECTION) {
progressPending = msg;
}
- FCPConnectionHandler conn = client.getConnection();
- if(conn != null)
- conn.outputHandler.queue(msg);
+ client.queueClientRequestMessage(msg,
VERBOSITY_SPLITFILE_PROGRESS);
}
public void sendPendingMessages(FCPConnectionOutputHandler handler,
boolean includePersistentRequest) {
@@ -317,13 +325,13 @@
return;
}
if(includePersistentRequest) {
- FCPMessage msg = new PersistentGet(identifier, uri,
verbosity, priorityClass, returnType, persistenceType, targetFile, tempFile,
clientToken);
+ FCPMessage msg = new PersistentGet(identifier, uri,
verbosity, priorityClass, returnType, persistenceType, targetFile, tempFile,
clientToken, client.isGlobalQueue);
handler.queue(msg);
}
if(progressPending != null)
handler.queue(progressPending);
if(finished)
- trySendDataFoundOrGetFailed();
+ trySendDataFoundOrGetFailed(handler);
if(allDataPending != null)
handler.queue(allDataPending);
}
@@ -335,7 +343,7 @@
finished = true;
}
Logger.minor(this, "Caught "+e, e);
- trySendDataFoundOrGetFailed();
+ trySendDataFoundOrGetFailed(null);
finish();
}
@@ -367,7 +375,7 @@
return;
SimpleProgressMessage progress =
new SimpleProgressMessage(identifier,
(SplitfileProgressEvent)ce);
- trySendProgress(progress);
+ trySendProgress(progress, null);
}
public boolean isPersistent() {
@@ -441,6 +449,7 @@
fs.put("ReturnBucket.DecryptKey",
HexUtil.bytesToHex(b.getKey()));
fs.put("ReturnBucket.Filename",
((FileBucket)b.getUnderlying()).getName());
}
+ fs.put("Global", Boolean.toString(client.isGlobalQueue));
return fs;
}
Modified: trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java 2006-03-04 19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java 2006-03-04 20:25:41 UTC (rev 8161)
@@ -50,6 +50,7 @@
final File diskFile;
final File tempFile;
final String clientToken;
+ final boolean global;
// FIXME move these to the actual getter process
static final short RETURN_TYPE_DIRECT = 0; // over FCP
@@ -60,6 +61,7 @@
public ClientGetMessage(SimpleFieldSet fs) throws
MessageInvalidException {
short defaultPriority;
clientToken = fs.get("ClientToken");
+ global = Fields.stringToBool(fs.get("Global"), false);
ignoreDS = Fields.stringToBool(fs.get("IgnoreDS"), false);
dsOnly = Fields.stringToBool(fs.get("DSOnly"), false);
identifier = fs.get("Identifier");
Modified: trunk/freenet/src/freenet/node/fcp/ClientPut.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientPut.java 2006-03-04 19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/ClientPut.java 2006-03-04 20:25:41 UTC (rev 8161)
@@ -85,7 +85,11 @@
this.origHandler = handler;
else
this.origHandler = null;
- client = handler.getClient();
+ if(message.global) {
+ client = handler.server.globalClient;
+ } else {
+ client = handler.getClient();
+ }
ctx = new InserterContext(client.defaultInsertContext, new
SimpleEventProducer());
ctx.dontCompress = message.dontCompress;
ctx.eventProducer.addEventListener(this);
@@ -96,6 +100,8 @@
clientToken = message.clientToken;
block = new InsertBlock(message.bucket, new
ClientMetadata(mimeType), uri);
inserter = new ClientPutter(this, message.bucket, uri, new
ClientMetadata(mimeType), ctx, client.node.putScheduler, priorityClass,
getCHKOnly, false, client);
+ if(persistenceType != PERSIST_CONNECTION && handler != null)
+ sendPendingMessages(handler.outputHandler, true);
}
/**
@@ -179,7 +185,7 @@
succeeded = true;
finished = true;
}
- trySendFinalMessage();
+ trySendFinalMessage(null);
block.getData().free();
finish();
}
@@ -189,7 +195,7 @@
finished = true;
putFailedMessage = new PutFailedMessage(e, identifier);
}
- trySendFinalMessage();
+ trySendFinalMessage(null);
block.getData().free();
finish();
}
@@ -200,7 +206,7 @@
Logger.error(this,
"onGeneratedURI("+uri+","+state+") but already set generatedURI to
"+generatedURI);
generatedURI = uri;
}
- trySendGeneratedURIMessage();
+ trySendGeneratedURIMessage(null);
}
public void onSuccess(FetchResult result, ClientGetter state) {
@@ -217,24 +223,24 @@
if((verbosity & VERBOSITY_SPLITFILE_PROGRESS) ==
VERBOSITY_SPLITFILE_PROGRESS) {
SimpleProgressMessage progress =
new SimpleProgressMessage(identifier,
(SplitfileProgressEvent)ce);
- trySendProgressMessage(progress);
+ trySendProgressMessage(progress,
VERBOSITY_SPLITFILE_PROGRESS, null);
}
} else if(ce instanceof StartedCompressionEvent) {
if((verbosity & VERBOSITY_COMPRESSION_START_END) ==
VERBOSITY_COMPRESSION_START_END) {
StartedCompressionMessage msg =
new
StartedCompressionMessage(identifier, ((StartedCompressionEvent)ce).codec);
- trySendProgressMessage(msg);
+ trySendProgressMessage(msg,
VERBOSITY_COMPRESSION_START_END, null);
}
} else if(ce instanceof FinishedCompressionEvent) {
if((verbosity & VERBOSITY_COMPRESSION_START_END) ==
VERBOSITY_COMPRESSION_START_END) {
FinishedCompressionMessage msg =
new
FinishedCompressionMessage(identifier, (FinishedCompressionEvent)ce);
- trySendProgressMessage(msg);
+ trySendProgressMessage(msg,
VERBOSITY_COMPRESSION_START_END, null);
}
}
}
- private void trySendFinalMessage() {
+ private void trySendFinalMessage(FCPConnectionOutputHandler handler) {
FCPMessage msg;
@@ -247,25 +253,28 @@
if(msg == null) {
Logger.error(this, "Trying to send null message on
"+this, new Exception("error"));
} else {
- FCPConnectionHandler conn = client.getConnection();
- if(conn != null)
- conn.outputHandler.queue(msg);
+ if(handler != null)
+ handler.queue(msg);
+ else
+ client.queueClientRequestMessage(msg, 0);
}
}
- private void trySendGeneratedURIMessage() {
+ private void trySendGeneratedURIMessage(FCPConnectionOutputHandler
handler) {
FCPMessage msg = new URIGeneratedMessage(generatedURI,
identifier);
- FCPConnectionHandler conn = client.getConnection();
- if(conn != null)
- conn.outputHandler.queue(msg);
+ if(handler != null)
+ handler.queue(msg);
+ else
+ client.queueClientRequestMessage(msg, 0);
}
- private void trySendProgressMessage(FCPMessage msg) {
+ private void trySendProgressMessage(FCPMessage msg, int verbosity,
FCPConnectionOutputHandler handler) {
if(persistenceType != PERSIST_CONNECTION)
progressMessage = msg;
- FCPConnectionHandler conn = client.getConnection();
- if(conn != null)
- conn.outputHandler.queue(msg);
+ if(handler != null)
+ handler.queue(msg);
+ else
+ client.queueClientRequestMessage(msg, verbosity);
}
public void sendPendingMessages(FCPConnectionOutputHandler handler,
boolean includePersistentRequest) {
@@ -274,15 +283,15 @@
return;
}
if(includePersistentRequest) {
- FCPMessage msg = new PersistentPut(identifier, uri,
verbosity, priorityClass, fromDisk, persistenceType, origFilename,
block.clientMetadata.getMIMEType());
+ FCPMessage msg = new PersistentPut(identifier, uri,
verbosity, priorityClass, fromDisk, persistenceType, origFilename,
block.clientMetadata.getMIMEType(), client.isGlobalQueue);
handler.queue(msg);
}
if(generatedURI != null)
- trySendGeneratedURIMessage();
+ trySendGeneratedURIMessage(handler);
if(progressMessage != null)
handler.queue(progressMessage);
if(finished)
- trySendFinalMessage();
+ trySendFinalMessage(handler);
}
/** Request completed. But we may have to stick around until we are
acked. */
@@ -308,8 +317,9 @@
}
public void write(BufferedWriter w) throws IOException {
- if(persistenceType != ClientRequest.PERSIST_REBOOT) {
+ if(persistenceType == ClientRequest.PERSIST_CONNECTION) {
Logger.error(this, "Not persisting as
persistenceType="+persistenceType);
+ return;
}
// Persist the request to disk
SimpleFieldSet fs = getFieldSet();
@@ -349,6 +359,7 @@
if(finished && (!succeeded))
// Should have a putFailedMessage... unless there is a
race condition.
fs.put("PutFailed",
putFailedMessage.getFieldSet(false));
+ fs.put("Global", Boolean.toString(client.isGlobalQueue));
return fs;
}
Modified: trunk/freenet/src/freenet/node/fcp/ClientPutMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientPutMessage.java 2006-03-04 19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/ClientPutMessage.java 2006-03-04 20:25:41 UTC (rev 8161)
@@ -49,6 +49,7 @@
final boolean dontCompress;
final String clientToken;
final File origFilename;
+ final boolean global;
public ClientPutMessage(SimpleFieldSet fs) throws
MessageInvalidException {
identifier = fs.get("Identifier");
@@ -62,6 +63,7 @@
} catch (MalformedURLException e) {
throw new
MessageInvalidException(ProtocolErrorMessage.URI_PARSE_ERROR, e.getMessage(),
identifier);
}
+ global = Fields.stringToBool(fs.get("Global"), false);
String verbosityString = fs.get("Verbosity");
if(verbosityString == null)
verbosity = 0;
Modified: trunk/freenet/src/freenet/node/fcp/ClientRequest.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientRequest.java 2006-03-04 19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/ClientRequest.java 2006-03-04 20:25:41 UTC (rev 8161)
@@ -4,6 +4,7 @@
import java.io.BufferedWriter;
import java.io.IOException;
+import freenet.support.Fields;
import freenet.support.Logger;
import freenet.support.SimpleFieldSet;
@@ -66,7 +67,12 @@
public static ClientRequest readAndRegister(BufferedReader br,
FCPServer server) throws IOException {
SimpleFieldSet fs = new SimpleFieldSet(br, true);
String clientName = fs.get("ClientName");
- FCPClient client = server.registerClient(clientName,
server.node, null);
+ boolean isGlobal = Fields.stringToBool(fs.get("Global"), false);
+ FCPClient client;
+ if(!isGlobal)
+ client = server.registerClient(clientName, server.node,
null);
+ else
+ client = server.globalClient;
try {
String type = fs.get("Type");
if(type.equals("GET")) {
Added: trunk/freenet/src/freenet/node/fcp/EndListPersistentRequestsMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/EndListPersistentRequestsMessage.java 2006-03-04 19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/EndListPersistentRequestsMessage.java 2006-03-04 20:25:41 UTC (rev 8161)
@@ -0,0 +1,23 @@
+package freenet.node.fcp;
+
+import freenet.node.Node;
+import freenet.support.SimpleFieldSet;
+
+public class EndListPersistentRequestsMessage extends FCPMessage {
+
+ static final String name = "EndListPersistentRequests";
+
+ public SimpleFieldSet getFieldSet() {
+ return new SimpleFieldSet(true);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void run(FCPConnectionHandler handler, Node node)
+ throws MessageInvalidException {
+ throw new
MessageInvalidException(ProtocolErrorMessage.INVALID_MESSAGE,
"EndListPersistentRequests goes from server to client not the other way
around", null);
+ }
+
+}
Modified: trunk/freenet/src/freenet/node/fcp/FCPClient.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPClient.java 2006-03-04 19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/FCPClient.java 2006-03-04 20:25:41 UTC (rev 8161)
@@ -3,6 +3,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Vector;
import freenet.client.FetcherContext;
@@ -10,6 +11,7 @@
import freenet.client.InserterContext;
import freenet.node.Node;
import freenet.support.LRUQueue;
+import freenet.support.Logger;
/**
* An FCP client.
@@ -20,7 +22,7 @@
/** Maximum number of unacknowledged completed requests */
private static final int MAX_UNACKED_REQUESTS = 256;
- public FCPClient(String name2, FCPServer server, FCPConnectionHandler
handler) {
+ public FCPClient(String name2, FCPServer server, FCPConnectionHandler
handler, boolean isGlobalQueue) {
this.name = name2;
this.currentConnection = handler;
this.runningPersistentRequests = new HashSet();
@@ -29,8 +31,11 @@
this.server = server;
this.node = server.node;
this.client = node.makeClient((short)0);
+ this.isGlobalQueue = isGlobalQueue;
defaultFetchContext = client.getFetcherContext();
defaultInsertContext = client.getInserterContext();
+ clientsWatching = new LinkedList();
+ watchGlobalVerbosityMask = Integer.MAX_VALUE;
}
/** The client's Name sent in the ClientHello message */
@@ -50,6 +55,15 @@
public final FetcherContext defaultFetchContext;
public final InserterContext defaultInsertContext;
public final Node node;
+ /** Are we the global queue? */
+ public final boolean isGlobalQueue;
+ /** Are we watching the global queue? */
+ boolean watchGlobal;
+ int watchGlobalVerbosityMask;
+ /** FCPClients watching us */
+ // FIXME how do we lazily init this without synchronization problems?
+ // We obviously can't synchronize on it when it hasn't been constructed yet...
+ final LinkedList clientsWatching;
public synchronized FCPConnectionHandler getConnection() {
return currentConnection;
@@ -150,5 +164,58 @@
v.add(unacked[j]);
}
}
+
+ /**
+ * Enable or disable watch-the-global-queue.
+ * @param enabled Whether we want watch-global-queue to be enabled.
+ * @param verbosityMask If so, what verbosity mask to use (to filter
messages
+ * generated by the global queue).
+ */
+ public void setWatchGlobal(boolean enabled, int verbosityMask) {
+ if(isGlobalQueue) {
+ Logger.error(this, "Set watch global on global queue!");
+ return;
+ }
+ if(watchGlobal && !enabled) {
+ server.globalClient.unwatch(this);
+ watchGlobal = false;
+ } else if(enabled && !watchGlobal) {
+ server.globalClient.watch(this);
+ watchGlobal = true;
+ }
+ // Otherwise the status is unchanged.
+ this.watchGlobalVerbosityMask = verbosityMask;
+ }
+
+ public void queueClientRequestMessage(FCPMessage msg, int
verbosityLevel) {
+ if((verbosityLevel & watchGlobalVerbosityMask) !=
verbosityLevel)
+ return;
+ FCPConnectionHandler conn = getConnection();
+ if(conn != null) {
+ conn.outputHandler.queue(msg);
+ }
+ FCPClient[] clients;
+ if(isGlobalQueue) {
+ synchronized(clientsWatching) {
+ clients = (FCPClient[])
clientsWatching.toArray(new FCPClient[clientsWatching.size()]);
+ }
+ for(int i=0;i<clients.length;i++)
+ clients[i].queueClientRequestMessage(msg,
verbosityLevel);
+ }
+ }
+ private void unwatch(FCPClient client) {
+ if(!isGlobalQueue) return;
+ synchronized(clientsWatching) {
+ clientsWatching.remove(client);
+ }
+ }
+
+ private void watch(FCPClient client) {
+ if(!isGlobalQueue) return;
+ synchronized(clientsWatching) {
+ clientsWatching.add(client);
+ }
+ }
+
}
Modified: trunk/freenet/src/freenet/node/fcp/FCPMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPMessage.java 2006-03-04 19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/FCPMessage.java 2006-03-04 20:25:41 UTC (rev 8161)
@@ -37,6 +37,8 @@
return new ListPersistentRequestsMessage(fs);
if(name.equals(RemovePersistentRequest.name))
return new RemovePersistentRequest(fs);
+ if(name.equals(WatchGlobal.name))
+ return new WatchGlobal(fs);
if(name.equals("Void"))
return null;
throw new
MessageInvalidException(ProtocolErrorMessage.INVALID_MESSAGE, "Unknown message
name "+name, null);
Modified: trunk/freenet/src/freenet/node/fcp/FCPServer.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPServer.java 2006-03-04 19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/FCPServer.java 2006-03-04 20:25:41 UTC (rev 8161)
@@ -42,6 +42,7 @@
final boolean enabled;
final String bindto;
final WeakHashMap clientsByName;
+ final FCPClient globalClient;
private boolean enablePersistentDownloads;
private File persistentDownloadsFile;
private File persistentDownloadsTempFile;
@@ -81,6 +82,7 @@
defaultInsertContext = client.getInserterContext();
Thread t = new Thread(this, "FCP server");
this.enablePersistentDownloads = persistentDownloadsEnabled;
+ globalClient = new FCPClient("Global Queue", this, null, true);
setPersistentDownloadsFile(new File(persistentDownloadsDir));
t.setDaemon(true);
t.start();
@@ -315,7 +317,7 @@
oldClient = (FCPClient) clientsByName.get(name);
if(oldClient == null) {
// Create new client
- FCPClient client = new FCPClient(name, this,
handler);
+ FCPClient client = new FCPClient(name, this,
handler, false);
clientsByName.put(name, client);
return client;
} else {
@@ -433,35 +435,30 @@
Logger.normal(this, "Not reading any persistent
requests from disk because no file exists");
return;
}
- BufferedInputStream bis = new BufferedInputStream(fis);
- InputStreamReader ris = new InputStreamReader(bis);
- BufferedReader br = new BufferedReader(ris);
try {
+ BufferedInputStream bis = new
BufferedInputStream(fis);
+ InputStreamReader ris = new
InputStreamReader(bis);
+ BufferedReader br = new BufferedReader(ris);
String r = br.readLine();
int count;
try {
count = Integer.parseInt(r);
} catch (NumberFormatException e) {
Logger.error(this, "Corrupt persistent
downloads file: "+persistentDownloadsFile);
- try {
- br.close();
- } catch (IOException e1) {
- Logger.error(this, "Error
closing: "+e1, e1);
- }
return;
}
for(int i=0;i<count;i++) {
ClientRequest req =
ClientRequest.readAndRegister(br, this);
}
- br.close();
} catch (IOException e) {
Logger.error(this, "Error reading persistent
downloads file: "+persistentDownloadsFile+" : "+e, e);
+ return;
+ } finally {
try {
- br.close();
+ fis.close();
} catch (IOException e1) {
Logger.error(this, "Error closing:
"+e1, e1);
}
- return;
}
return;
}
@@ -475,6 +472,7 @@
FCPClient client = (FCPClient) (i.next());
client.addPersistentRequests(v);
}
+ globalClient.addPersistentRequests(v);
}
return (ClientRequest[]) v.toArray(new ClientRequest[v.size()]);
}
Modified: trunk/freenet/src/freenet/node/fcp/ListPersistentRequestsMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ListPersistentRequestsMessage.java 2006-03-04 19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/ListPersistentRequestsMessage.java 2006-03-04 20:25:41 UTC (rev 8161)
@@ -23,6 +23,11 @@
throws MessageInvalidException {
handler.getClient().queuePendingMessagesOnConnectionRestart(handler.outputHandler);
handler.getClient().queuePendingMessagesFromRunningRequests(handler.outputHandler);
+ if(handler.getClient().watchGlobal) {
+ handler.server.globalClient.queuePendingMessagesOnConnectionRestart(handler.outputHandler);
+ handler.server.globalClient.queuePendingMessagesFromRunningRequests(handler.outputHandler);
+ }
+ handler.outputHandler.queue(new
EndListPersistentRequestsMessage());
}
}
Modified: trunk/freenet/src/freenet/node/fcp/PersistentGet.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/PersistentGet.java 2006-03-04 19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/PersistentGet.java 2006-03-04 20:25:41 UTC (rev 8161)
@@ -24,10 +24,11 @@
final File targetFile;
final File tempFile;
final String clientToken;
+ final boolean global;
public PersistentGet(String identifier, FreenetURI uri, int verbosity,
short priorityClass, short returnType, short
persistenceType,
- File targetFile, File tempFile, String clientToken) {
+ File targetFile, File tempFile, String clientToken,
boolean global) {
this.identifier = identifier;
this.uri = uri;
this.verbosity = verbosity;
@@ -37,6 +38,7 @@
this.targetFile = targetFile;
this.tempFile = tempFile;
this.clientToken = clientToken;
+ this.global = global;
}
public SimpleFieldSet getFieldSet() {
@@ -52,6 +54,7 @@
}
if(clientToken != null)
fs.put("ClientToken", clientToken);
+ fs.put("Global", Boolean.toString(global));
return fs;
}
Modified: trunk/freenet/src/freenet/node/fcp/PersistentPut.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/PersistentPut.java 2006-03-04 19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/PersistentPut.java 2006-03-04 20:25:41 UTC (rev 8161)
@@ -18,10 +18,11 @@
final short persistenceType;
final File origFilename;
final String mimeType;
+ final boolean global;
public PersistentPut(String identifier, FreenetURI uri, int verbosity,
short priorityClass, boolean fromDisk, short
persistenceType,
- File origFilename, String mimeType) {
+ File origFilename, String mimeType, boolean global) {
this.identifier = identifier;
this.uri = uri;
this.verbosity = verbosity;
@@ -30,6 +31,7 @@
this.persistenceType = persistenceType;
this.origFilename = origFilename;
this.mimeType = mimeType;
+ this.global = global;
}
public SimpleFieldSet getFieldSet() {
@@ -44,6 +46,7 @@
fs.put("Filename", origFilename.getAbsolutePath());
if(mimeType != null)
fs.put("Metadata.ContentType", mimeType);
+ fs.put("Global", Boolean.toString(global));
return fs;
}
Added: trunk/freenet/src/freenet/node/fcp/WatchGlobal.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/WatchGlobal.java 2006-03-04 19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/WatchGlobal.java 2006-03-04 20:25:41 UTC (rev 8161)
@@ -0,0 +1,42 @@
+package freenet.node.fcp;
+
+import freenet.node.Node;
+import freenet.support.Fields;
+import freenet.support.SimpleFieldSet;
+
+public class WatchGlobal extends FCPMessage {
+
+ final boolean enabled;
+ final int verbosityMask;
+ static final String name = "WatchGlobal";
+
+ public WatchGlobal(SimpleFieldSet fs) throws MessageInvalidException {
+ enabled = Fields.stringToBool(fs.get("Enabled"), true);
+ String s = fs.get("VerbosityMask");
+ if(s != null)
+ try {
+ verbosityMask = Integer.parseInt(s);
+ } catch (NumberFormatException e) {
+ throw new
MessageInvalidException(ProtocolErrorMessage.ERROR_PARSING_NUMBER,
e.toString(), null);
+ }
+ else
+ verbosityMask = Integer.MAX_VALUE;
+ }
+
+ public SimpleFieldSet getFieldSet() {
+ SimpleFieldSet fs = new SimpleFieldSet(true);
+ fs.put("Enabled", Boolean.toString(enabled));
+ fs.put("VerbosityMask", Integer.toString(verbosityMask));
+ return fs;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void run(FCPConnectionHandler handler, Node node)
+ throws MessageInvalidException {
+ handler.getClient().setWatchGlobal(enabled, verbosityMask);
+ }
+
+}