Author: toad
Date: 2009-04-18 18:29:30 +0000 (Sat, 18 Apr 2009)
New Revision: 27005

Added:
   trunk/freenet/src/freenet/node/fcp/UnsubscribeUSKMessage.java
Modified:
   trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
   trunk/freenet/src/freenet/node/fcp/FCPMessage.java
   trunk/freenet/src/freenet/node/fcp/SubscribeUSK.java
   trunk/freenet/src/freenet/node/fcp/SubscribeUSKMessage.java
Log:
Track subscribed USKs by identifier, add UnsubscribeUSK, also unsubscribe on 
disconnect (fix leak)


Modified: trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java        
2009-04-18 18:10:07 UTC (rev 27004)
+++ trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java        
2009-04-18 18:29:30 UTC (rev 27005)
@@ -9,6 +9,7 @@
 import java.net.Socket;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Random;
 
 import com.db4o.ObjectContainer;
@@ -59,6 +60,7 @@
        final FCPServer server;
        final Socket sock;
        final FCPConnectionInputHandler inputHandler;
+       final Map<String, SubscribeUSK> uskSubscriptions;
        public final FCPConnectionOutputHandler outputHandler;
        private boolean isClosed;
        private boolean inputClosed;
@@ -92,6 +94,7 @@
                isClosed = false;
                this.bf = server.core.tempBucketFactory;
                requestsByIdentifier = new HashMap<String, ClientRequest>();
+               uskSubscriptions = new HashMap<String, SubscribeUSK>();
                this.inputHandler = new FCPConnectionInputHandler(this);
                this.outputHandler = new FCPConnectionOutputHandler(this);
                
@@ -112,14 +115,18 @@
                if(foreverClient != null)
                        foreverClient.onLostConnection(this);
                boolean dupe;
+               SubscribeUSK[] subscriptions;
                synchronized(this) {
                        isClosed = true;
                        requests = new 
ClientRequest[requestsByIdentifier.size()];
                        requests = 
requestsByIdentifier.values().toArray(requests);
+                       subscriptions = uskSubscriptions.values().toArray(new 
SubscribeUSK[uskSubscriptions.size()]);
                        dupe = killedDupe;
                }
                for(int i=0;i<requests.length;i++)
                        requests[i].onLostConnection(null, 
server.core.clientContext);
+               for(SubscribeUSK sub : subscriptions)
+                       sub.unsubscribe();
                if(!dupe) {
                server.core.clientContext.jobRunner.queue(new DBJob() {
 
@@ -140,6 +147,7 @@
                        
                }, NativeThread.NORM_PRIORITY, false);
                }
+               
                outputHandler.onClosed();
        }
        
@@ -738,6 +746,19 @@
                Logger.error(this, "Not storing FCPConnectionHandler in 
database", new Exception("error"));
                return false;
        }
-       
 
+       public synchronized void addSubscription(String identifier, 
SubscribeUSK subscribeUSK) throws IdentifierCollisionException {
+               if(uskSubscriptions.containsKey(identifier)) throw new 
IdentifierCollisionException();
+               uskSubscriptions.put(identifier, subscribeUSK);
+       }
+
+       public void unsubscribe(String identifier) throws 
MessageInvalidException {
+               SubscribeUSK sub;
+               synchronized(this) {
+                       if(!uskSubscriptions.containsKey(identifier)) throw new 
MessageInvalidException(ProtocolErrorMessage.NO_SUCH_IDENTIFIER, "No such 
identifier unsubscribing", identifier, false);
+                       sub = uskSubscriptions.remove(identifier);
+               }
+               sub.unsubscribe();
+       }
+
 }

Modified: trunk/freenet/src/freenet/node/fcp/FCPMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPMessage.java  2009-04-18 18:10:07 UTC 
(rev 27004)
+++ trunk/freenet/src/freenet/node/fcp/FCPMessage.java  2009-04-18 18:29:30 UTC 
(rev 27005)
@@ -86,6 +86,8 @@
                        return new ShutdownMessage();
                if(name.equals(SubscribeUSKMessage.NAME))
                        return new SubscribeUSKMessage(fs);
