Author: toad
Date: 2009-04-18 18:29:30 +0000 (Sat, 18 Apr 2009)
New Revision: 27005
Added:
trunk/freenet/src/freenet/node/fcp/UnsubscribeUSKMessage.java
Modified:
trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
trunk/freenet/src/freenet/node/fcp/FCPMessage.java
trunk/freenet/src/freenet/node/fcp/SubscribeUSK.java
trunk/freenet/src/freenet/node/fcp/SubscribeUSKMessage.java
Log:
Track subscribed USKs by identifier, add UnsubscribeUSK, also unsubscribe on
disconnect (fix leak)
Modified: trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
2009-04-18 18:10:07 UTC (rev 27004)
+++ trunk/freenet/src/freenet/node/fcp/FCPConnectionHandler.java
2009-04-18 18:29:30 UTC (rev 27005)
@@ -9,6 +9,7 @@
import java.net.Socket;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import java.util.Random;
import com.db4o.ObjectContainer;
@@ -59,6 +60,7 @@
final FCPServer server;
final Socket sock;
final FCPConnectionInputHandler inputHandler;
+ final Map<String, SubscribeUSK> uskSubscriptions;
public final FCPConnectionOutputHandler outputHandler;
private boolean isClosed;
private boolean inputClosed;
@@ -92,6 +94,7 @@
isClosed = false;
this.bf = server.core.tempBucketFactory;
requestsByIdentifier = new HashMap<String, ClientRequest>();
+ uskSubscriptions = new HashMap<String, SubscribeUSK>();
this.inputHandler = new FCPConnectionInputHandler(this);
this.outputHandler = new FCPConnectionOutputHandler(this);
@@ -112,14 +115,18 @@
if(foreverClient != null)
foreverClient.onLostConnection(this);
boolean dupe;
+ SubscribeUSK[] subscriptions;
synchronized(this) {
isClosed = true;
requests = new
ClientRequest[requestsByIdentifier.size()];
requests =
requestsByIdentifier.values().toArray(requests);
+ subscriptions = uskSubscriptions.values().toArray(new
SubscribeUSK[uskSubscriptions.size()]);
dupe = killedDupe;
}
for(int i=0;i<requests.length;i++)
requests[i].onLostConnection(null,
server.core.clientContext);
+ for(SubscribeUSK sub : subscriptions)
+ sub.unsubscribe();
if(!dupe) {
server.core.clientContext.jobRunner.queue(new DBJob() {
@@ -140,6 +147,7 @@
}, NativeThread.NORM_PRIORITY, false);
}
+
outputHandler.onClosed();
}
@@ -738,6 +746,19 @@
Logger.error(this, "Not storing FCPConnectionHandler in
database", new Exception("error"));
return false;
}
-
+ public synchronized void addSubscription(String identifier,
SubscribeUSK subscribeUSK) throws IdentifierCollisionException {
+ if(uskSubscriptions.containsKey(identifier)) throw new
IdentifierCollisionException();
+ uskSubscriptions.put(identifier, subscribeUSK);
+ }
+
+ public void unsubscribe(String identifier) throws
MessageInvalidException {
+ SubscribeUSK sub;
+ synchronized(this) {
+ if(!uskSubscriptions.containsKey(identifier)) throw new
MessageInvalidException(ProtocolErrorMessage.NO_SUCH_IDENTIFIER, "No such
identifier unsubscribing", identifier, false);
+ sub = uskSubscriptions.remove(identifier);
+ }
+ sub.unsubscribe();
+ }
+
}
Modified: trunk/freenet/src/freenet/node/fcp/FCPMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/FCPMessage.java 2009-04-18 18:10:07 UTC
(rev 27004)
+++ trunk/freenet/src/freenet/node/fcp/FCPMessage.java 2009-04-18 18:29:30 UTC
(rev 27005)
@@ -86,6 +86,8 @@
return new ShutdownMessage();
if(name.equals(SubscribeUSKMessage.NAME))
return new SubscribeUSKMessage(fs);
+ if(name.equals(UnsubscribeUSKMessage.NAME))
+ return new UnsubscribeUSKMessage(fs);
if(name.equals(TestDDARequestMessage.NAME))
return new TestDDARequestMessage(fs);
if(name.equals(TestDDAResponseMessage.NAME))
Modified: trunk/freenet/src/freenet/node/fcp/SubscribeUSK.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/SubscribeUSK.java 2009-04-18
18:10:07 UTC (rev 27004)
+++ trunk/freenet/src/freenet/node/fcp/SubscribeUSK.java 2009-04-18
18:29:30 UTC (rev 27005)
@@ -19,14 +19,17 @@
final boolean dontPoll;
final short prio;
final short prioProgress;
+ final USK usk;
- public SubscribeUSK(SubscribeUSKMessage message, NodeClientCore core,
FCPConnectionHandler handler) {
+ public SubscribeUSK(SubscribeUSKMessage message, NodeClientCore core,
FCPConnectionHandler handler) throws IdentifierCollisionException {
this.handler = handler;
this.dontPoll = message.dontPoll;
this.identifier = message.identifier;
this.core = core;
+ this.usk = message.key;
prio = message.prio;
prioProgress = message.prioProgress;
+ handler.addSubscription(identifier, this);
core.uskManager.subscribe(message.key, this, !message.dontPoll,
handler.getRebootClient().lowLevelClient);
}
@@ -48,4 +51,8 @@
return prioProgress;
}
+ public void unsubscribe() {
+ core.uskManager.unsubscribe(usk, this);
+ }
+
}
Modified: trunk/freenet/src/freenet/node/fcp/SubscribeUSKMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/SubscribeUSKMessage.java 2009-04-18
18:10:07 UTC (rev 27004)
+++ trunk/freenet/src/freenet/node/fcp/SubscribeUSKMessage.java 2009-04-18
18:29:30 UTC (rev 27005)
@@ -70,8 +70,12 @@
@Override
public void run(FCPConnectionHandler handler, Node node)
throws MessageInvalidException {
- new SubscribeUSK(this, node.clientCore, handler);
-
+ try {
+ new SubscribeUSK(this, node.clientCore, handler);
+ } catch (IdentifierCollisionException e) {
+ handler.outputHandler.queue(new
IdentifierCollisionMessage(identifier, false));
+ return;
+ }
SubscribedUSKMessage reply = new SubscribedUSKMessage(this);
handler.outputHandler.queue(reply);
}
Added: trunk/freenet/src/freenet/node/fcp/UnsubscribeUSKMessage.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/UnsubscribeUSKMessage.java
(rev 0)
+++ trunk/freenet/src/freenet/node/fcp/UnsubscribeUSKMessage.java
2009-04-18 18:29:30 UTC (rev 27005)
@@ -0,0 +1,40 @@
+package freenet.node.fcp;
+
+import com.db4o.ObjectContainer;
+
+import freenet.node.Node;
+import freenet.support.SimpleFieldSet;
+
+public class UnsubscribeUSKMessage extends FCPMessage {
+
+ public static final String NAME = "UnsubscribeUSK";
+ private final String identifier;
+
+ public UnsubscribeUSKMessage(SimpleFieldSet fs) throws
MessageInvalidException {
+ this.identifier = fs.get("Identifier");
+ if(identifier == null)
+ throw new
MessageInvalidException(ProtocolErrorMessage.MISSING_FIELD, "No Identifier!",
null, false);
+ }
+
+ @Override
+ public SimpleFieldSet getFieldSet() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ @Override
+ public void removeFrom(ObjectContainer container) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void run(FCPConnectionHandler handler, Node node)
+ throws MessageInvalidException {
+ handler.unsubscribe(identifier);
+ }
+
+}
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs