Author: toad
Date: 2009-04-21 17:32:03 +0000 (Tue, 21 Apr 2009)
New Revision: 27162
Added:
trunk/freenet/src/freenet/clients/http/FProxyFetchInProgress.java
trunk/freenet/src/freenet/clients/http/FProxyFetchResult.java
trunk/freenet/src/freenet/clients/http/FProxyFetchTracker.java
trunk/freenet/src/freenet/clients/http/FProxyFetchWaiter.java
Modified:
trunk/freenet/src/freenet/clients/http/FProxyToadlet.java
trunk/freenet/src/freenet/support/MultiValueTable.java
Log:
Fix the leak, lots of refactoring, commit missing files
Added: trunk/freenet/src/freenet/clients/http/FProxyFetchInProgress.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/FProxyFetchInProgress.java
(rev 0)
+++ trunk/freenet/src/freenet/clients/http/FProxyFetchInProgress.java
2009-04-21 17:32:03 UTC (rev 27162)
@@ -0,0 +1,253 @@
+package freenet.clients.http;
+
+import java.util.ArrayList;
+
+import com.db4o.ObjectContainer;
+
+import freenet.client.FetchContext;
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+import freenet.client.HighLevelSimpleClient;
+import freenet.client.InsertException;
+import freenet.client.async.BaseClientPutter;
+import freenet.client.async.ClientCallback;
+import freenet.client.async.ClientContext;
+import freenet.client.async.ClientGetter;
+import freenet.client.events.ClientEvent;
+import freenet.client.events.ClientEventListener;
+import freenet.client.events.SendingToNetworkEvent;
+import freenet.client.events.SplitfileProgressEvent;
+import freenet.keys.FreenetURI;
+import freenet.node.RequestClient;
+import freenet.support.Logger;
+import freenet.support.api.Bucket;
+
+/**
+ * Fetching a page for a browser.
+ *
+ * LOCKING: The lock on this object is always taken last.
+ */
+public class FProxyFetchInProgress implements ClientEventListener,
ClientCallback {
+
+ /** The key we are fetching */
+ final FreenetURI uri;
+ /** The maximum size specified by the client */
+ final long maxSize;
+ /** Unique ID for the fetch */
+ private final long identifier;
+ /** Fetcher */
+ private final ClientGetter getter;
+ /** Any request which is waiting for a progress screen or data.
+ * We may want to wake requests separately in future. */
+ private final ArrayList<FProxyFetchWaiter> waiters;
+ private final ArrayList<FProxyFetchResult> results;
+ /** The data, if we have it */
+ private Bucket data;
+ /** Creation time */
+ private final long timeStarted;
+ /** Finished? */
+ private boolean finished;
+ /** Size, if known */
+ private long size;
+ /** MIME type, if known */
+ private String mimeType;
+ /** Gone to network? */
+ private boolean goneToNetwork;
+ /** Total blocks */
+ private int totalBlocks;
+ /** Required blocks */
+ private int requiredBlocks;
+ /** Fetched blocks */
+ private int fetchedBlocks;
+ /** Failed blocks */
+ private int failedBlocks;
+ /** Fatally failed blocks */
+ private int fatallyFailedBlocks;
+ /** Finalized the block set? */
+ private boolean finalizedBlocks;
+ /** Fetch failed */
+ private FetchException failed;
+ private boolean hasWaited;
+ /** Last time the fetch was accessed from the fproxy end */
+ private long lastTouched;
+ final FProxyFetchTracker tracker;
+
+ public FProxyFetchInProgress(FProxyFetchTracker tracker, FreenetURI
key, long maxSize2, long identifier, ClientContext context, FetchContext fctx,
RequestClient rc) {
+ this.tracker = tracker;
+ this.uri = key;
+ this.maxSize = maxSize2;
+ this.timeStarted = System.currentTimeMillis();
+ this.identifier = identifier;
+ fctx = new FetchContext(fctx, FetchContext.IDENTICAL_MASK,
false, null);
+ fctx.maxOutputLength = fctx.maxTempLength = maxSize;
+ fctx.eventProducer.addEventListener(this);
+ waiters = new ArrayList<FProxyFetchWaiter>();
+ results = new ArrayList<FProxyFetchResult>();
+ getter = new ClientGetter(this, uri, fctx,
FProxyToadlet.PRIORITY, rc, null, null);
+ }
+
+ public synchronized FProxyFetchWaiter getWaiter() {
+ lastTouched = System.currentTimeMillis();
+ FProxyFetchWaiter waiter = new FProxyFetchWaiter(this);
+ waiters.add(waiter);
+ return waiter;
+ }
+
+ synchronized FProxyFetchResult innerGetResult() {
+ lastTouched = System.currentTimeMillis();
+ FProxyFetchResult res;
+ if(data != null)
+ res = new FProxyFetchResult(this, data, mimeType,
timeStarted, goneToNetwork);
+ else
+ res = new FProxyFetchResult(this, mimeType, size,
timeStarted, goneToNetwork,
+ totalBlocks, requiredBlocks,
fetchedBlocks, failedBlocks, fatallyFailedBlocks, finalizedBlocks, failed);
+ results.add(res);
+ return res;
+ }
+
+ public void start(ClientContext context) throws FetchException {
+ try {
+ context.start(getter);
+ } catch (FetchException e) {
+ synchronized(this) {
+ this.failed = e;
+ this.finished = true;
+ }
+ }
+ }
+
+ public void onRemoveEventProducer(ObjectContainer container) {
+ // Impossible
+ }
+
+ public void receive(ClientEvent ce, ObjectContainer maybeContainer,
ClientContext context) {
+ if(ce instanceof SplitfileProgressEvent) {
+ SplitfileProgressEvent split = (SplitfileProgressEvent)
ce;
+ synchronized(this) {
+ int oldReq = requiredBlocks - (fetchedBlocks +
failedBlocks + fatallyFailedBlocks);
+ totalBlocks = split.totalBlocks;
+ fetchedBlocks = split.succeedBlocks;
+ requiredBlocks = split.minSuccessfulBlocks;
+ failedBlocks = split.failedBlocks;
+ fatallyFailedBlocks = split.fatallyFailedBlocks;
+ finalizedBlocks = split.finalizedTotal;
+ int req = requiredBlocks - (fetchedBlocks +
failedBlocks + fatallyFailedBlocks);
+ if(!(req > 1024 && oldReq <= 1024)) return;
+ }
+ } else if(ce instanceof SendingToNetworkEvent) {
+ synchronized(this) {
+ if(goneToNetwork) return;
+ goneToNetwork = true;
+ }
+ } else return;
+ wakeWaiters(false);
+ }
+
+ private void wakeWaiters(boolean finished) {
+ FProxyFetchWaiter[] waiting;
+ synchronized(this) {
+ waiting = waiters.toArray(new
FProxyFetchWaiter[waiters.size()]);
+ }
+ for(FProxyFetchWaiter w : waiting) {
+ w.wakeUp(finished);
+ }
+ }
+
+ public void onFailure(FetchException e, ClientGetter state,
ObjectContainer container) {
+ synchronized(this) {
+ this.failed = e;
+ this.finished = true;
+ }
+ wakeWaiters(true);
+ }
+
+ public void onFailure(InsertException e, BaseClientPutter state,
ObjectContainer container) {
+ // Impossible
+ }
+
+ public void onFetchable(BaseClientPutter state, ObjectContainer
container) {
+ // Impossible
+ }
+
+ public void onGeneratedURI(FreenetURI uri, BaseClientPutter state,
ObjectContainer container) {
+ // Impossible
+ }
+
+ public void onMajorProgress(ObjectContainer container) {
+ // Ignore
+ }
+
+ public void onSuccess(FetchResult result, ClientGetter state,
ObjectContainer container) {
+ synchronized(this) {
+ this.data = result.asBucket();
+ this.mimeType = result.getMimeType();
+ this.finished = true;
+ }
+ wakeWaiters(true);
+ }
+
+ public void onSuccess(BaseClientPutter state, ObjectContainer
container) {
+ // Ignore
+ }
+
+ public boolean hasData() {
+ return data != null;
+ }
+
+ public boolean finished() {
+ return finished;
+ }
+
+ public void close(FProxyFetchWaiter waiter) {
+ synchronized(this) {
+ waiters.remove(waiter);
+ if(!results.isEmpty()) return;
+ if(!waiters.isEmpty()) return;
+ }
+ tracker.queueCancel(this);
+ }
+
+ /** Keep for 30 seconds after last access */
+ static final int LIFETIME = 30 * 1000;
+
+ /** Caller should take the lock on FProxyToadlet.fetchers, then call
this
+ * function, if it returns true then finish the cancel outside the lock.
+ */
+ public synchronized boolean canCancel() {
+ if(!waiters.isEmpty()) return false;
+ if(!results.isEmpty()) return false;
+ if(lastTouched + LIFETIME >= System.currentTimeMillis()) {
+ Logger.error(this, "Not able to cancel for "+this+" :
"+uri+" : "+maxSize);
+ return false;
+ }
+ Logger.error(this, "Can cancel for "+this+" : "+uri+" :
"+maxSize);
+ return true;
+ }
+
+ public void finishCancel() {
+ Logger.error(this, "Finishing cancel for "+this+" : "+uri+" :
"+maxSize);
+ if(data != null) {
+ try {
+ data.free();
+ } catch (Throwable t) {
+ // Ensure we get to the next bit
+ Logger.error(this, "Failed to free: "+t, t);
+ }
+ }
+ try {
+ getter.cancel();
+ } catch (Throwable t) {
+ // Ensure we get to the next bit
+ Logger.error(this, "Failed to cancel: "+t, t);
+ }
+ }
+
+ public void close(FProxyFetchResult result) {
+ synchronized(this) {
+ results.remove(result);
+ if(!results.isEmpty()) return;
+ if(!waiters.isEmpty()) return;
+ }
+ tracker.queueCancel(this);
+ }
+}
Added: trunk/freenet/src/freenet/clients/http/FProxyFetchResult.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/FProxyFetchResult.java
(rev 0)
+++ trunk/freenet/src/freenet/clients/http/FProxyFetchResult.java
2009-04-21 17:32:03 UTC (rev 27162)
@@ -0,0 +1,83 @@
+package freenet.clients.http;
+
+import freenet.client.FetchException;
+import freenet.support.api.Bucket;
+
+/** The result of fproxy waiting for a fetch: It can either be the final data,
or it
+ * can be the progress of the fetch so far.
+ * @author Matthew Toseland <[email protected]> (0xE43DA450)
+ */
+public class FProxyFetchResult {
+
+ /** If we have fetched the data, we know this. If we haven't, we might
know it. */
+ final String mimeType;
+
+ /** If we have fetched the data, we know this. If we haven't, we might
know it. */
+ final long size;
+
+ /** If we have fetched the data */
+ final Bucket data;
+
+ /** If we have not fetched the data */
+ /** Creation time */
+ final long timeStarted;
+ /** Gone to network? */
+ final boolean goneToNetwork;
+ /** Total blocks */
+ final int totalBlocks;
+ /** Required blocks */
+ final int requiredBlocks;
+ /** Fetched blocks */
+ final int fetchedBlocks;
+ /** Failed blocks */
+ final int failedBlocks;
+ /** Fatally failed blocks */
+ final int fatallyFailedBlocks;
+ /** Finalized blocks? */
+ final boolean finalizedBlocks;
+
+ /** Failed */
+ final FetchException failed;
+
+ final FProxyFetchInProgress progress;
+
+ /** Constructor when we are returning the data */
+ FProxyFetchResult(FProxyFetchInProgress parent, Bucket data, String
mimeType, long timeStarted, boolean goneToNetwork) {
+ this.data = data;
+ this.mimeType = mimeType;
+ this.size = data.size();
+ this.timeStarted = timeStarted;
+ this.goneToNetwork = goneToNetwork;
+ totalBlocks = requiredBlocks = fetchedBlocks = failedBlocks =
fatallyFailedBlocks = 0;
+ finalizedBlocks = true;
+ failed = null;
+ this.progress = parent;
+ }
+
+ /** Constructor when we are not returning the data, because it is still
running or it failed */
+ FProxyFetchResult(FProxyFetchInProgress parent, String mimeType, long
size, long timeStarted, boolean goneToNetwork, int totalBlocks, int
requiredBlocks, int fetchedBlocks, int failedBlocks, int fatallyFailedBlocks,
boolean finalizedBlocks, FetchException failed) {
+ this.data = null;
+ this.mimeType = mimeType;
+ this.size = size;
+ this.timeStarted = timeStarted;
+ this.goneToNetwork = goneToNetwork;
+ this.totalBlocks = totalBlocks;
+ this.requiredBlocks = requiredBlocks;
+ this.fetchedBlocks = fetchedBlocks;
+ this.failedBlocks = failedBlocks;
+ this.fatallyFailedBlocks = fatallyFailedBlocks;
+ this.finalizedBlocks = finalizedBlocks;
+ this.failed = failed;
+ this.progress = parent;
+ }
+
+ /** Must be called when fproxy has finished with the data */
+ public void close() {
+ progress.close(this);
+ }
+
+ public boolean hasData() {
+ return data != null;
+ }
+
+}
Added: trunk/freenet/src/freenet/clients/http/FProxyFetchTracker.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/FProxyFetchTracker.java
(rev 0)
+++ trunk/freenet/src/freenet/clients/http/FProxyFetchTracker.java
2009-04-21 17:32:03 UTC (rev 27162)
@@ -0,0 +1,108 @@
+package freenet.clients.http;
+
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.Vector;
+
+import freenet.client.FetchContext;
+import freenet.client.FetchException;
+import freenet.client.async.ClientContext;
+import freenet.keys.FreenetURI;
+import freenet.node.RequestClient;
+import freenet.support.Logger;
+import freenet.support.MultiValueTable;
+
+public class FProxyFetchTracker implements Runnable {
+
+ final MultiValueTable<FreenetURI, FProxyFetchInProgress> fetchers;
+ final ClientContext context;
+ private long fetchIdentifiers;
+ private final FetchContext fctx;
+ private final RequestClient rc;
+ private boolean queuedJob;
+ private boolean requeue;
+
+ FProxyFetchTracker(ClientContext context, FetchContext fctx,
RequestClient rc) {
+ fetchers = new MultiValueTable<FreenetURI,
FProxyFetchInProgress>();
+ this.context = context;
+ this.fctx = fctx;
+ this.rc = rc;
+ }
+
+ FProxyFetchWaiter makeFetcher(FreenetURI key, long maxSize) {
+ FProxyFetchInProgress progress;
+ /* LOCKING:
+ * Call getWaiter() inside the fetchers lock, since we will
purge old
+ * fetchers inside that lock, hence avoid a race condition.
FetchInProgress
+ * lock is always taken last. */
+ synchronized(fetchers) {
+ if(fetchers.containsKey(key)) {
+ Object[] check = fetchers.getArray(key);
+ for(int i=0;i<check.length;i++) {
+ progress = (FProxyFetchInProgress)
check[i];
+ if(progress.maxSize == maxSize
+ || progress.hasData())
return progress.getWaiter();
+ }
+ }
+ progress = new FProxyFetchInProgress(this, key,
maxSize, fetchIdentifiers++, context, fctx, rc);
+ fetchers.put(key, progress);
+ }
+ try {
+ progress.start(context);
+ } catch (FetchException e) {
+ synchronized(fetchers) {
+ fetchers.removeElement(key, progress);
+ }
+ }
+ return progress.getWaiter();
+ // FIXME promote a fetcher when it is re-used
+ // FIXME get rid of fetchers over some age
+ }
+
+ public void queueCancel(FProxyFetchInProgress progress) {
+ Logger.error(this, "Queueing removal of old
FProxyFetchInProgress's");
+ synchronized(this) {
+ if(queuedJob) {
+ requeue = true;
+ return;
+ }
+ queuedJob = true;
+ }
+ context.ticker.queueTimedJob(this,
FProxyFetchInProgress.LIFETIME);
+ }
+
+ public void run() {
+ Logger.error(this, "Removing old FProxyFetchInProgress's");
+ ArrayList<FProxyFetchInProgress> toRemove = null;
+ boolean needRequeue = false;
+ synchronized(fetchers) {
+ if(requeue) {
+ requeue = false;
+ needRequeue = true;
+ } else {
+ queuedJob = false;
+ }
+ // Horrible hack, FIXME
+ Enumeration e = fetchers.keys();
+ while(e.hasMoreElements()) {
+ FreenetURI uri = (FreenetURI) e.nextElement();
+ // Really horrible hack, FIXME
+ Vector<FProxyFetchInProgress> list =
(Vector<FProxyFetchInProgress>) fetchers.iterateAll(uri);
+ for(FProxyFetchInProgress f : list)
+ // FIXME remove on the fly, although
cancel must wait
+ if(f.canCancel()) {
+ if(toRemove == null) toRemove =
new ArrayList<FProxyFetchInProgress>();
+ toRemove.add(f);
+ }
+ }
+ for(FProxyFetchInProgress r : toRemove) {
+ fetchers.removeElement(r.uri, r);
+ }
+ }
+ for(FProxyFetchInProgress r : toRemove)
+ r.finishCancel();
+ if(needRequeue)
+ context.ticker.queueTimedJob(this,
FProxyFetchInProgress.LIFETIME);
+ }
+
+}
Added: trunk/freenet/src/freenet/clients/http/FProxyFetchWaiter.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/FProxyFetchWaiter.java
(rev 0)
+++ trunk/freenet/src/freenet/clients/http/FProxyFetchWaiter.java
2009-04-21 17:32:03 UTC (rev 27162)
@@ -0,0 +1,42 @@
+package freenet.clients.http;
+
+/** An fproxy fetch which is stalled waiting for either the data or a progress
screen. */
+public class FProxyFetchWaiter {
+
+ public FProxyFetchWaiter(FProxyFetchInProgress progress2) {
+ this.progress = progress2;
+ }
+
+ private final FProxyFetchInProgress progress;
+
+ private boolean hasWaited;
+ private boolean finished;
+ private boolean awoken;
+
+ public FProxyFetchResult getResult() {
+ synchronized(this) {
+ if(!(finished || hasWaited || awoken)) {
+ awoken = false;
+ try {
+ wait(2000);
+ } catch (InterruptedException e) {
+ // Not likely
+ };
+ }
+ }
+ return progress.innerGetResult();
+ }
+
+ public void close() {
+ progress.close(this);
+ }
+
+ synchronized void wakeUp(boolean fin) {
+ if(fin)
+ this.finished = true;
+ else
+ this.awoken = true;
+ notifyAll();
+ }
+
+}
Modified: trunk/freenet/src/freenet/clients/http/FProxyToadlet.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/FProxyToadlet.java 2009-04-21
16:13:07 UTC (rev 27161)
+++ trunk/freenet/src/freenet/clients/http/FProxyToadlet.java 2009-04-21
17:32:03 UTC (rev 27162)
@@ -49,8 +49,7 @@
private static byte[] random;
final NodeClientCore core;
final ClientContext context;
- final MultiValueTable<FreenetURI, FProxyFetchInProgress> fetchers;
- private long fetchIdentifiers;
+ final FProxyFetchTracker fetchTracker;
private static FoundURICallback prefetchHook;
static final Set<String> prefetchAllowedTypes = new HashSet<String>();
@@ -99,7 +98,7 @@
}
};
- fetchers = new MultiValueTable<FreenetURI,
FProxyFetchInProgress>();
+ fetchTracker = new FProxyFetchTracker(context,
getClientImpl().getFetchContext(), this);
}
@Override
@@ -493,17 +492,20 @@
MultiValueTable<String,String> headers = ctx.getHeaders();
String ua = headers.get("user-agent");
+ String accept = headers.get("accept");
FProxyFetchResult fr = null;
- if(isBrowser(ua)) {
- FProxyFetchInProgress fetch = makeFetcher(key, maxSize);
+ if(isBrowser(ua) && accept == null ||
accept.indexOf("text/html") > -1) {
+ FProxyFetchWaiter fetch = fetchTracker.makeFetcher(key,
maxSize);
while(true) {
fr = fetch.getResult();
if(fr.hasData()) {
data = fr.data;
mimeType = fr.mimeType;
+ fetch.close(); // Not waiting any more, but
still locked the results until sent
break;
} else if(fr.failed != null) {
fe = fr.failed;
+ fetch.close(); // Not waiting any more, but
still locked the results until sent
break;
} else {
// Still in progress
@@ -527,6 +529,7 @@
retHeaders.put("Refresh", "2; url="+location);
writeHTMLReply(ctx, 200, "OK", retHeaders,
pageNode.generate());
fr.close();
+ fetch.close();
return;
}
}
@@ -674,32 +677,6 @@
}
}
- private FProxyFetchInProgress makeFetcher(FreenetURI key, long maxSize)
{
- FProxyFetchInProgress progress;
- synchronized(fetchers) {
- if(fetchers.containsKey(key)) {
- Object[] check = fetchers.getArray(key);
- for(int i=0;i<check.length;i++) {
- progress = (FProxyFetchInProgress)
check[i];
- if(progress.maxSize == maxSize
- || progress.hasData())
return progress;
- }
- }
- progress = new FProxyFetchInProgress(key, maxSize,
fetchIdentifiers++, context, getClientImpl(), this);
- fetchers.put(key, progress);
- }
- try {
- progress.start(context);
- } catch (FetchException e) {
- synchronized(fetchers) {
- fetchers.removeElement(key, progress);
- }
- }
- return progress;
- // FIXME promote a fetcher when it is re-used
- // FIXME get rid of fetchers over some age
- }
-
private boolean isBrowser(String ua) {
if(ua == null) return false;
if(ua.indexOf("Mozilla/") > -1) return true;
Modified: trunk/freenet/src/freenet/support/MultiValueTable.java
===================================================================
--- trunk/freenet/src/freenet/support/MultiValueTable.java 2009-04-21
16:13:07 UTC (rev 27161)
+++ trunk/freenet/src/freenet/support/MultiValueTable.java 2009-04-21
17:32:03 UTC (rev 27162)
@@ -150,7 +150,7 @@
return table.keys();
}
}
-
+
public Enumeration<V> elements() {
synchronized (table) {
if (table.isEmpty())
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs