Author: toad
Date: 2008-06-25 01:46:43 +0000 (Wed, 25 Jun 2008)
New Revision: 20666
Modified:
branches/db4o/freenet/src/freenet/node/fcp/ClientGet.java
branches/db4o/freenet/src/freenet/node/fcp/ClientPut.java
branches/db4o/freenet/src/freenet/node/fcp/ClientPutBase.java
branches/db4o/freenet/src/freenet/node/fcp/ClientPutDir.java
branches/db4o/freenet/src/freenet/node/fcp/ClientRequest.java
branches/db4o/freenet/src/freenet/node/fcp/FCPClient.java
branches/db4o/freenet/src/freenet/node/fcp/GetRequestStatusMessage.java
Log:
FCP activation
Modified: branches/db4o/freenet/src/freenet/node/fcp/ClientGet.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/fcp/ClientGet.java 2008-06-25
01:45:38 UTC (rev 20665)
+++ branches/db4o/freenet/src/freenet/node/fcp/ClientGet.java 2008-06-25
01:46:43 UTC (rev 20666)
@@ -300,7 +300,7 @@
}
if(persistenceType != PERSIST_CONNECTION && !noTags) {
FCPMessage msg = persistentTagMessage();
- client.queueClientRequestMessage(msg, 0);
+ client.queueClientRequestMessage(msg, 0,
container);
}
}
@@ -312,7 +312,7 @@
getter.start(container, context);
if(persistenceType != PERSIST_CONNECTION && !finished) {
FCPMessage msg = persistentTagMessage();
- client.queueClientRequestMessage(msg, 0);
+ client.queueClientRequestMessage(msg, 0,
container);
}
synchronized(this) {
started = true;
@@ -402,7 +402,7 @@
this.succeeded = true;
finished = true;
}
- trySendDataFoundOrGetFailed(null);
+ trySendDataFoundOrGetFailed(null, container);
if(adm != null)
trySendAllDataMessage(adm, null, container);
@@ -416,7 +416,7 @@
client.notifySuccess(this);
}
- private void trySendDataFoundOrGetFailed(FCPConnectionOutputHandler
handler) {
+ private void trySendDataFoundOrGetFailed(FCPConnectionOutputHandler
handler, ObjectContainer container) {
FCPMessage msg;
// Don't need to lock. succeeded is only ever set, never unset.
@@ -430,12 +430,12 @@
if(handler != null)
handler.queue(msg);
else
- client.queueClientRequestMessage(msg, 0);
+ client.queueClientRequestMessage(msg, 0, container);
if(postFetchProtocolErrorMessage != null) {
if(handler != null)
handler.queue(postFetchProtocolErrorMessage);
else
-
client.queueClientRequestMessage(postFetchProtocolErrorMessage, 0);
+
client.queueClientRequestMessage(postFetchProtocolErrorMessage, 0, container);
}
}
@@ -446,7 +446,7 @@
if(persistenceType == ClientRequest.PERSIST_FOREVER)
container.set(this);
} else {
- client.queueClientRequestMessage(msg, 0);
+ client.queueClientRequestMessage(msg, 0, container);
}
}
@@ -456,10 +456,10 @@
if(persistenceType == ClientRequest.PERSIST_FOREVER)
container.set(this);
}
- client.queueClientRequestMessage(msg,
VERBOSITY_SPLITFILE_PROGRESS);
+ client.queueClientRequestMessage(msg,
VERBOSITY_SPLITFILE_PROGRESS, container);
}
- public void sendPendingMessages(FCPConnectionOutputHandler handler,
boolean includePersistentRequest, boolean includeData, boolean onlyData) {
+ public void sendPendingMessages(FCPConnectionOutputHandler handler,
boolean includePersistentRequest, boolean includeData, boolean onlyData,
ObjectContainer container) {
if(persistenceType == ClientRequest.PERSIST_CONNECTION) {
Logger.error(this, "WTF?
persistenceType="+persistenceType, new Exception("error"));
return;
@@ -472,7 +472,7 @@
if(progressPending != null)
handler.queue(progressPending);
if(finished)
- trySendDataFoundOrGetFailed(handler);
+ trySendDataFoundOrGetFailed(handler, container);
}
if (onlyData && allDataPending == null) {
@@ -497,7 +497,7 @@
}
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Caught "+e, e);
- trySendDataFoundOrGetFailed(null);
+ trySendDataFoundOrGetFailed(null, container);
if(persistenceType == PERSIST_FOREVER)
container.set(this);
finish(container);
@@ -525,11 +525,11 @@
FetchException cancelled = new
FetchException(FetchException.CANCELLED);
getFailedMessage = new
GetFailedMessage(cancelled, identifier, global);
}
- trySendDataFoundOrGetFailed(null);
+ trySendDataFoundOrGetFailed(null, container);
}
// notify client that request was removed
FCPMessage msg = new
PersistentRequestRemovedMessage(getIdentifier(), global);
- client.queueClientRequestMessage(msg, 0);
+ client.queueClientRequestMessage(msg, 0, container);
freeData(container);
Modified: branches/db4o/freenet/src/freenet/node/fcp/ClientPut.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/fcp/ClientPut.java 2008-06-25
01:45:38 UTC (rev 20665)
+++ branches/db4o/freenet/src/freenet/node/fcp/ClientPut.java 2008-06-25
01:46:43 UTC (rev 20666)
@@ -339,7 +339,7 @@
oldProgress, targetFilename, binaryBlob);
if(persistenceType != PERSIST_CONNECTION) {
FCPMessage msg = persistentTagMessage();
- client.queueClientRequestMessage(msg, 0);
+ client.queueClientRequestMessage(msg, 0, null);
}
}
@@ -349,7 +349,7 @@
client.register(this, false, container);
if(persistenceType != PERSIST_CONNECTION && !noTags) {
FCPMessage msg = persistentTagMessage();
- client.queueClientRequestMessage(msg, 0);
+ client.queueClientRequestMessage(msg, 0, container);
}
}
@@ -363,7 +363,7 @@
putter.start(earlyEncode, false, container, context);
if(persistenceType != PERSIST_CONNECTION && !finished) {
FCPMessage msg = persistentTagMessage();
- client.queueClientRequestMessage(msg, 0);
+ client.queueClientRequestMessage(msg, 0,
container);
}
synchronized(this) {
started = true;
Modified: branches/db4o/freenet/src/freenet/node/fcp/ClientPutBase.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/fcp/ClientPutBase.java
2008-06-25 01:45:38 UTC (rev 20665)
+++ branches/db4o/freenet/src/freenet/node/fcp/ClientPutBase.java
2008-06-25 01:46:43 UTC (rev 20666)
@@ -146,7 +146,7 @@
}
freeData(container);
finish(container);
- trySendFinalMessage(null);
+ trySendFinalMessage(null, container);
client.notifySuccess(this);
}
@@ -158,7 +158,7 @@
}
freeData(container);
finish(container);
- trySendFinalMessage(null);
+ trySendFinalMessage(null, container);
client.notifyFailure(this);
}
@@ -170,7 +170,7 @@
}
if(persistenceType == PERSIST_FOREVER)
container.set(this);
- trySendGeneratedURIMessage(null);
+ trySendGeneratedURIMessage(null, container);
}
public void requestWasRemoved(ObjectContainer container) {
@@ -181,11 +181,11 @@
InsertException cancelled = new
InsertException(InsertException.CANCELLED);
putFailedMessage = new
PutFailedMessage(cancelled, identifier, global);
}
- trySendFinalMessage(null);
+ trySendFinalMessage(null, container);
}
// notify client that request was removed
FCPMessage msg = new
PersistentRequestRemovedMessage(getIdentifier(), global);
- client.queueClientRequestMessage(msg, 0);
+ client.queueClientRequestMessage(msg, 0, container);
freeData(container);
if(persistenceType == PERSIST_FOREVER) {
@@ -247,7 +247,7 @@
}
}
- private void trySendFinalMessage(FCPConnectionOutputHandler handler) {
+ private void trySendFinalMessage(FCPConnectionOutputHandler handler,
ObjectContainer container) {
FCPMessage msg;
synchronized (this) {
@@ -264,11 +264,11 @@
if(handler != null)
handler.queue(msg);
else
- client.queueClientRequestMessage(msg, 0);
+ client.queueClientRequestMessage(msg, 0,
container);
}
}
- private void trySendGeneratedURIMessage(FCPConnectionOutputHandler
handler) {
+ private void trySendGeneratedURIMessage(FCPConnectionOutputHandler
handler, ObjectContainer container) {
FCPMessage msg;
synchronized(this) {
msg = new URIGeneratedMessage(generatedURI, identifier);
@@ -276,7 +276,7 @@
if(handler != null)
handler.queue(msg);
else
- client.queueClientRequestMessage(msg, 0);
+ client.queueClientRequestMessage(msg, 0, container);
}
/**
@@ -307,10 +307,10 @@
if(handler != null)
handler.queue(msg);
else
- client.queueClientRequestMessage(msg, verbosity);
+ client.queueClientRequestMessage(msg, verbosity,
container);
}
- public void sendPendingMessages(FCPConnectionOutputHandler handler,
boolean includePersistentRequest, boolean includeData, boolean onlyData) {
+ public void sendPendingMessages(FCPConnectionOutputHandler handler,
boolean includePersistentRequest, boolean includeData, boolean onlyData,
ObjectContainer container) {
if(persistenceType == PERSIST_CONNECTION) {
Logger.error(this, "WTF?
persistenceType="+persistenceType, new Exception("error"));
return;
@@ -329,11 +329,11 @@
fin = finished;
}
if(generated)
- trySendGeneratedURIMessage(handler);
+ trySendGeneratedURIMessage(handler, container);
if(msg != null)
handler.queue(msg);
if(fin)
- trySendFinalMessage(handler);
+ trySendFinalMessage(handler, container);
}
public synchronized SimpleFieldSet getFieldSet() {
Modified: branches/db4o/freenet/src/freenet/node/fcp/ClientPutDir.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/fcp/ClientPutDir.java
2008-06-25 01:45:38 UTC (rev 20665)
+++ branches/db4o/freenet/src/freenet/node/fcp/ClientPutDir.java
2008-06-25 01:46:43 UTC (rev 20666)
@@ -88,7 +88,7 @@
client.register(this, false, container);
if(persistenceType != PERSIST_CONNECTION && !noTags) {
FCPMessage msg = persistentTagMessage();
- client.queueClientRequestMessage(msg, 0);
+ client.queueClientRequestMessage(msg, 0, container);
}
}
@@ -138,7 +138,7 @@
- public ClientPutDir(SimpleFieldSet fs, FCPClient client, FCPServer
server) throws PersistenceParseException, IOException {
+ public ClientPutDir(SimpleFieldSet fs, FCPClient client, FCPServer
server, ObjectContainer container) throws PersistenceParseException,
IOException {
super(fs, client, server);
logMINOR = Logger.shouldLog(Logger.MINOR, this);
SimpleFieldSet files = fs.subset("Files");
@@ -214,7 +214,7 @@
totalSize = size;
if(persistenceType != PERSIST_CONNECTION) {
FCPMessage msg = persistentTagMessage();
- client.queueClientRequestMessage(msg, 0);
+ client.queueClientRequestMessage(msg, 0, container);
}
}
@@ -228,7 +228,7 @@
if(logMINOR) Logger.minor(this, "Started "+putter);
if(persistenceType != PERSIST_CONNECTION && !finished) {
FCPMessage msg = persistentTagMessage();
- client.queueClientRequestMessage(msg, 0);
+ client.queueClientRequestMessage(msg, 0,
container);
}
if(persistenceType == PERSIST_FOREVER)
container.set(this); // Update
Modified: branches/db4o/freenet/src/freenet/node/fcp/ClientRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/fcp/ClientRequest.java
2008-06-25 01:45:38 UTC (rev 20665)
+++ branches/db4o/freenet/src/freenet/node/fcp/ClientRequest.java
2008-06-25 01:46:43 UTC (rev 20666)
@@ -125,7 +125,7 @@
public abstract void onLostConnection(ObjectContainer container);
/** Send any pending messages for a persistent request e.g. after
reconnecting */
- public abstract void sendPendingMessages(FCPConnectionOutputHandler
handler, boolean includePersistentRequest, boolean includeData, boolean
onlyData);
+ public abstract void sendPendingMessages(FCPConnectionOutputHandler
handler, boolean includePersistentRequest, boolean includeData, boolean
onlyData, ObjectContainer container);
// Persistence
@@ -190,7 +190,7 @@
if(!lazyResume) cp.start(container, context);
return cp;
} else if(type.equals("PUTDIR")) {
- ClientPutDir cp = new ClientPutDir(fs, client,
server);
+ ClientPutDir cp = new ClientPutDir(fs, client,
server, container);
client.register(cp, lazyResume, container);
if(!lazyResume) cp.start(container, context);
return cp;
@@ -363,7 +363,7 @@
} else {
return; // paranoia, we should not be here if nothing
was changed!
}
- client.queueClientRequestMessage(modifiedMsg, 0);
+ client.queueClientRequestMessage(modifiedMsg, 0, container);
}
/** Utility method for storing details of a possibly encrypted bucket.
*/
Modified: branches/db4o/freenet/src/freenet/node/fcp/FCPClient.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/fcp/FCPClient.java 2008-06-25
01:45:38 UTC (rev 20665)
+++ branches/db4o/freenet/src/freenet/node/fcp/FCPClient.java 2008-06-25
01:46:43 UTC (rev 20666)
@@ -133,8 +133,12 @@
synchronized(this) {
reqs = completedUnackedRequests.toArray();
}
- for(int i=0;i<reqs.length;i++)
-
((ClientRequest)reqs[i]).sendPendingMessages(outputHandler, true, false, false);
+ for(int i=0;i<reqs.length;i++) {
+ ClientRequest req = (ClientRequest) reqs[i];
+ if(persistenceType == ClientRequest.PERSIST_FOREVER)
+ container.activate(req, 1);
+
((ClientRequest)reqs[i]).sendPendingMessages(outputHandler, true, false, false,
container);
+ }
}
/**
@@ -146,8 +150,12 @@
synchronized(this) {
reqs = runningPersistentRequests.toArray();
}
- for(int i=0;i<reqs.length;i++)
-
((ClientRequest)reqs[i]).sendPendingMessages(outputHandler, true, false, false);
+ for(int i=0;i<reqs.length;i++) {
+ ClientRequest req = (ClientRequest) reqs[i];
+ if(persistenceType == ClientRequest.PERSIST_FOREVER)
+ container.activate(req, 1);
+ req.sendPendingMessages(outputHandler, true, false,
false, container);
+ }
}
public void register(ClientRequest cg, boolean startLater,
ObjectContainer container) throws IdentifierCollisionException {
@@ -245,11 +253,11 @@
this.watchGlobalVerbosityMask = verbosityMask;
}
- public void queueClientRequestMessage(FCPMessage msg, int
verbosityLevel) {
- queueClientRequestMessage(msg, verbosityLevel, false);
+ public void queueClientRequestMessage(FCPMessage msg, int
verbosityLevel, ObjectContainer container) {
+ queueClientRequestMessage(msg, verbosityLevel, false,
container);
}
- public void queueClientRequestMessage(FCPMessage msg, int
verbosityLevel, boolean useGlobalMask) {
+ public void queueClientRequestMessage(FCPMessage msg, int
verbosityLevel, boolean useGlobalMask, ObjectContainer container) {
if(useGlobalMask && (verbosityLevel & watchGlobalVerbosityMask)
!= verbosityLevel)
return;
FCPConnectionHandler conn = getConnection();
@@ -265,8 +273,11 @@
clients = null;
}
if(clients != null)
- for(int i=0;i<clients.length;i++)
- clients[i].queueClientRequestMessage(msg,
verbosityLevel, true);
+ for(int i=0;i<clients.length;i++) {
+ if(persistenceType ==
ClientRequest.PERSIST_FOREVER)
+ container.activate(clients[i], 1);
+ clients[i].queueClientRequestMessage(msg,
verbosityLevel, true, container);
+ }
}
}
@@ -289,7 +300,10 @@
public synchronized ClientRequest getRequest(String identifier,
ObjectContainer container) {
assert((persistenceType == ClientRequest.PERSIST_FOREVER) ==
(container != null));
- return (ClientRequest)
clientRequestsByIdentifier.get(identifier);
+ ClientRequest req = (ClientRequest)
clientRequestsByIdentifier.get(identifier);
+ if(persistenceType == ClientRequest.PERSIST_FOREVER)
+ container.activate(req, 1);
+ return req;
}
/**
@@ -299,6 +313,7 @@
ClientRequest[] reqs;
synchronized(this) {
reqs = (ClientRequest[]) toStart.toArray(new
ClientRequest[toStart.size()]);
+ toStart.clear();
}
for(int i=0;i<reqs.length;i++) {
final ClientRequest req = reqs[i];
@@ -387,7 +402,11 @@
// FIXME speed this up with another hashmap or something.
for(int i=0;i<completedUnackedRequests.size();i++) {
ClientGet getter = (ClientGet)
completedUnackedRequests.get(i);
- if(getter.getURI().equals(key)) return getter;
+ if(getter.getURI().equals(key)) {
+ if(persistenceType ==
ClientRequest.PERSIST_FOREVER)
+ container.activate(getter, 1);
+ return getter;
+ }
}
return null;
}
Modified:
branches/db4o/freenet/src/freenet/node/fcp/GetRequestStatusMessage.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/fcp/GetRequestStatusMessage.java
2008-06-25 01:45:38 UTC (rev 20665)
+++ branches/db4o/freenet/src/freenet/node/fcp/GetRequestStatusMessage.java
2008-06-25 01:46:43 UTC (rev 20666)
@@ -47,13 +47,13 @@
ProtocolErrorMessage msg = new
ProtocolErrorMessage(ProtocolErrorMessage.NO_SUCH_IDENTIFIER, false, null,
identifier, global);
handler.outputHandler.queue(msg);
} else {
-
req.sendPendingMessages(handler.outputHandler, true, true, onlyData);
+
req.sendPendingMessages(handler.outputHandler, true, true, onlyData, container);
}
}
}, NativeThread.NORM_PRIORITY, false);
} else {
- req.sendPendingMessages(handler.outputHandler, true,
true, onlyData);
+ req.sendPendingMessages(handler.outputHandler, true,
true, onlyData, null);
}
}