Author: toad
Date: 2006-03-04 20:25:41 +0000 (Sat, 04 Mar 2006)
New Revision: 8161

Added:
   trunk/freenet/src/freenet/node/fcp/EndListPersistentRequestsMessage.java
   trunk/freenet/src/freenet/node/fcp/WatchGlobal.java
Modified:
   trunk/freenet/src/freenet/node/Version.java
   trunk/freenet/src/freenet/node/fcp/ClientGet.java
   trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java
   trunk/freenet/src/freenet/node/fcp/ClientPut.java
   trunk/freenet/src/freenet/node/fcp/ClientPutMessage.java
   trunk/freenet/src/freenet/node/fcp/ClientRequest.java
   trunk/freenet/src/freenet/node/fcp/FCPClient.java
   trunk/freenet/src/freenet/node/fcp/FCPMessage.java
   trunk/freenet/src/freenet/node/fcp/FCPServer.java
   trunk/freenet/src/freenet/node/fcp/ListPersistentRequestsMessage.java
   trunk/freenet/src/freenet/node/fcp/PersistentGet.java
   trunk/freenet/src/freenet/node/fcp/PersistentPut.java
Log:
494:
Global queue support, apparently working (very lightly tested).
Add EndListPersistentRequests at end of a persistent request list call.

Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-03-04 19:51:04 UTC (rev 
8160)
+++ trunk/freenet/src/freenet/node/Version.java 2006-03-04 20:25:41 UTC (rev 
8161)
@@ -20,7 +20,7 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       private static final int buildNumber = 493;
+       private static final int buildNumber = 494;

        /** Oldest build of Fred we will talk to */
        private static final int lastGoodBuild = 475;