+               if(name.equals(UnsubscribeUSKMessage.NAME))
+                       return new UnsubscribeUSKMessage(fs);
                if(name.equals(TestDDARequestMessage.NAME))
                        return new TestDDARequestMessage(fs);
                if(name.equals(TestDDAResponseMessage.NAME))

Modified: trunk/freenet/src/freenet/node/fcp/SubscribeUSK.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/SubscribeUSK.java        2009-04-18 
18:10:07 UTC (rev 27004)
+++ trunk/freenet/src/freenet/node/fcp/SubscribeUSK.java        2009-04-18 
18:29:30 UTC (rev 27005)
@@ -19,14 +19,17 @@
        final boolean dontPoll;
        final short prio;
        final short prioProgress;
+       final USK usk;
        
-       public SubscribeUSK(SubscribeUSKMessage message, NodeClientCore core, 
FCPConnectionHandler handler) {
+       public SubscribeUSK(SubscribeUSKMessage message, NodeClientCore core, 
FCPConnectionHandler handler) throws IdentifierCollisionException {
                this.handler = handler;
                this.dontPoll = message.dontPoll;
                this.identifier = message.identifier;
                this.core = core;
+               this.usk = message.key;
                prio = message.prio;
                prioProgress = message.prioProgress;
+               handler.addSubscription(identifier, this);
                core.uskManager.subscribe(message.key, this, !message.dontPoll, 
handler.getRebootClient().lowLevelClient);
        }
 
@@ -48,4 +51,8 @@
                return prioProgress;
        }
 
+       public void unsubscribe() {
+               core.uskManager.unsubscribe(usk, this);
+       }
+
 }

Modified: trunk/freenet/src/freenet/node/fcp/SubscribeUSKMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/SubscribeUSKMessage.java 2009-04-18 
18:10:07 UTC (rev 27004)
+++ trunk/freenet/src/freenet/node/fcp/SubscribeUSKMessage.java 2009-04-18 
18:29:30 UTC (rev 27005)
@@ -70,8 +70,12 @@
        @Override
        public void run(FCPConnectionHandler handler, Node node)
                        throws MessageInvalidException {
-               new SubscribeUSK(this, node.clientCore, handler);
-               
+               try {
+                       new SubscribeUSK(this, node.clientCore, handler);
+               } catch (IdentifierCollisionException e) {
+                       handler.outputHandler.queue(new 
IdentifierCollisionMessage(identifier, false));
+                       return;
+               }
                SubscribedUSKMessage reply = new SubscribedUSKMessage(this);
                handler.outputHandler.queue(reply);
        }

Added: trunk/freenet/src/freenet/node/fcp/UnsubscribeUSKMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/UnsubscribeUSKMessage.java               
                (rev 0)
+++ trunk/freenet/src/freenet/node/fcp/UnsubscribeUSKMessage.java       
2009-04-18 18:29:30 UTC (rev 27005)
@@ -0,0 +1,40 @@
+package freenet.node.fcp;
+
+import com.db4o.ObjectContainer;
+
+import freenet.node.Node;
+import freenet.support.SimpleFieldSet;
+
+public class UnsubscribeUSKMessage extends FCPMessage {
+
+       public static final String NAME = "UnsubscribeUSK";
+       private final String identifier;
+
+       public UnsubscribeUSKMessage(SimpleFieldSet fs) throws 
MessageInvalidException {
+               this.identifier = fs.get("Identifier");
+               if(identifier == null)
+                       throw new 
MessageInvalidException(ProtocolErrorMessage.MISSING_FIELD, "No Identifier!", 
null, false);
+       }
+
+       @Override
+       public SimpleFieldSet getFieldSet() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public String getName() {
+               return NAME;
+       }
+
+       @Override
+       public void removeFrom(ObjectContainer container) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void run(FCPConnectionHandler handler, Node node)
+                       throws MessageInvalidException {
+               handler.unsubscribe(identifier);
+       }
+
+}

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to