Author: toad
Date: 2009-04-21 17:32:03 +0000 (Tue, 21 Apr 2009)
New Revision: 27162

Added:
   trunk/freenet/src/freenet/clients/http/FProxyFetchInProgress.java
   trunk/freenet/src/freenet/clients/http/FProxyFetchResult.java
   trunk/freenet/src/freenet/clients/http/FProxyFetchTracker.java
   trunk/freenet/src/freenet/clients/http/FProxyFetchWaiter.java
Modified:
   trunk/freenet/src/freenet/clients/http/FProxyToadlet.java
   trunk/freenet/src/freenet/support/MultiValueTable.java
Log:
Fix the leak, lots of refactoring, commit missing files


Added: trunk/freenet/src/freenet/clients/http/FProxyFetchInProgress.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/FProxyFetchInProgress.java           
                (rev 0)
+++ trunk/freenet/src/freenet/clients/http/FProxyFetchInProgress.java   
2009-04-21 17:32:03 UTC (rev 27162)
@@ -0,0 +1,253 @@
+package freenet.clients.http;
+
+import java.util.ArrayList;
+
+import com.db4o.ObjectContainer;
+
+import freenet.client.FetchContext;
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+import freenet.client.HighLevelSimpleClient;
+import freenet.client.InsertException;
+import freenet.client.async.BaseClientPutter;
+import freenet.client.async.ClientCallback;
+import freenet.client.async.ClientContext;
+import freenet.client.async.ClientGetter;
+import freenet.client.events.ClientEvent;
+import freenet.client.events.ClientEventListener;
+import freenet.client.events.SendingToNetworkEvent;
+import freenet.client.events.SplitfileProgressEvent;
+import freenet.keys.FreenetURI;
+import freenet.node.RequestClient;
+import freenet.support.Logger;
+import freenet.support.api.Bucket;
+
+/**
+ * Fetching a page for a browser.
+ *
+ * One instance owns a single ClientGetter for a key and keeps track of the
+ * FProxyFetchWaiter and FProxyFetchResult objects it has handed out. While any
+ * of those is still open, canCancel() reports false.
+ *
+ * LOCKING: The lock on this object is always taken last. Every mutable field
+ * is read and written under it; waiters are woken outside it.
+ */
+public class FProxyFetchInProgress implements ClientEventListener, ClientCallback {
+
+       /** The key we are fetching */
+       final FreenetURI uri;
+       /** The maximum size specified by the client */
+       final long maxSize;
+       /** Unique ID for the fetch */
+       private final long identifier;
+       /** Fetcher */
+       private final ClientGetter getter;
+       /** Any request which is waiting for a progress screen or data.
+        * We may want to wake requests separately in future. */
+       private final ArrayList<FProxyFetchWaiter> waiters;
+       /** Results handed out by innerGetResult() and not yet closed. */
+       private final ArrayList<FProxyFetchResult> results;
+       /** The data, if we have it */
+       private Bucket data;
+       /** Creation time */
+       private final long timeStarted;
+       /** Finished? */
+       private boolean finished;
+       /** Size, if known */
+       private long size;
+       /** MIME type, if known */
+       private String mimeType;
+       /** Gone to network? */
+       private boolean goneToNetwork;
+       /** Total blocks */
+       private int totalBlocks;
+       /** Required blocks */
+       private int requiredBlocks;
+       /** Fetched blocks */
+       private int fetchedBlocks;
+       /** Failed blocks */
+       private int failedBlocks;
+       /** Fatally failed blocks */
+       private int fatallyFailedBlocks;
+       /** Finalized the block set? */
+       private boolean finalizedBlocks;
+       /** Fetch failed */
+       private FetchException failed;
+       /** Last time the fetch was accessed from the fproxy end */
+       private long lastTouched;
+       /** The tracker that is asked to schedule a cancel once we are idle. */
+       final FProxyFetchTracker tracker;
+
+       /** Keep for 30 seconds after last access */
+       static final int LIFETIME = 30 * 1000;
+
+       /**
+        * Create the fetch. Nothing is started until start() is called.
+        * The supplied FetchContext is copied, the copy is limited to maxSize2
+        * and this object is registered as a listener on the copy's event producer.
+        * @param tracker Tracker to notify when this fetch becomes idle.
+        * @param key The key to fetch.
+        * @param maxSize2 Maximum output and temporary length for the fetch.
+        * @param identifier Unique ID for the fetch.
+        * @param context Not used by the constructor.
+        * @param fctx Template fetch context; it is copied, not modified.
+        * @param rc Request client passed through to the ClientGetter.
+        */
+       public FProxyFetchInProgress(FProxyFetchTracker tracker, FreenetURI key, long maxSize2, long identifier, ClientContext context, FetchContext fctx, RequestClient rc) {
+               this.tracker = tracker;
+               this.uri = key;
+               this.maxSize = maxSize2;
+               this.timeStarted = System.currentTimeMillis();
+               this.identifier = identifier;
+               fctx = new FetchContext(fctx, FetchContext.IDENTICAL_MASK, false, null);
+               fctx.maxOutputLength = fctx.maxTempLength = maxSize;
+               fctx.eventProducer.addEventListener(this);
+               waiters = new ArrayList<FProxyFetchWaiter>();
+               results = new ArrayList<FProxyFetchResult>();
+               getter = new ClientGetter(this, uri, fctx, FProxyToadlet.PRIORITY, rc, null, null);
+       }
+
+       /** Register and return a new waiter for this fetch. Counts as an access. */
+       public synchronized FProxyFetchWaiter getWaiter() {
+               lastTouched = System.currentTimeMillis();
+               FProxyFetchWaiter waiter = new FProxyFetchWaiter(this);
+               waiters.add(waiter);
+               return waiter;
+       }
+
+       /**
+        * Snapshot the current state as a result object: the data if we have it,
+        * otherwise the progress so far (including the failure, if any). The result
+        * is remembered until it is closed, which keeps this fetch from being
+        * cancelled in the meantime. Counts as an access.
+        */
+       synchronized FProxyFetchResult innerGetResult() {
+               lastTouched = System.currentTimeMillis();
+               FProxyFetchResult res;
+               if(data != null)
+                       res = new FProxyFetchResult(this, data, mimeType, timeStarted, goneToNetwork);
+               else
+                       res = new FProxyFetchResult(this, mimeType, size, timeStarted, goneToNetwork,
+                                       totalBlocks, requiredBlocks, fetchedBlocks, failedBlocks, fatallyFailedBlocks, finalizedBlocks, failed);
+               results.add(res);
+               return res;
+       }
+
+       /**
+        * Start the underlying getter. A FetchException thrown while starting is
+        * not propagated: it is recorded as the failure of this fetch, so it is
+        * reported through the next result. The throws clause is kept so existing
+        * callers that catch FetchException still compile.
+        */
+       public void start(ClientContext context) throws FetchException {
+               try {
+                       context.start(getter);
+               } catch (FetchException e) {
+                       synchronized(this) {
+                               this.failed = e;
+                               this.finished = true;
+                       }
+                       // Another thread may already have obtained a waiter for this
+                       // fetch; do not leave it sleeping for its whole timeout.
+                       wakeWaiters(true);
+               }
+       }
+
+       public void onRemoveEventProducer(ObjectContainer container) {
+               // Impossible
+       }
+
+       /**
+        * Progress callback. Splitfile progress updates the block counters;
+        * waiters are only woken for it when the outstanding count
+        * (required - (fetched + failed + fatally failed)) goes from <= 1024 to
+        * > 1024. The first "sending to network" event wakes waiters once.
+        * Every other event is ignored.
+        * NOTE(review): the 1024 threshold is taken as-is from the code; confirm
+        * the intent with the author.
+        */
+       public void receive(ClientEvent ce, ObjectContainer maybeContainer, ClientContext context) {
+               if(ce instanceof SplitfileProgressEvent) {
+                       SplitfileProgressEvent split = (SplitfileProgressEvent) ce;
+                       synchronized(this) {
+                               int oldReq = requiredBlocks - (fetchedBlocks + failedBlocks + fatallyFailedBlocks);
+                               totalBlocks = split.totalBlocks;
+                               fetchedBlocks = split.succeedBlocks;
+                               requiredBlocks = split.minSuccessfulBlocks;
+                               failedBlocks = split.failedBlocks;
+                               fatallyFailedBlocks = split.fatallyFailedBlocks;
+                               finalizedBlocks = split.finalizedTotal;
+                               int req = requiredBlocks - (fetchedBlocks + failedBlocks + fatallyFailedBlocks);
+                               if(!(req > 1024 && oldReq <= 1024)) return;
+                       }
+               } else if(ce instanceof SendingToNetworkEvent) {
+                       synchronized(this) {
+                               if(goneToNetwork) return;
+                               goneToNetwork = true;
+                       }
+               } else return;
+               wakeWaiters(false);
+       }
+
+       /**
+        * Wake every registered waiter. The list is copied under the lock and the
+        * waiters are notified outside it, so this object's lock is never held
+        * while a waiter's lock is taken.
+        * @param finished Passed on to each waiter: true if the fetch has completed.
+        */
+       private void wakeWaiters(boolean finished) {
+               FProxyFetchWaiter[] waiting;
+               synchronized(this) {
+                       waiting = waiters.toArray(new FProxyFetchWaiter[waiters.size()]);
+               }
+               for(FProxyFetchWaiter w : waiting) {
+                       w.wakeUp(finished);
+               }
+       }
+
+       /** The fetch failed: record why and wake everybody. */
+       public void onFailure(FetchException e, ClientGetter state, ObjectContainer container) {
+               synchronized(this) {
+                       this.failed = e;
+                       this.finished = true;
+               }
+               wakeWaiters(true);
+       }
+
+       public void onFailure(InsertException e, BaseClientPutter state, ObjectContainer container) {
+               // Impossible
+       }
+
+       public void onFetchable(BaseClientPutter state, ObjectContainer container) {
+               // Impossible
+       }
+
+       public void onGeneratedURI(FreenetURI uri, BaseClientPutter state, ObjectContainer container) {
+               // Impossible
+       }
+
+       public void onMajorProgress(ObjectContainer container) {
+               // Ignore
+       }
+
+       /** The fetch succeeded: keep the data and MIME type and wake everybody. */
+       public void onSuccess(FetchResult result, ClientGetter state, ObjectContainer container) {
+               synchronized(this) {
+                       this.data = result.asBucket();
+                       this.mimeType = result.getMimeType();
+                       this.finished = true;
+               }
+               wakeWaiters(true);
+       }
+
+       public void onSuccess(BaseClientPutter state, ObjectContainer container) {
+               // Ignore
+       }
+
+       /** @return True if the data has been fetched and not yet freed. */
+       public synchronized boolean hasData() {
+               // Synchronized: data is written by the callbacks under this lock.
+               return data != null;
+       }
+
+       /** @return True once the fetch has succeeded or failed. */
+       public synchronized boolean finished() {
+               // Synchronized: finished is written by the callbacks under this lock.
+               return finished;
+       }
+
+       /** Caller must hold the lock on this object.
+        * @return True if no waiter and no result is currently open. */
+       private boolean isUnused() {
+               return results.isEmpty() && waiters.isEmpty();
+       }
+
+       /** A waiter is done. If nothing else is using this fetch, ask the tracker
+        * to schedule a cancel. */
+       public void close(FProxyFetchWaiter waiter) {
+               synchronized(this) {
+                       waiters.remove(waiter);
+                       if(!isUnused()) return;
+               }
+               tracker.queueCancel(this);
+       }
+
+       /** A result is done. If nothing else is using this fetch, ask the tracker
+        * to schedule a cancel. */
+       public void close(FProxyFetchResult result) {
+               synchronized(this) {
+                       results.remove(result);
+                       if(!isUnused()) return;
+               }
+               tracker.queueCancel(this);
+       }
+
+       /** Caller should take the lock on FProxyToadlet.fetchers, then call this
+        * function, if it returns true then finish the cancel outside the lock.
+        * @return True if there are no open waiters or results and the last access
+        * was more than LIFETIME milliseconds ago.
+        */
+       public synchronized boolean canCancel() {
+               if(!isUnused()) return false;
+               if(lastTouched + LIFETIME >= System.currentTimeMillis()) {
+                       Logger.error(this, "Not able to cancel for "+this+" : "+uri+" : "+maxSize);
+                       return false;
+               }
+               Logger.error(this, "Can cancel for "+this+" : "+uri+" : "+maxSize);
+               return true;
+       }
+
+       /**
+        * Free the data (if any) and cancel the getter. Each step is attempted
+        * even if the previous one throws.
+        */
+       public void finishCancel() {
+               Logger.error(this, "Finishing cancel for "+this+" : "+uri+" : "+maxSize);
+               Bucket toFree;
+               synchronized(this) {
+                       // Take the bucket under the lock and forget it, so a freed
+                       // bucket can never be handed out through innerGetResult().
+                       toFree = data;
+                       data = null;
+               }
+               if(toFree != null) {
+                       try {
+                               toFree.free();
+                       } catch (Throwable t) {
+                               // Ensure we get to the next bit
+                               Logger.error(this, "Failed to free: "+t, t);
+                       }
+               }
+               try {
+                       getter.cancel();
+               } catch (Throwable t) {
+                       // Ensure we get to the next bit
+                       Logger.error(this, "Failed to cancel: "+t, t);
+               }
+       }
+}

Added: trunk/freenet/src/freenet/clients/http/FProxyFetchResult.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/FProxyFetchResult.java               
                (rev 0)
+++ trunk/freenet/src/freenet/clients/http/FProxyFetchResult.java       
2009-04-21 17:32:03 UTC (rev 27162)
@@ -0,0 +1,83 @@
+package freenet.clients.http;
+
+import freenet.client.FetchException;
+import freenet.support.api.Bucket;
+
+/**
+ * What fproxy gets back from waiting on a fetch: either the fetched data, or a
+ * snapshot of how far the fetch has got (which may include a failure).
+ * All fields are final; close() must be called once fproxy is done with it.
+ * @author Matthew Toseland <[email protected]> (0xE43DA450)
+ */
+public class FProxyFetchResult {
+
+       /** The fetch this result was taken from; notified on close(). */
+       final FProxyFetchInProgress progress;
+
+       /** The data, or null if it has not been fetched. */
+       final Bucket data;
+
+       /** Known if we have the data; may or may not be known otherwise. */
+       final String mimeType;
+
+       /** Known if we have the data; may or may not be known otherwise. */
+       final long size;
+
+       // The remaining fields describe progress, for when there is no data.
+
+       /** When the fetch was created */
+       final long timeStarted;
+       /** Has the fetch gone to the network? */
+       final boolean goneToNetwork;
+       /** Total number of blocks */
+       final int totalBlocks;
+       /** Number of blocks required */
+       final int requiredBlocks;
+       /** Number of blocks fetched */
+       final int fetchedBlocks;
+       /** Number of blocks failed */
+       final int failedBlocks;
+       /** Number of blocks fatally failed */
+       final int fatallyFailedBlocks;
+       /** Has the block set been finalized? */
+       final boolean finalizedBlocks;
+
+       /** The failure, or null */
+       final FetchException failed;
+
+       /** Build a result that carries the data. The size is read from the
+        * bucket, all block counters are zero and there is no failure. */
+       FProxyFetchResult(FProxyFetchInProgress parent, Bucket data, String mimeType, long timeStarted, boolean goneToNetwork) {
+               this.progress = parent;
+               this.data = data;
+               this.mimeType = mimeType;
+               this.size = data.size();
+               this.timeStarted = timeStarted;
+               this.goneToNetwork = goneToNetwork;
+               this.totalBlocks = 0;
+               this.requiredBlocks = 0;
+               this.fetchedBlocks = 0;
+               this.failedBlocks = 0;
+               this.fatallyFailedBlocks = 0;
+               this.finalizedBlocks = true;
+               this.failed = null;
+       }
+
+       /** Build a result without data, for a fetch that is still running or
+        * that has failed. */
+       FProxyFetchResult(FProxyFetchInProgress parent, String mimeType, long size, long timeStarted, boolean goneToNetwork, int totalBlocks, int requiredBlocks, int fetchedBlocks, int failedBlocks, int fatallyFailedBlocks, boolean finalizedBlocks, FetchException failed) {
+               this.progress = parent;
+               this.data = null;
+               this.mimeType = mimeType;
+               this.size = size;
+               this.timeStarted = timeStarted;
+               this.goneToNetwork = goneToNetwork;
+               this.totalBlocks = totalBlocks;
+               this.requiredBlocks = requiredBlocks;
+               this.fetchedBlocks = fetchedBlocks;
+               this.failedBlocks = failedBlocks;
+               this.fatallyFailedBlocks = fatallyFailedBlocks;
+               this.finalizedBlocks = finalizedBlocks;
+               this.failed = failed;
+       }
+
+       /** @return True if this result carries the data. */
+       public boolean hasData() {
+               return !(data == null);
+       }
+
+       /** Must be called when fproxy has finished with the data */
+       public void close() {
+               progress.close(this);
+       }
+
+}

Added: trunk/freenet/src/freenet/clients/http/FProxyFetchTracker.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/FProxyFetchTracker.java              
                (rev 0)
+++ trunk/freenet/src/freenet/clients/http/FProxyFetchTracker.java      
2009-04-21 17:32:03 UTC (rev 27162)
@@ -0,0 +1,108 @@
+package freenet.clients.http;
+
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.Vector;
+
+import freenet.client.FetchContext;
+import freenet.client.FetchException;
+import freenet.client.async.ClientContext;
+import freenet.keys.FreenetURI;
+import freenet.node.RequestClient;
+import freenet.support.Logger;
+import freenet.support.MultiValueTable;
+
+public class FProxyFetchTracker implements Runnable {
+
+       final MultiValueTable<FreenetURI, FProxyFetchInProgress> fetchers;
+       final ClientContext context;
+       private long fetchIdentifiers;
+       private final FetchContext fctx;
+       private final RequestClient rc;
+       private boolean queuedJob;
+       private boolean requeue;
+
+       FProxyFetchTracker(ClientContext context, FetchContext fctx, 
RequestClient rc) {
+               fetchers = new MultiValueTable<FreenetURI, 
FProxyFetchInProgress>();
+               this.context = context;
+               this.fctx = fctx;
+               this.rc = rc;
+       }
+       
+       FProxyFetchWaiter makeFetcher(FreenetURI key, long maxSize) {
+               FProxyFetchInProgress progress;
+               /* LOCKING:
+                * Call getWaiter() inside the fetchers lock, since we will 
purge old 
+                * fetchers inside that lock, hence avoid a race condition. 
FetchInProgress 
+                * lock is always taken last. */
+               synchronized(fetchers) {
+                       if(fetchers.containsKey(key)) {
+                               Object[] check = fetchers.getArray(key);
+                               for(int i=0;i<check.length;i++) {
+                                       progress = (FProxyFetchInProgress) 
check[i];
+                                       if(progress.maxSize == maxSize 
+                                                       || progress.hasData()) 
return progress.getWaiter();
+                               }
+                       }
+                       progress = new FProxyFetchInProgress(this, key, 
maxSize, fetchIdentifiers++, context, fctx, rc);
+                       fetchers.put(key, progress);
+               }
+               try {
+                       progress.start(context);
+               } catch (FetchException e) {
+                       synchronized(fetchers) {
+                               fetchers.removeElement(key, progress);
+                       }
+               }
+               return progress.getWaiter();
+               // FIXME promote a fetcher when it is re-used
+               // FIXME get rid of fetchers over some age
+       }
+
+       public void queueCancel(FProxyFetchInProgress progress) {
+               Logger.error(this, "Queueing removal of old 
FProxyFetchInProgress's");
+               synchronized(this) {
+                       if(queuedJob) {
+                               requeue = true;
+                               return;
+                       }
+                       queuedJob = true;
+               }
+               context.ticker.queueTimedJob(this, 
FProxyFetchInProgress.LIFETIME);
+       }
+
+       public void run() {
+               Logger.error(this, "Removing old FProxyFetchInProgress's");
+               ArrayList<FProxyFetchInProgress> toRemove = null;
+               boolean needRequeue = false;
+               synchronized(fetchers) {
+                       if(requeue) {
+                               requeue = false;
+                               needRequeue = true;
+                       } else {
+                               queuedJob = false;
+                       }
+                       // Horrible hack, FIXME
+                       Enumeration e = fetchers.keys();
+                       while(e.hasMoreElements()) {
+                               FreenetURI uri = (FreenetURI) e.nextElement();
+                               // Really horrible hack, FIXME
+                               Vector<FProxyFetchInProgress> list = 
(Vector<FProxyFetchInProgress>) fetchers.iterateAll(uri);
+                               for(FProxyFetchInProgress f : list)
+                                       // FIXME remove on the fly, although 
cancel must wait
+                                       if(f.canCancel()) {
+                                               if(toRemove == null) toRemove = 
new ArrayList<FProxyFetchInProgress>();
+                                               toRemove.add(f);
+                                       }
+                       }
+                       for(FProxyFetchInProgress r : toRemove) {
+                               fetchers.removeElement(r.uri, r);
+                       }
+               }
+               for(FProxyFetchInProgress r : toRemove)
+                       r.finishCancel();
+               if(needRequeue)
+                       context.ticker.queueTimedJob(this, 
FProxyFetchInProgress.LIFETIME);
+       }
+
+}

Added: trunk/freenet/src/freenet/clients/http/FProxyFetchWaiter.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/FProxyFetchWaiter.java               
                (rev 0)
+++ trunk/freenet/src/freenet/clients/http/FProxyFetchWaiter.java       
2009-04-21 17:32:03 UTC (rev 27162)
@@ -0,0 +1,42 @@
+package freenet.clients.http;
+
+/**
+ * An fproxy fetch which is stalled waiting for either the data or a progress
+ * screen. The first call to getResult() blocks for at most two seconds unless
+ * the fetch has already finished or a wake-up has already arrived; later
+ * calls on the same waiter return immediately.
+ */
+public class FProxyFetchWaiter {
+
+       /** The fetch we are waiting on. */
+       private final FProxyFetchInProgress progress;
+
+       /** Set once getResult() has blocked, so that it only ever blocks once. */
+       private boolean hasWaited;
+       /** The fetch has completed, successfully or not. */
+       private boolean finished;
+       /** A progress wake-up arrived. */
+       private boolean awoken;
+
+       /**
+        * @param progress2 The fetch to wait on. If it has already finished, this
+        * waiter starts out finished too, because it will never be sent the
+        * wake-up that announced completion.
+        */
+       public FProxyFetchWaiter(FProxyFetchInProgress progress2) {
+               this.progress = progress2;
+               this.finished = progress2.finished();
+       }
+
+       /**
+        * Wait (at most once, for at most two seconds) for the fetch to finish or
+        * make notable progress, then return a snapshot of its state.
+        * @return A new result from the fetch; the caller must close it.
+        */
+       public FProxyFetchResult getResult() {
+               synchronized(this) {
+                       if(!(finished || hasWaited || awoken)) {
+                               try {
+                                       wait(2000);
+                               } catch (InterruptedException e) {
+                                       // Not likely. Keep the interrupt visible to the
+                                       // caller and return what we have so far.
+                                       Thread.currentThread().interrupt();
+                               }
+                               // Do not block again on this waiter.
+                               hasWaited = true;
+                       }
+               }
+               return progress.innerGetResult();
+       }
+
+       /** Unregister this waiter from the fetch. */
+       public void close() {
+               progress.close(this);
+       }
+
+       /**
+        * Called by the fetch to wake a thread blocked in getResult().
+        * @param fin True if the fetch has completed, false for a progress update.
+        */
+       synchronized void wakeUp(boolean fin) {
+               if(fin)
+                       this.finished = true;
+               else
+                       this.awoken = true;
+               notifyAll();
+       }
+
+}

Modified: trunk/freenet/src/freenet/clients/http/FProxyToadlet.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/FProxyToadlet.java   2009-04-21 
16:13:07 UTC (rev 27161)
+++ trunk/freenet/src/freenet/clients/http/FProxyToadlet.java   2009-04-21 
17:32:03 UTC (rev 27162)
@@ -49,8 +49,7 @@
        private static byte[] random;
        final NodeClientCore core;
        final ClientContext context;
-       final MultiValueTable<FreenetURI, FProxyFetchInProgress> fetchers;
-       private long fetchIdentifiers;
+       final FProxyFetchTracker fetchTracker;
        
        private static FoundURICallback prefetchHook;
        static final Set<String> prefetchAllowedTypes = new HashSet<String>();
@@ -99,7 +98,7 @@
                                }
                                
                        };
-               fetchers = new MultiValueTable<FreenetURI, 
FProxyFetchInProgress>();
+               fetchTracker = new FProxyFetchTracker(context, 
getClientImpl().getFetchContext(), this);
        }
        
        @Override
@@ -493,17 +492,20 @@
 
                MultiValueTable<String,String> headers = ctx.getHeaders();
                String ua = headers.get("user-agent");
+               String accept = headers.get("accept");
                FProxyFetchResult fr = null;
-               if(isBrowser(ua)) {
-                       FProxyFetchInProgress fetch = makeFetcher(key, maxSize);
+               if(isBrowser(ua) && accept == null || 
accept.indexOf("text/html") > -1) {
+                       FProxyFetchWaiter fetch = fetchTracker.makeFetcher(key, 
maxSize);
                        while(true) {
                        fr = fetch.getResult();
                        if(fr.hasData()) {
                                data = fr.data;
                                mimeType = fr.mimeType;
+                               fetch.close(); // Not waiting any more, but 
still locked the results until sent
                                break;
                        } else if(fr.failed != null) {
                                fe = fr.failed;
+                               fetch.close(); // Not waiting any more, but 
still locked the results until sent
                                break;
                        } else {
                                // Still in progress
@@ -527,6 +529,7 @@
                                retHeaders.put("Refresh", "2; url="+location);
                                writeHTMLReply(ctx, 200, "OK", retHeaders, 
pageNode.generate());
                                fr.close();
+                               fetch.close();
                                return;
                        }
                        }
@@ -674,32 +677,6 @@
                }
        }
 
-       private FProxyFetchInProgress makeFetcher(FreenetURI key, long maxSize) 
{
-               FProxyFetchInProgress progress;
-               synchronized(fetchers) {
-                       if(fetchers.containsKey(key)) {
-                               Object[] check = fetchers.getArray(key);
-                               for(int i=0;i<check.length;i++) {
-                                       progress = (FProxyFetchInProgress) 
check[i];
-                                       if(progress.maxSize == maxSize 
-                                                       || progress.hasData()) 
return progress;
-                               }
-                       }
-                       progress = new FProxyFetchInProgress(key, maxSize, 
fetchIdentifiers++, context, getClientImpl(), this);
-                       fetchers.put(key, progress);
-               }
-               try {
-                       progress.start(context);
-               } catch (FetchException e) {
-                       synchronized(fetchers) {
-                               fetchers.removeElement(key, progress);
-                       }
-               }
-               return progress;
-               // FIXME promote a fetcher when it is re-used
-               // FIXME get rid of fetchers over some age
-       }
-
        private boolean isBrowser(String ua) {
                if(ua == null) return false;
                if(ua.indexOf("Mozilla/") > -1) return true;

Modified: trunk/freenet/src/freenet/support/MultiValueTable.java
===================================================================
--- trunk/freenet/src/freenet/support/MultiValueTable.java      2009-04-21 
16:13:07 UTC (rev 27161)
+++ trunk/freenet/src/freenet/support/MultiValueTable.java      2009-04-21 
17:32:03 UTC (rev 27162)
@@ -150,7 +150,7 @@
                        return table.keys();
                }
     }
-
+    
     public Enumeration<V> elements() {
                synchronized (table) {
                        if (table.isEmpty())

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to