Modified: trunk/freenet/src/freenet/node/fcp/ClientGet.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientGet.java   2006-03-04 19:51:04 UTC 
(rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/ClientGet.java   2006-03-04 20:25:41 UTC 
(rev 8161)
@@ -86,7 +86,11 @@
                        this.origHandler = handler;
                else
                        origHandler = null;
-               this.client = handler.getClient();
+               if(message.global) {
+                       client = handler.server.globalClient;
+               } else {
+                       client = handler.getClient();
+               }
                fctx = new FetcherContext(client.defaultFetchContext, 
FetcherContext.IDENTICAL_MASK);
                fctx.eventProducer.addEventListener(this);
                // ignoreDS
@@ -124,6 +128,9 @@
                }
                returnBucket = ret;
                getter = new ClientGetter(this, client.node.fetchScheduler, 
uri, fctx, priorityClass, client, returnBucket);
+               if(persistenceType != PERSIST_CONNECTION && handler != null)
+                       sendPendingMessages(handler.outputHandler, true);
+                       
        }

        /**
@@ -267,15 +274,15 @@
                        this.succeeded = true;
                        finished = true;
                }
-               trySendDataFoundOrGetFailed();
+               trySendDataFoundOrGetFailed(null);
                if(adm != null)
-                       trySendAllDataMessage(adm);
+                       trySendAllDataMessage(adm, null);
                if(!dontFree)
                        data.free();
                finish();
        }

-       private void trySendDataFoundOrGetFailed() {
+       private void trySendDataFoundOrGetFailed(FCPConnectionOutputHandler 
handler) {

                FCPMessage msg;

@@ -284,31 +291,32 @@
                } else {
                        msg = getFailedMessage;
                }
-               
-               FCPConnectionHandler conn = client.getConnection();
-               if(conn != null) {
-                       conn.outputHandler.queue(msg);
-                       if(postFetchProtocolErrorMessage != null)
-                               
conn.outputHandler.queue(postFetchProtocolErrorMessage);
+
+               if(handler != null)
+                       handler.queue(msg);
+               else
+                       client.queueClientRequestMessage(msg, 0);
+               if(postFetchProtocolErrorMessage != null) {
+                       if(handler != null)
+                               handler.queue(postFetchProtocolErrorMessage);
+                       else
+                               
client.queueClientRequestMessage(postFetchProtocolErrorMessage, 0);
                }
+
        }

-       private void trySendAllDataMessage(AllDataMessage msg) {
+       private void trySendAllDataMessage(AllDataMessage msg, 
FCPConnectionOutputHandler handler) {
                if(persistenceType != ClientRequest.PERSIST_CONNECTION) {
                        allDataPending = msg;
                }
-               FCPConnectionHandler conn = client.getConnection();
-               if(conn != null)
-                       conn.outputHandler.queue(msg);
+               client.queueClientRequestMessage(msg, 0);
        }

-       private void trySendProgress(SimpleProgressMessage msg) {
+       private void trySendProgress(SimpleProgressMessage msg, 
FCPConnectionOutputHandler handler) {
                if(persistenceType != ClientRequest.PERSIST_CONNECTION) {
                        progressPending = msg;
                }
-               FCPConnectionHandler conn = client.getConnection();
-               if(conn != null)
-                       conn.outputHandler.queue(msg);
+               client.queueClientRequestMessage(msg, 
VERBOSITY_SPLITFILE_PROGRESS);
        }

        public void sendPendingMessages(FCPConnectionOutputHandler handler, 
boolean includePersistentRequest) {
@@ -317,13 +325,13 @@
                        return;
                }
                if(includePersistentRequest) {
-                       FCPMessage msg = new PersistentGet(identifier, uri, 
verbosity, priorityClass, returnType, persistenceType, targetFile, tempFile, 
clientToken);
+                       FCPMessage msg = new PersistentGet(identifier, uri, 
verbosity, priorityClass, returnType, persistenceType, targetFile, tempFile, 
clientToken, client.isGlobalQueue);
                        handler.queue(msg);
                }
                if(progressPending != null)
                        handler.queue(progressPending);
                if(finished)
-                       trySendDataFoundOrGetFailed();
+                       trySendDataFoundOrGetFailed(handler);
                if(allDataPending != null)
                        handler.queue(allDataPending);
        }
@@ -335,7 +343,7 @@
                        finished = true;
                }
                Logger.minor(this, "Caught "+e, e);
-               trySendDataFoundOrGetFailed();
+               trySendDataFoundOrGetFailed(null);
                finish();
        }

@@ -367,7 +375,7 @@
                        return;
                SimpleProgressMessage progress = 
                        new SimpleProgressMessage(identifier, 
(SplitfileProgressEvent)ce);
-               trySendProgress(progress);
+               trySendProgress(progress, null);
        }

        public boolean isPersistent() {
@@ -441,6 +449,7 @@
                        fs.put("ReturnBucket.DecryptKey", 
HexUtil.bytesToHex(b.getKey()));
                        fs.put("ReturnBucket.Filename", 
((FileBucket)b.getUnderlying()).getName());
                }
+               fs.put("Global", Boolean.toString(client.isGlobalQueue));
                return fs;
        }


Modified: trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java    2006-03-04 
19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/ClientGetMessage.java    2006-03-04 
20:25:41 UTC (rev 8161)
@@ -50,6 +50,7 @@
        final File diskFile;
        final File tempFile;
        final String clientToken;
+       final boolean global;

        // FIXME move these to the actual getter process
        static final short RETURN_TYPE_DIRECT = 0; // over FCP
@@ -60,6 +61,7 @@
        public ClientGetMessage(SimpleFieldSet fs) throws 
MessageInvalidException {
                short defaultPriority;
                clientToken = fs.get("ClientToken");
+               global = Fields.stringToBool(fs.get("Global"), false);
                ignoreDS = Fields.stringToBool(fs.get("IgnoreDS"), false);
                dsOnly = Fields.stringToBool(fs.get("DSOnly"), false);
                identifier = fs.get("Identifier");

Modified: trunk/freenet/src/freenet/node/fcp/ClientPut.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientPut.java   2006-03-04 19:51:04 UTC 
(rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/ClientPut.java   2006-03-04 20:25:41 UTC 
(rev 8161)
@@ -85,7 +85,11 @@
                        this.origHandler = handler;
                else
                        this.origHandler = null;
-               client = handler.getClient();
+               if(message.global) {
+                       client = handler.server.globalClient;
+               } else {
+                       client = handler.getClient();
+               }
                ctx = new InserterContext(client.defaultInsertContext, new 
SimpleEventProducer());
                ctx.dontCompress = message.dontCompress;
                ctx.eventProducer.addEventListener(this);
@@ -96,6 +100,8 @@
                clientToken = message.clientToken;
                block = new InsertBlock(message.bucket, new 
ClientMetadata(mimeType), uri);
                inserter = new ClientPutter(this, message.bucket, uri, new 
ClientMetadata(mimeType), ctx, client.node.putScheduler, priorityClass, 
getCHKOnly, false, client);
+               if(persistenceType != PERSIST_CONNECTION && handler != null)
+                       sendPendingMessages(handler.outputHandler, true);
        }

        /**
@@ -179,7 +185,7 @@
                        succeeded = true;
                        finished = true;
                }
-               trySendFinalMessage();
+               trySendFinalMessage(null);
                block.getData().free();
                finish();
        }
@@ -189,7 +195,7 @@
                        finished = true;
                        putFailedMessage = new PutFailedMessage(e, identifier);
                }
-               trySendFinalMessage();
+               trySendFinalMessage(null);
                block.getData().free();
                finish();
        }
@@ -200,7 +206,7 @@
                                Logger.error(this, 
"onGeneratedURI("+uri+","+state+") but already set generatedURI to 
"+generatedURI);
                        generatedURI = uri;
                }
-               trySendGeneratedURIMessage();
+               trySendGeneratedURIMessage(null);
        }

        public void onSuccess(FetchResult result, ClientGetter state) {
@@ -217,24 +223,24 @@
                        if((verbosity & VERBOSITY_SPLITFILE_PROGRESS) == 
VERBOSITY_SPLITFILE_PROGRESS) {
                                SimpleProgressMessage progress = 
                                        new SimpleProgressMessage(identifier, 
(SplitfileProgressEvent)ce);
-                               trySendProgressMessage(progress);
+                               trySendProgressMessage(progress, 
VERBOSITY_SPLITFILE_PROGRESS, null);
                        }
                } else if(ce instanceof StartedCompressionEvent) {
                        if((verbosity & VERBOSITY_COMPRESSION_START_END) == 
VERBOSITY_COMPRESSION_START_END) {
                                StartedCompressionMessage msg =
                                        new 
StartedCompressionMessage(identifier, ((StartedCompressionEvent)ce).codec);
-                               trySendProgressMessage(msg);
+                               trySendProgressMessage(msg, 
VERBOSITY_COMPRESSION_START_END, null);
                        }
                } else if(ce instanceof FinishedCompressionEvent) {
                        if((verbosity & VERBOSITY_COMPRESSION_START_END) == 
VERBOSITY_COMPRESSION_START_END) {
                                FinishedCompressionMessage msg = 
                                        new 
FinishedCompressionMessage(identifier, (FinishedCompressionEvent)ce);
-                               trySendProgressMessage(msg);
+                               trySendProgressMessage(msg, 
VERBOSITY_COMPRESSION_START_END, null);
                        }
                }
        }

-       private void trySendFinalMessage() {
+       private void trySendFinalMessage(FCPConnectionOutputHandler handler) {

                FCPMessage msg;

@@ -247,25 +253,28 @@
                if(msg == null) {
                        Logger.error(this, "Trying to send null message on 
"+this, new Exception("error"));
                } else {
-                       FCPConnectionHandler conn = client.getConnection();
-                       if(conn != null)
-                               conn.outputHandler.queue(msg);
+                       if(handler != null)
+                               handler.queue(msg);
+                       else
+                               client.queueClientRequestMessage(msg, 0);
                }
        }

-       private void trySendGeneratedURIMessage() {
+       private void trySendGeneratedURIMessage(FCPConnectionOutputHandler 
handler) {
                FCPMessage msg = new URIGeneratedMessage(generatedURI, 
identifier);
-               FCPConnectionHandler conn = client.getConnection();
-               if(conn != null)
-                       conn.outputHandler.queue(msg);
+               if(handler != null)
+                       handler.queue(msg);
+               else
+                       client.queueClientRequestMessage(msg, 0);
        }

-       private void trySendProgressMessage(FCPMessage msg) {
+       private void trySendProgressMessage(FCPMessage msg, int verbosity, 
FCPConnectionOutputHandler handler) {
                if(persistenceType != PERSIST_CONNECTION)
                        progressMessage = msg;
-               FCPConnectionHandler conn = client.getConnection();
-               if(conn != null)
-                       conn.outputHandler.queue(msg);
+               if(handler != null)
+                       handler.queue(msg);
+               else
+                       client.queueClientRequestMessage(msg, verbosity);
        }

        public void sendPendingMessages(FCPConnectionOutputHandler handler, 
boolean includePersistentRequest) {
@@ -274,15 +283,15 @@
                        return;
                }
                if(includePersistentRequest) {
-                       FCPMessage msg = new PersistentPut(identifier, uri, 
verbosity, priorityClass, fromDisk, persistenceType, origFilename, 
block.clientMetadata.getMIMEType());
+                       FCPMessage msg = new PersistentPut(identifier, uri, 
verbosity, priorityClass, fromDisk, persistenceType, origFilename, 
block.clientMetadata.getMIMEType(), client.isGlobalQueue);
                        handler.queue(msg);
                }
                if(generatedURI != null)
-                       trySendGeneratedURIMessage();
+                       trySendGeneratedURIMessage(handler);
                if(progressMessage != null)
                        handler.queue(progressMessage);
                if(finished)
-                       trySendFinalMessage();
+                       trySendFinalMessage(handler);
        }

        /** Request completed. But we may have to stick around until we are 
acked. */
@@ -308,8 +317,9 @@
        }

        public void write(BufferedWriter w) throws IOException {
-               if(persistenceType != ClientRequest.PERSIST_REBOOT) {
+               if(persistenceType == ClientRequest.PERSIST_CONNECTION) {
                        Logger.error(this, "Not persisting as 
persistenceType="+persistenceType);
+                       return;
                }
                // Persist the request to disk
                SimpleFieldSet fs = getFieldSet();
@@ -349,6 +359,7 @@
                if(finished && (!succeeded))
                        // Should have a putFailedMessage... unless there is a 
race condition.
                        fs.put("PutFailed", 
putFailedMessage.getFieldSet(false));
+               fs.put("Global", Boolean.toString(client.isGlobalQueue));
                return fs;
        }


Modified: trunk/freenet/src/freenet/node/fcp/ClientPutMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientPutMessage.java    2006-03-04 
19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/ClientPutMessage.java    2006-03-04 
20:25:41 UTC (rev 8161)
@@ -49,6 +49,7 @@
        final boolean dontCompress;
        final String clientToken;
        final File origFilename;
+       final boolean global;

        public ClientPutMessage(SimpleFieldSet fs) throws 
MessageInvalidException {
                identifier = fs.get("Identifier");
@@ -62,6 +63,7 @@
                } catch (MalformedURLException e) {
                        throw new 
MessageInvalidException(ProtocolErrorMessage.URI_PARSE_ERROR, e.getMessage(), 
identifier);
                }
+               global = Fields.stringToBool(fs.get("Global"), false);
                String verbosityString = fs.get("Verbosity");
                if(verbosityString == null)
                        verbosity = 0;

Modified: trunk/freenet/src/freenet/node/fcp/ClientRequest.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientRequest.java       2006-03-04 
19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/ClientRequest.java       2006-03-04 
20:25:41 UTC (rev 8161)
@@ -4,6 +4,7 @@
 import java.io.BufferedWriter;
 import java.io.IOException;

+import freenet.support.Fields;
 import freenet.support.Logger;
 import freenet.support.SimpleFieldSet;

@@ -66,7 +67,12 @@
        public static ClientRequest readAndRegister(BufferedReader br, 
FCPServer server) throws IOException {
                SimpleFieldSet fs = new SimpleFieldSet(br, true);
                String clientName = fs.get("ClientName");
-               FCPClient client = server.registerClient(clientName, 
server.node, null);
+               boolean isGlobal = Fields.stringToBool(fs.get("Global"), false);
+               FCPClient client;
+               if(!isGlobal)
+                       client = server.registerClient(clientName, server.node, 
null);
+               else
+                       client = server.globalClient;
                try {
                        String type = fs.get("Type");
                        if(type.equals("GET")) {

Added: trunk/freenet/src/freenet/node/fcp/EndListPersistentRequestsMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/EndListPersistentRequestsMessage.java    
2006-03-04 19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/EndListPersistentRequestsMessage.java    
2006-03-04 20:25:41 UTC (rev 8161)
@@ -0,0 +1,23 @@
+package freenet.node.fcp;
+
+import freenet.node.Node;
+import freenet.support.SimpleFieldSet;
+
+public class EndListPersistentRequestsMessage extends FCPMessage {
+
+       static final String name = "EndListPersistentRequests";
+       
+       public SimpleFieldSet getFieldSet() {
+               return new SimpleFieldSet(true);
+       }
+
+       public String getName() {
+               return name;
+       }
+
+       public void run(FCPConnectionHandler handler, Node node)
+                       throws MessageInvalidException {
+               throw new 
MessageInvalidException(ProtocolErrorMessage.INVALID_MESSAGE, 
"EndListPersistentRequests goes from server to client not the other way 
around", null);
+       }
+
+}

Modified: trunk/freenet/src/freenet/node/fcp/FCPClient.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPClient.java   2006-03-04 19:51:04 UTC 
(rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/FCPClient.java   2006-03-04 20:25:41 UTC 
(rev 8161)
@@ -3,6 +3,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Vector;

 import freenet.client.FetcherContext;
@@ -10,6 +11,7 @@
 import freenet.client.InserterContext;
 import freenet.node.Node;
 import freenet.support.LRUQueue;
+import freenet.support.Logger;

 /**
  * An FCP client.
@@ -20,7 +22,7 @@
        /** Maximum number of unacknowledged completed requests */
        private static final int MAX_UNACKED_REQUESTS = 256;

-       public FCPClient(String name2, FCPServer server, FCPConnectionHandler 
handler) {
+       public FCPClient(String name2, FCPServer server, FCPConnectionHandler 
handler, boolean isGlobalQueue) {
                this.name = name2;
                this.currentConnection = handler;
                this.runningPersistentRequests = new HashSet();
@@ -29,8 +31,11 @@
                this.server = server;
                this.node = server.node;
                this.client = node.makeClient((short)0);
+               this.isGlobalQueue = isGlobalQueue;
                defaultFetchContext = client.getFetcherContext();
                defaultInsertContext = client.getInserterContext();
+               clientsWatching = new LinkedList();
+               watchGlobalVerbosityMask = Integer.MAX_VALUE;
        }

        /** The client's Name sent in the ClientHello message */
@@ -50,6 +55,15 @@
        public final FetcherContext defaultFetchContext;
        public final InserterContext defaultInsertContext;
        public final Node node;
+       /** Are we the global queue? */
+       public final boolean isGlobalQueue;
+       /** Are we watching the global queue? */
+       boolean watchGlobal;
+       int watchGlobalVerbosityMask;
+       /** FCPClients watching us */
+       // FIXME how do we lazily init this without synchronization problems?
+       // We obviously can't synchronize on it when it hasn't been constructed 
yet...
+       final LinkedList clientsWatching;

        public synchronized FCPConnectionHandler getConnection() {
                return currentConnection;
@@ -150,5 +164,58 @@
                                v.add(unacked[j]);
                }
        }
+
+       /**
+        * Enable or disable watch-the-global-queue.
+        * @param enabled Whether we want watch-global-queue to be enabled.
+        * @param verbosityMask If so, what verbosity mask to use (to filter 
messages
+        * generated by the global queue).
+        */
+       public void setWatchGlobal(boolean enabled, int verbosityMask) {
+               if(isGlobalQueue) {
+                       Logger.error(this, "Set watch global on global queue!");
+                       return;
+               }
+               if(watchGlobal && !enabled) {
+                       server.globalClient.unwatch(this);
+                       watchGlobal = false;
+               } else if(enabled && !watchGlobal) {
+                       server.globalClient.watch(this);
+                       watchGlobal = true;
+               }
+               // Otherwise the status is unchanged.
+               this.watchGlobalVerbosityMask = verbosityMask;
+       }
+
+       public void queueClientRequestMessage(FCPMessage msg, int 
verbosityLevel) {
+               if((verbosityLevel & watchGlobalVerbosityMask) != 
verbosityLevel)
+                       return;
+               FCPConnectionHandler conn = getConnection();
+               if(conn != null) {
+                       conn.outputHandler.queue(msg);
+               }
+               FCPClient[] clients;
+               if(isGlobalQueue) {
+                       synchronized(clientsWatching) {
+                               clients = (FCPClient[]) 
clientsWatching.toArray(new FCPClient[clientsWatching.size()]);
+                       }
+                       for(int i=0;i<clients.length;i++)
+                               clients[i].queueClientRequestMessage(msg, 
verbosityLevel);
+               }
+       }

+       private void unwatch(FCPClient client) {
+               if(!isGlobalQueue) return;
+               synchronized(clientsWatching) {
+                       clientsWatching.remove(client);
+               }
+       }
+
+       private void watch(FCPClient client) {
+               if(!isGlobalQueue) return;
+               synchronized(clientsWatching) {
+                       clientsWatching.add(client);
+               }
+       }
+
 }

Modified: trunk/freenet/src/freenet/node/fcp/FCPMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPMessage.java  2006-03-04 19:51:04 UTC 
(rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/FCPMessage.java  2006-03-04 20:25:41 UTC 
(rev 8161)
@@ -37,6 +37,8 @@
                        return new ListPersistentRequestsMessage(fs);
                if(name.equals(RemovePersistentRequest.name))
                        return new RemovePersistentRequest(fs);
+               if(name.equals(WatchGlobal.name))
+                       return new WatchGlobal(fs);
                if(name.equals("Void"))
                        return null;
                throw new 
MessageInvalidException(ProtocolErrorMessage.INVALID_MESSAGE, "Unknown message 
name "+name, null);

Modified: trunk/freenet/src/freenet/node/fcp/FCPServer.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPServer.java   2006-03-04 19:51:04 UTC 
(rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/FCPServer.java   2006-03-04 20:25:41 UTC 
(rev 8161)
@@ -42,6 +42,7 @@
        final boolean enabled;
        final String bindto;
        final WeakHashMap clientsByName;
+       final FCPClient globalClient;
        private boolean enablePersistentDownloads;
        private File persistentDownloadsFile;
        private File persistentDownloadsTempFile;
@@ -81,6 +82,7 @@
                defaultInsertContext = client.getInserterContext();
                Thread t = new Thread(this, "FCP server");
                this.enablePersistentDownloads = persistentDownloadsEnabled;
+               globalClient = new FCPClient("Global Queue", this, null, true);
                setPersistentDownloadsFile(new File(persistentDownloadsDir));
                t.setDaemon(true);
                t.start();
@@ -315,7 +317,7 @@
                        oldClient = (FCPClient) clientsByName.get(name);
                        if(oldClient == null) {
                                // Create new client
-                               FCPClient client = new FCPClient(name, this, 
handler);
+                               FCPClient client = new FCPClient(name, this, 
handler, false);
                                clientsByName.put(name, client);
                                return client;
                        } else {
@@ -433,35 +435,30 @@
                                Logger.normal(this, "Not reading any persistent 
requests from disk because no file exists");
                                return;
                        }
-                       BufferedInputStream bis = new BufferedInputStream(fis);
-                       InputStreamReader ris = new InputStreamReader(bis);
-                       BufferedReader br = new BufferedReader(ris);
                        try {
+                               BufferedInputStream bis = new 
BufferedInputStream(fis);
+                               InputStreamReader ris = new 
InputStreamReader(bis);
+                               BufferedReader br = new BufferedReader(ris);
                                String r = br.readLine();
                                int count;
                                try {
                                        count = Integer.parseInt(r);
                                } catch (NumberFormatException e) {
                                        Logger.error(this, "Corrupt persistent 
downloads file: "+persistentDownloadsFile);
-                                       try {
-                                               br.close();
-                                       } catch (IOException e1) {
-                                               Logger.error(this, "Error 
closing: "+e1, e1);
-                                       }
                                        return;
                                }
                                for(int i=0;i<count;i++) {
                                        ClientRequest req = 
ClientRequest.readAndRegister(br, this);
                                }
-                               br.close();
                        } catch (IOException e) {
                                Logger.error(this, "Error reading persistent 
downloads file: "+persistentDownloadsFile+" : "+e, e);
+                               return;
+                       } finally {
                                try {
-                                       br.close();
+                                       fis.close();
                                } catch (IOException e1) {
                                        Logger.error(this, "Error closing: 
"+e1, e1);
                                }
-                               return;
                        }
                        return;
                }
@@ -475,6 +472,7 @@
                                FCPClient client = (FCPClient) (i.next());
                                client.addPersistentRequests(v);
                        }
+                       globalClient.addPersistentRequests(v);
                }
                return (ClientRequest[]) v.toArray(new ClientRequest[v.size()]);
        }

Modified: trunk/freenet/src/freenet/node/fcp/ListPersistentRequestsMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ListPersistentRequestsMessage.java       
2006-03-04 19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/ListPersistentRequestsMessage.java       
2006-03-04 20:25:41 UTC (rev 8161)
@@ -23,6 +23,11 @@
                        throws MessageInvalidException {
                
handler.getClient().queuePendingMessagesOnConnectionRestart(handler.outputHandler);
                
handler.getClient().queuePendingMessagesFromRunningRequests(handler.outputHandler);
+               if(handler.getClient().watchGlobal) {
+                       
handler.server.globalClient.queuePendingMessagesOnConnectionRestart(handler.outputHandler);
+                       
handler.server.globalClient.queuePendingMessagesFromRunningRequests(handler.outputHandler);
+               }
+               handler.outputHandler.queue(new 
EndListPersistentRequestsMessage());
        }

 }

Modified: trunk/freenet/src/freenet/node/fcp/PersistentGet.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/PersistentGet.java       2006-03-04 
19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/PersistentGet.java       2006-03-04 
20:25:41 UTC (rev 8161)
@@ -24,10 +24,11 @@
        final File targetFile;
        final File tempFile;
        final String clientToken;
+       final boolean global;

        public PersistentGet(String identifier, FreenetURI uri, int verbosity, 
                        short priorityClass, short returnType, short 
persistenceType, 
-                       File targetFile, File tempFile, String clientToken) {
+                       File targetFile, File tempFile, String clientToken, 
boolean global) {
                this.identifier = identifier;
                this.uri = uri;
                this.verbosity = verbosity;
@@ -37,6 +38,7 @@
                this.targetFile = targetFile;
                this.tempFile = tempFile;
                this.clientToken = clientToken;
+               this.global = global;
        }

        public SimpleFieldSet getFieldSet() {
@@ -52,6 +54,7 @@
                }
                if(clientToken != null)
                        fs.put("ClientToken", clientToken);
+               fs.put("Global", Boolean.toString(global));
                return fs;
        }


Modified: trunk/freenet/src/freenet/node/fcp/PersistentPut.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/PersistentPut.java       2006-03-04 
19:51:04 UTC (rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/PersistentPut.java       2006-03-04 
20:25:41 UTC (rev 8161)
@@ -18,10 +18,11 @@
        final short persistenceType; 
        final File origFilename;
        final String mimeType;
+       final boolean global;

        public PersistentPut(String identifier, FreenetURI uri, int verbosity, 
                        short priorityClass, boolean fromDisk, short 
persistenceType, 
-                       File origFilename, String mimeType) {
+                       File origFilename, String mimeType, boolean global) {
                this.identifier = identifier;
                this.uri = uri;
                this.verbosity = verbosity;
@@ -30,6 +31,7 @@
                this.persistenceType = persistenceType;
                this.origFilename = origFilename;
                this.mimeType = mimeType;
+               this.global = global;
        }

        public SimpleFieldSet getFieldSet() {
@@ -44,6 +46,7 @@
                        fs.put("Filename", origFilename.getAbsolutePath());
                if(mimeType != null)
                        fs.put("Metadata.ContentType", mimeType);
+               fs.put("Global", Boolean.toString(global));
                return fs;
        }


Added: trunk/freenet/src/freenet/node/fcp/WatchGlobal.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/WatchGlobal.java 2006-03-04 19:51:04 UTC 
(rev 8160)
+++ trunk/freenet/src/freenet/node/fcp/WatchGlobal.java 2006-03-04 20:25:41 UTC 
(rev 8161)
@@ -0,0 +1,42 @@
+package freenet.node.fcp;
+
+import freenet.node.Node;
+import freenet.support.Fields;
+import freenet.support.SimpleFieldSet;
+
+public class WatchGlobal extends FCPMessage {
+
+       final boolean enabled;
+       final int verbosityMask;
+       static final String name = "WatchGlobal";
+
+       public WatchGlobal(SimpleFieldSet fs) throws MessageInvalidException {
+               enabled = Fields.stringToBool(fs.get("Enabled"), true);
+               String s = fs.get("VerbosityMask");
+               if(s != null)
+                       try {
+                               verbosityMask = Integer.parseInt(s);
+                       } catch (NumberFormatException e) {
+                               throw new 
MessageInvalidException(ProtocolErrorMessage.ERROR_PARSING_NUMBER, 
e.toString(), null);
+                       }
+               else
+                       verbosityMask = Integer.MAX_VALUE;
+       }
+       
+       public SimpleFieldSet getFieldSet() {
+               SimpleFieldSet fs = new SimpleFieldSet(true);
+               fs.put("Enabled", Boolean.toString(enabled));
+               fs.put("VerbosityMask", Integer.toString(verbosityMask));
+               return fs;
+       }
+
+       public String getName() {
+               return name;
+       }
+
+       public void run(FCPConnectionHandler handler, Node node)
+                       throws MessageInvalidException {
+               handler.getClient().setWatchGlobal(enabled, verbosityMask);
+       }
+
+}


Reply via email to